Convert Python qlref tests to inline expectations

This commit is contained in:
Owen Mansel-Chan
2026-06-15 11:06:48 +01:00
parent d6ade8fe95
commit 0c2df7c7e9
475 changed files with 1612 additions and 1382 deletions

View File

@@ -1 +1 @@
../CallGraph/InlineCallGraphTest.ql
query: ../CallGraph/InlineCallGraphTest.ql

View File

@@ -1 +1 @@
../CallGraph/InlineCallGraphTest.ql
query: ../CallGraph/InlineCallGraphTest.ql

View File

@@ -1 +1 @@
../CallGraph/InlineCallGraphTest.ql
query: ../CallGraph/InlineCallGraphTest.ql

View File

@@ -1 +1 @@
meta/ClassHierarchy/Find.ql
query: meta/ClassHierarchy/Find.ql

View File

@@ -1,5 +1,5 @@
# BAD, do not start class or interface name with lowercase letter
class badName:
class badName: # $ Alert
def hello(self):
print("hello")

View File

@@ -1 +1,2 @@
experimental/Classes/NamingConventionsClasses.ql
query: experimental/Classes/NamingConventionsClasses.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1,7 +1,7 @@
class Test:
# BAD, do not start function name with uppercase letter
def HelloWorld(self):
def HelloWorld(self): # $ Alert
print("hello world")
# GOOD, function name starts with lowercase letter

View File

@@ -1 +1,2 @@
experimental/Functions/NamingConventionsFunctions.ql
query: experimental/Functions/NamingConventionsFunctions.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1 +1,2 @@
experimental/Security/CWE-022bis/TarSlipImprov.ql
query: experimental/Security/CWE-022bis/TarSlipImprov.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -12,14 +12,14 @@ import os.path
unsafe_filename_tar = sys.argv[2]
safe_filename_tar = "safe_path.tar"
tar = tarfile.open(unsafe_filename_tar)
tar = tarfile.open(unsafe_filename_tar) # $ Source[py/tarslip-extended]
result = []
for member in tar:
if ".." in member.name:
raise ValueError("Path in member name !!!")
result.append(member)
path = unsafe_filename_tar
tar.extractall(path=path, members=result)
tar.extractall(path=path, members=result) # $ Alert[py/tarslip-extended]
tar.close()
@@ -35,27 +35,27 @@ def members_filter1(tarfile):
result.append(member)
return result
tar = tarfile.open(unsafe_filename_tar)
tar.extractall(path=tempfile.mkdtemp(), members=members_filter1(tar))
tar = tarfile.open(unsafe_filename_tar) # $ Source[py/tarslip-extended]
tar.extractall(path=tempfile.mkdtemp(), members=members_filter1(tar)) # $ Alert[py/tarslip-extended]
tar.close()
with tarfile.open(unsafe_filename_tar) as tar:
with tarfile.open(unsafe_filename_tar) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
if ".." in entry.name:
raise ValueError("Illegal tar archive entry")
tar.extract(entry, "/tmp/unpack/")
tar.extract(entry, "/tmp/unpack/") # $ Alert[py/tarslip-extended]
def _validate_archive_name(name, target):
if not os.path.abspath(os.path.join(target, name)).startswith(target + os.path.sep):
raise ValueError(f"Provided language pack contains invalid name {name}")
with tarfile.open(unsafe_filename_tar) as tar:
with tarfile.open(unsafe_filename_tar) as tar: # $ Source[py/tarslip-extended]
target = "/tmp/unpack"
for entry in tar:
_validate_archive_name(entry.name, target)
tar.extract(entry, target)
tar.extract(entry, target) # $ Alert[py/tarslip-extended]
def members_filter2(tarfile):
@@ -85,10 +85,10 @@ def _validate_archive_name(name, target):
raise ValueError(f"Provided language pack contains invalid name {name}")
target = "/tmp/unpack"
with tarfile.open(unsafe_filename_tar, "r") as tar:
with tarfile.open(unsafe_filename_tar, "r") as tar: # $ Source[py/tarslip-extended]
for info in tar.getmembers():
_validate_tar_info(info, target)
tar.extractall(target)
tar.extractall(target) # $ Alert[py/tarslip-extended]
def members_filter3(tarfile):
@@ -108,11 +108,11 @@ tar.extractall(path=tempfile.mkdtemp(), members=members_filter3(tar))
tar.close()
tar = tarfile.open(unsafe_filename_tar)
tar = tarfile.open(unsafe_filename_tar) # $ Source[py/tarslip-extended]
tarf = tar.getmembers()
for f in tarf:
if not f.issym():
tar.extractall(path=tempfile.mkdtemp(), members=[f])
tar.extractall(path=tempfile.mkdtemp(), members=[f]) # $ Alert[py/tarslip-extended]
tar.close()
@@ -120,27 +120,27 @@ class MKTar(TarFile):
pass
tarball = unsafe_filename_tar
with MKTar.open(name=tarball) as tar:
with MKTar.open(name=tarball) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
tar._extract_member(entry, entry.name)
tar._extract_member(entry, entry.name) # $ Alert[py/tarslip-extended]
tarball = unsafe_filename_tar
with tarfile.open(tarball) as tar:
tar.extractall()
with tarfile.open(tarball) as tar: # $ Source[py/tarslip-extended]
tar.extractall() # $ Alert[py/tarslip-extended]
tar = tarfile.open(unsafe_filename_tar)
tar.extractall(path=tempfile.mkdtemp(), members=None)
tar = tarfile.open(unsafe_filename_tar) # $ Source[py/tarslip-extended]
tar.extractall(path=tempfile.mkdtemp(), members=None) # $ Alert[py/tarslip-extended]
class MKTar(tarfile.TarFile):
pass
tarball = unsafe_filename_tar
with MKTar.open(name=tarball) as tar:
with MKTar.open(name=tarball) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
tar._extract_member(entry, entry.name)
tar._extract_member(entry, entry.name) # $ Alert[py/tarslip-extended]
@contextmanager
@@ -148,7 +148,7 @@ def py2_tarxz(filename):
with tempfile.TemporaryFile() as tmp:
subprocess.check_call(["xz", "-dc", filename], stdout=tmp.fileno())
tmp.seek(0)
with closing(tarfile.TarFile(fileobj=tmp)) as tf:
with closing(tarfile.TarFile(fileobj=tmp)) as tf: # $ Source[py/tarslip-extended]
yield tf
def unpack_tarball(tar_filename, dest):
@@ -156,7 +156,7 @@ def unpack_tarball(tar_filename, dest):
# Py 2.7 lacks lzma support
tar_cm = py2_tarxz(tar_filename)
else:
tar_cm = closing(tarfile.open(tar_filename))
tar_cm = closing(tarfile.open(tar_filename)) # $ Source[py/tarslip-extended]
base_dir = None
with tar_cm as tarc:
@@ -166,32 +166,32 @@ def unpack_tarball(tar_filename, dest):
base_dir = base_name
elif base_dir != base_name:
print('Unexpected path in %s: %s' % (tar_filename, base_name))
tarc.extractall(dest)
tarc.extractall(dest) # $ Alert[py/tarslip-extended]
return os.path.join(dest, base_dir)
unpack_tarball(unsafe_filename_tar, "/tmp/unpack")
tarball = unsafe_filename_tar
with tarfile.open(name=tarball) as tar:
with tarfile.open(name=tarball) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
tar._extract_member(entry, entry.name)
tar._extract_member(entry, entry.name) # $ Alert[py/tarslip-extended]
tarball = unsafe_filename_tar
with tarfile.open(name=tarball) as tar:
with tarfile.open(name=tarball) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
tar.extract(entry, "/tmp/unpack/")
tar.extract(entry, "/tmp/unpack/") # $ Alert[py/tarslip-extended]
tarball = unsafe_filename_tar
tar = tarfile.open(tarball)
tar.extractall("/tmp/unpack/")
tar = tarfile.open(tarball) # $ Source[py/tarslip-extended]
tar.extractall("/tmp/unpack/") # $ Alert[py/tarslip-extended]
tarball = unsafe_filename_tar
with tarfile.open(tarball, "r") as tar:
tar.extractall(path="/tmp/unpack/", members=tar)
with tarfile.open(tarball, "r") as tar: # $ Source[py/tarslip-extended]
tar.extractall(path="/tmp/unpack/", members=tar) # $ Alert[py/tarslip-extended]
def members_filter4(tarfile):
@@ -207,8 +207,8 @@ tar.extractall(path=tempfile.mkdtemp(), members=members_filter4(tar))
tar.close()
with tarfile.open(unsafe_filename_tar, "r") as tar:
tar.extractall(path="/tmp/unpack")
with tarfile.open(unsafe_filename_tar, "r") as tar: # $ Source[py/tarslip-extended]
tar.extractall(path="/tmp/unpack") # $ Alert[py/tarslip-extended]
def members_filter5(tarfile):
@@ -228,12 +228,12 @@ filename = unsafe_filename_tar
tmp_dir = "/tmp/"
read_type = "r:gz" if filename.endswith("tgz") else "r"
with tarfile.open(filename, read_type) as corpus_tar:
with tarfile.open(filename, read_type) as corpus_tar: # $ Source[py/tarslip-extended]
members = []
for f in corpus_tar:
if not os.path.isfile(os.path.join(tmp_dir, f.name)):
members.append(f)
corpus_tar.extractall(tmp_dir, members=members)
corpus_tar.extractall(tmp_dir, members=members) # $ Alert[py/tarslip-extended]
def members_filter6(tarfile):
@@ -251,66 +251,66 @@ tar.close()
archive_path = unsafe_filename_tar
target_dir = "/tmp/unpack"
tarfile.open(archive_path, "r").extractall(path=target_dir)
tarfile.open(archive_path, "r").extractall(path=target_dir) # $ Alert[py/tarslip-extended]
tarball = unsafe_filename_tar
with tarfile.open(tarball) as tar:
with tarfile.open(tarball) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
if entry.isfile():
tar.extract(entry, "/tmp/unpack/")
tar.extract(entry, "/tmp/unpack/") # $ Alert[py/tarslip-extended]
with tarfile.open(unsafe_filename_tar) as tar:
with tarfile.open(unsafe_filename_tar) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
if entry.name.startswith("/"):
raise ValueError("Illegal tar archive entry")
tar.extract(entry, "/tmp/unpack/")
tar.extract(entry, "/tmp/unpack/") # $ Alert[py/tarslip-extended]
tarball = unsafe_filename_tar
with tarfile.TarFile(tarball, mode="r") as tar:
with tarfile.TarFile(tarball, mode="r") as tar: # $ Source[py/tarslip-extended]
for entry in tar:
if entry.isfile():
tar.extract(entry, "/tmp/unpack/")
tar.extract(entry, "/tmp/unpack/") # $ Alert[py/tarslip-extended]
with tarfile.open(unsafe_filename_tar) as tar:
with tarfile.open(unsafe_filename_tar) as tar: # $ Source[py/tarslip-extended]
for entry in tar:
if os.path.isabs(entry.name):
raise ValueError("Illegal tar archive entry")
tar.extract(entry, "/tmp/unpack/")
tar.extract(entry, "/tmp/unpack/") # $ Alert[py/tarslip-extended]
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar:
tar.extractall(path="/tmp/unpack")
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar: # $ Source[py/tarslip-extended]
tar.extractall(path="/tmp/unpack") # $ Alert[py/tarslip-extended]
tar = tarfile.open(filename)
tar.extractall(path=tempfile.mkdtemp(), members=tar.getmembers())
tar = tarfile.open(filename) # $ Source[py/tarslip-extended]
tar.extractall(path=tempfile.mkdtemp(), members=tar.getmembers()) # $ Alert[py/tarslip-extended]
tar.close()
tar = tarfile.open(unsafe_filename_tar)
tar.extractall(path=tempfile.mkdtemp(), members=None)
tar = tarfile.open(unsafe_filename_tar) # $ Source[py/tarslip-extended]
tar.extractall(path=tempfile.mkdtemp(), members=None) # $ Alert[py/tarslip-extended]
tar.extractall(path=tempfile.mkdtemp(), members=members_filter4(tar))
tar.close()
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar:
tar.extractall(path="/tmp/unpack/", members=tar)
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar: # $ Source[py/tarslip-extended]
tar.extractall(path="/tmp/unpack/", members=tar) # $ Alert[py/tarslip-extended]
tar = tarfile.open(unsafe_filename_tar)
tar = tarfile.open(unsafe_filename_tar) # $ Source[py/tarslip-extended]
result = []
for member in tar:
if member.issym():
raise ValueError("But it is a symlink")
result.append(member)
tar.extractall(path=tempfile.mkdtemp(), members=result)
tar.extractall(path=tempfile.mkdtemp(), members=result) # $ Alert[py/tarslip-extended]
tar.close()
archive_path = unsafe_filename_tar
target_dir = "/tmp/unpack"
tarfile.TarFile(unsafe_filename_tar, mode="r").extractall(path=target_dir)
tarfile.TarFile(unsafe_filename_tar, mode="r").extractall(path=target_dir) # $ Alert[py/tarslip-extended]

View File

@@ -1 +1,2 @@
experimental/Security/CWE-022/ZipSlip.ql
query: experimental/Security/CWE-022/ZipSlip.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -5,35 +5,35 @@ import gzip
import zipfile
def unzip(filename):
with tarfile.open(filename) as zipf:
with tarfile.open(filename) as zipf: # $ Alert[py/zipslip]
#BAD : This could write any file on the filesystem.
for entry in zipf:
shutil.move(entry, "/tmp/unpack/")
shutil.move(entry, "/tmp/unpack/") # $ Sink[py/zipslip]
def unzip1(filename):
with gzip.open(filename) as zipf:
with gzip.open(filename) as zipf: # $ Alert[py/zipslip]
#BAD : This could write any file on the filesystem.
for entry in zipf:
shutil.copy2(entry, "/tmp/unpack/")
shutil.copy2(entry, "/tmp/unpack/") # $ Sink[py/zipslip]
def unzip2(filename):
with bz2.open(filename) as zipf:
with bz2.open(filename) as zipf: # $ Alert[py/zipslip]
#BAD : This could write any file on the filesystem.
for entry in zipf:
shutil.copyfile(entry, "/tmp/unpack/")
shutil.copyfile(entry, "/tmp/unpack/") # $ Sink[py/zipslip]
def unzip3(filename):
zf = zipfile.ZipFile(filename)
with zf.namelist() as filelist:
with zf.namelist() as filelist: # $ Alert[py/zipslip]
#BAD : This could write any file on the filesystem.
for x in filelist:
shutil.copy(x, "/tmp/unpack/")
shutil.copy(x, "/tmp/unpack/") # $ Sink[py/zipslip]
def unzip4(filename):
zf = zipfile.ZipFile(filename)
filelist = zf.namelist()
filelist = zf.namelist() # $ Alert[py/zipslip]
for x in filelist:
with zf.open(x) as srcf:
shutil.copyfileobj(x, "/tmp/unpack/")
shutil.copyfileobj(x, "/tmp/unpack/") # $ Sink[py/zipslip]
import tty # to set the import root so we can identify the standard library

View File

@@ -1 +1 @@
experimental/Security/CWE-074/remoteCommandExecution/RemoteCommandExecution.ql
query: experimental/Security/CWE-074/remoteCommandExecution/RemoteCommandExecution.ql

View File

@@ -1 +1,2 @@
experimental/Security/CWE-079/EmailXss.ql
query: experimental/Security/CWE-079/EmailXss.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -11,7 +11,7 @@ def django_response(request):
https://github.com/django/django/blob/ca9872905559026af82000e46cde6f7dedc897b6/django/core/mail/__init__.py#L64
"""
send_mail("Subject", "plain-text body", "from@example.com",
["to@example.com"], html_message=django.http.request.GET.get("html"))
["to@example.com"], html_message=django.http.request.GET.get("html")) # $ Alert
def django_response(request):
@@ -20,6 +20,6 @@ def django_response(request):
https://github.com/django/django/blob/ca9872905559026af82000e46cde6f7dedc897b6/django/core/mail/__init__.py#L90-L121
"""
mail_admins("Subject", "plain-text body",
html_message=django.http.request.GET.get("html"))
html_message=django.http.request.GET.get("html")) # $ Alert
mail_managers("Subject", "plain-text body",
html_message=django.http.request.GET.get("html"))
html_message=django.http.request.GET.get("html")) # $ Alert

View File

@@ -1,4 +1,4 @@
from flask import request, Flask
from flask import request, Flask # $ Source
from flask_mail import Mail, Message
app = Flask(__name__)
@@ -10,12 +10,12 @@ def send():
sender="from@example.com",
recipients=["to@example.com"],
body="plain-text body",
html=request.args["html"])
html=request.args["html"]) # $ Alert
# The message can contain a body and/or HTML:
msg.body = "plain-text body"
# The email's HTML can be set via msg.html or as an initialize argument when creating a Message object.
msg.html = request.args["html"]
msg.html = request.args["html"] # $ Alert
mail.send(msg)
@@ -28,5 +28,5 @@ def connect():
msg = Message(subject="Subject",
sender="from@example.com",
recipients=["to@example.com"],
html=request.args["html"])
html=request.args["html"]) # $ Alert
conn.send(msg)

View File

@@ -1,4 +1,4 @@
from flask import request, Flask
from flask import request, Flask # $ Source
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Email, To, Content, MimeType, HtmlContent
@@ -11,7 +11,7 @@ def send():
from_email='from_email@example.com',
to_emails='to@example.com',
subject='Sending with Twilio SendGrid is Fun',
html_content=request.args["html_content"])
html_content=request.args["html_content"]) # $ Alert
sg = SendGridAPIClient('SENDGRID_API_KEY')
sg.send(message)
@@ -23,7 +23,7 @@ def send():
from_email='from_email@example.com',
to_emails='to@example.com',
subject='Sending with Twilio SendGrid is Fun',
html_content=HtmlContent(request.args["html_content"]))
html_content=HtmlContent(request.args["html_content"])) # $ Alert
sg = SendGridAPIClient('SENDGRID_API_KEY')
sg.send(message)
@@ -34,7 +34,7 @@ def send_post():
from_email = Email("test@example.com")
to_email = To("test@example.com")
subject = "Sending with SendGrid is Fun"
html_content = Content("text/html", request.args["html_content"])
html_content = Content("text/html", request.args["html_content"]) # $ Alert
plain_content = Content("text/plain", request.args["plain_content"])
mail = Mail(from_email, to_email, subject, plain_content, html_content)

View File

@@ -1,6 +1,6 @@
import sendgrid
import os
from flask import request, Flask
from flask import request, Flask # $ Source
app = Flask(__name__)
@@ -13,7 +13,7 @@ def send():
"content": [
{
"type": "text/html",
"value": "<html>{}</html>".format(request.args["html_content"])
"value": "<html>{}</html>".format(request.args["html_content"]) # $ Alert
}
],
"from": {
@@ -24,7 +24,7 @@ def send():
"mail_settings": {
"footer": {
"enable": True,
"html": "<html>{}</html>".format(request.args["html_footer"]),
"html": "<html>{}</html>".format(request.args["html_footer"]), # $ Alert
"text": "Thanks,/n The SendGrid Team"
},
},
@@ -38,7 +38,7 @@ def send():
"tracking_settings": {
"subscription_tracking": {
"enable": True,
"html": "<html>{}</html>".format(request.args["html_tracking"]),
"html": "<html>{}</html>".format(request.args["html_tracking"]), # $ Alert
"substitution_tag": "<%click here%>",
"text": "If you would like to unsubscribe and stop receiving these emails <% click here %>."
}

View File

@@ -1,5 +1,5 @@
# This test checks that the developer doesn't pass a MIMEText instance to a MIMEMultipart initializer via the subparts parameter.
from flask import Flask, request
from flask import Flask, request # $ Source
import json
import smtplib
import ssl
@@ -21,7 +21,7 @@ def email_person():
# Turn these into plain/html MIMEText objects
part1 = MIMEText(text, "plain")
part2 = MIMEText(html, "html")
part2 = MIMEText(html, "html") # $ Alert
message = MIMEMultipart(_subparts=(part1, part2))
message["Subject"] = "multipart test"

View File

@@ -1,5 +1,5 @@
# This test checks that the developer doesn't pass a MIMEText instance to a MIMEMultipart message.
from flask import Flask, request
from flask import Flask, request # $ Source
import json
import smtplib, ssl
from email.mime.text import MIMEText
@@ -24,7 +24,7 @@ def email_person():
# Turn these into plain/html MIMEText objects
part1 = MIMEText(text, "plain")
part2 = MIMEText(html, "html")
part2 = MIMEText(html, "html") # $ Alert
# Add HTML/plain-text parts to MIMEMultipart message
# The email client will try to render the last part first

View File

@@ -1,3 +1,10 @@
#select
| xslt.py:14:29:14:37 | ControlFlowNode for xslt_root | xslt.py:3:26:3:32 | ControlFlowNode for ImportMember | xslt.py:14:29:14:37 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xslt.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:12:28:12:36 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:12:28:12:36 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:21:29:21:37 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:21:29:21:37 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:31:24:31:32 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:31:24:31:32 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:40:24:40:32 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:40:24:40:32 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:50:24:50:32 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:50:24:50:32 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
edges
| xslt.py:3:26:3:32 | ControlFlowNode for ImportMember | xslt.py:3:26:3:32 | ControlFlowNode for request | provenance | |
| xslt.py:3:26:3:32 | ControlFlowNode for request | xslt.py:10:17:10:23 | ControlFlowNode for request | provenance | |
@@ -122,10 +129,10 @@ nodes
| xsltInjection.py:46:38:46:48 | ControlFlowNode for xsltStrings [List element] | semmle.label | ControlFlowNode for xsltStrings [List element] |
| xsltInjection.py:50:24:50:32 | ControlFlowNode for xslt_root | semmle.label | ControlFlowNode for xslt_root |
subpaths
#select
| xslt.py:14:29:14:37 | ControlFlowNode for xslt_root | xslt.py:3:26:3:32 | ControlFlowNode for ImportMember | xslt.py:14:29:14:37 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xslt.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:12:28:12:36 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:12:28:12:36 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:21:29:21:37 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:21:29:21:37 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:31:24:31:32 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:31:24:31:32 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:40:24:40:32 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:40:24:40:32 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
| xsltInjection.py:50:24:50:32 | ControlFlowNode for xslt_root | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | xsltInjection.py:50:24:50:32 | ControlFlowNode for xslt_root | This XSLT query depends on $@. | xsltInjection.py:3:26:3:32 | ControlFlowNode for ImportMember | user-provided value |
testFailures
| xslt.py:14:29:14:37 | ControlFlowNode for xslt_root | Unexpected result: Alert |
| xsltInjection.py:12:28:12:36 | ControlFlowNode for xslt_root | Unexpected result: Alert |
| xsltInjection.py:21:29:21:37 | ControlFlowNode for xslt_root | Unexpected result: Alert |
| xsltInjection.py:31:24:31:32 | ControlFlowNode for xslt_root | Unexpected result: Alert |
| xsltInjection.py:40:24:40:32 | ControlFlowNode for xslt_root | Unexpected result: Alert |
| xsltInjection.py:50:24:50:32 | ControlFlowNode for xslt_root | Unexpected result: Alert |

View File

@@ -1 +1,2 @@
experimental/Security/CWE-091/XsltInjection.ql
query: experimental/Security/CWE-091/XsltInjection.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1,6 +1,6 @@
from lxml import etree
from io import StringIO
from flask import Flask, request
from flask import Flask, request # $ Source
app = Flask(__name__)
@@ -11,4 +11,4 @@ def bad():
xslt_root = etree.XML(xsltQuery)
f = StringIO('<foo><bar></bar></foo>')
tree = etree.parse(f)
result_tree = tree.xslt(xslt_root) # Not OK
result_tree = tree.xslt(xslt_root) # Not OK # $ Alert

View File

@@ -1,6 +1,6 @@
from lxml import etree
from io import StringIO
from flask import Flask, request
from flask import Flask, request # $ Source
app = Flask(__name__)
@@ -9,7 +9,7 @@ app = Flask(__name__)
def a():
xsltQuery = request.args.get('xml', '')
xslt_root = etree.XML(xsltQuery)
transform = etree.XSLT(xslt_root) # Not OK
transform = etree.XSLT(xslt_root) # Not OK # $ Alert
@app.route("/xslt2")
@@ -18,7 +18,7 @@ def b():
xslt_root = etree.XML(xsltQuery)
f = StringIO('<foo><bar></bar></foo>')
tree = etree.parse(f)
result_tree = tree.xslt(xslt_root) # Not OK
result_tree = tree.xslt(xslt_root) # Not OK # $ Alert
@app.route("/xslt3")
@@ -28,7 +28,7 @@ def c():
f = StringIO('<foo><bar></bar></foo>')
tree = etree.parse(f)
result = tree.xslt(xslt_root, a="'A'") # Not OK
result = tree.xslt(xslt_root, a="'A'") # Not OK # $ Alert
@app.route("/xslt4")
def d():
@@ -37,7 +37,7 @@ def d():
f = StringIO('<foo><bar></bar></foo>')
tree = etree.parse(f)
result = tree.xslt(xslt_root, a="'A'") # Not OK
result = tree.xslt(xslt_root, a="'A'") # Not OK # $ Alert
@app.route("/xslt5")
def e():
@@ -47,7 +47,7 @@ def e():
f = StringIO('<foo><bar></bar></foo>')
tree = etree.parse(f)
result = tree.xslt(xslt_root, a="'A'") # Not OK
result = tree.xslt(xslt_root, a="'A'") # Not OK # $ Alert
@app.route("/xslt6")

View File

@@ -1 +1,2 @@
experimental/Security/CWE-094/Js2Py.ql
query: experimental/Security/CWE-094/Js2Py.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -6,5 +6,5 @@ bp = flask.Blueprint("app", __name__, url_prefix="/")
@bp.route("/bad")
def bad():
jk = flask.request.form["jk"]
jk = eval_js(f"{jk} f()")
jk = flask.request.form["jk"] # $ Source
jk = eval_js(f"{jk} f()") # $ Alert

View File

@@ -1,3 +1,7 @@
#select
| csv_bad.py:18:24:18:31 | ControlFlowNode for csv_data | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:18:24:18:31 | ControlFlowNode for csv_data | Csv injection might include code from $@. | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | this user input |
| csv_bad.py:19:25:19:32 | ControlFlowNode for csv_data | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:19:25:19:32 | ControlFlowNode for csv_data | Csv injection might include code from $@. | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | this user input |
| csv_bad.py:25:46:25:53 | ControlFlowNode for csv_data | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:25:46:25:53 | ControlFlowNode for csv_data | Csv injection might include code from $@. | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | this user input |
edges
| csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:9:19:9:25 | ControlFlowNode for request | provenance | |
| csv_bad.py:9:19:9:25 | ControlFlowNode for request | csv_bad.py:16:16:16:22 | ControlFlowNode for request | provenance | |
@@ -26,7 +30,3 @@ nodes
| csv_bad.py:24:16:24:38 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| csv_bad.py:25:46:25:53 | ControlFlowNode for csv_data | semmle.label | ControlFlowNode for csv_data |
subpaths
#select
| csv_bad.py:18:24:18:31 | ControlFlowNode for csv_data | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:18:24:18:31 | ControlFlowNode for csv_data | Csv injection might include code from $@. | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | this user input |
| csv_bad.py:19:25:19:32 | ControlFlowNode for csv_data | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:19:25:19:32 | ControlFlowNode for csv_data | Csv injection might include code from $@. | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | this user input |
| csv_bad.py:25:46:25:53 | ControlFlowNode for csv_data | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | csv_bad.py:25:46:25:53 | ControlFlowNode for csv_data | Csv injection might include code from $@. | csv_bad.py:9:19:9:25 | ControlFlowNode for ImportMember | this user input |

View File

@@ -1 +1,2 @@
experimental/Security/CWE-1236/CsvInjection.ql
query: experimental/Security/CWE-1236/CsvInjection.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -6,7 +6,7 @@
import copy
import csv
from flask import Flask
from flask import request
from flask import request # $ Source
from typing import List
app = Flask(__name__)
@@ -15,17 +15,17 @@ app = Flask(__name__)
def bad1():
csv_data = request.args.get('csv')
csvWriter = csv.writer(open("test.csv", "wt"))
csvWriter.writerow(csv_data) # bad
csvWriter.writerows(csv_data) # bad
csvWriter.writerow(csv_data) # $ Alert # bad
csvWriter.writerows(csv_data) # $ Alert # bad
return "bad1"
@app.route('/bad2')
def bad2():
csv_data = request.args.get('csv')
csvWriter = csv.DictWriter(f, fieldnames=csv_data) # bad
csvWriter = csv.DictWriter(f, fieldnames=csv_data) # $ Alert # bad
csvWriter.writeheader()
return "bad2"
if __name__ == '__main__':
app.debug = True
app.run()
app.run()

View File

@@ -1 +1 @@
experimental/Security/CWE-176/UnicodeBypassValidation.ql
query: experimental/Security/CWE-176/UnicodeBypassValidation.ql

View File

@@ -1,3 +1,6 @@
#select
| TimingAttackAgainstHash.py:27:24:27:32 | ControlFlowNode for signature | TimingAttackAgainstHash.py:26:17:26:41 | ControlFlowNode for Attribute() | TimingAttackAgainstHash.py:27:24:27:32 | ControlFlowNode for signature | Possible Timing attack against $@ validation. | TimingAttackAgainstHash.py:26:17:26:41 | ControlFlowNode for Attribute() | signature message |
| TimingAttackAgainstHash.py:37:19:37:48 | ControlFlowNode for sign() | TimingAttackAgainstHash.py:30:12:30:47 | ControlFlowNode for Attribute() | TimingAttackAgainstHash.py:37:19:37:48 | ControlFlowNode for sign() | Possible Timing attack against $@ validation. | TimingAttackAgainstHash.py:30:12:30:47 | ControlFlowNode for Attribute() | MAC message |
edges
| TimingAttackAgainstHash.py:26:5:26:13 | ControlFlowNode for signature | TimingAttackAgainstHash.py:27:24:27:32 | ControlFlowNode for signature | provenance | |
| TimingAttackAgainstHash.py:26:17:26:41 | ControlFlowNode for Attribute() | TimingAttackAgainstHash.py:26:5:26:13 | ControlFlowNode for signature | provenance | |
@@ -9,6 +12,3 @@ nodes
| TimingAttackAgainstHash.py:30:12:30:47 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| TimingAttackAgainstHash.py:37:19:37:48 | ControlFlowNode for sign() | semmle.label | ControlFlowNode for sign() |
subpaths
#select
| TimingAttackAgainstHash.py:27:24:27:32 | ControlFlowNode for signature | TimingAttackAgainstHash.py:26:17:26:41 | ControlFlowNode for Attribute() | TimingAttackAgainstHash.py:27:24:27:32 | ControlFlowNode for signature | Possible Timing attack against $@ validation. | TimingAttackAgainstHash.py:26:17:26:41 | ControlFlowNode for Attribute() | signature message |
| TimingAttackAgainstHash.py:37:19:37:48 | ControlFlowNode for sign() | TimingAttackAgainstHash.py:30:12:30:47 | ControlFlowNode for Attribute() | TimingAttackAgainstHash.py:37:19:37:48 | ControlFlowNode for sign() | Possible Timing attack against $@ validation. | TimingAttackAgainstHash.py:30:12:30:47 | ControlFlowNode for Attribute() | MAC message |

View File

@@ -1 +1,2 @@
experimental/Security/CWE-208/TimingAttackAgainstHash/PossibleTimingAttackAgainstHash.ql
query: experimental/Security/CWE-208/TimingAttackAgainstHash/PossibleTimingAttackAgainstHash.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -16,25 +16,25 @@ def UnsafeCmacCheck(actualCmac):
expected = cmac.CMAC(algorithms.AES(key))
expected.update(b"message to authenticate")
expected.finalize()
return actualCmac == expected
return actualCmac == expected
def UnsafeCheckSignature(expected):
message = b'To be signed'
key = RSA.import_key(open('private_key.der').read())
h = SHA256.new(message)
signature = pkcs1_15.new(key).sign(h)
return expected == signature
signature = pkcs1_15.new(key).sign(h) # $ Source[py/possible-timing-attack-against-hash]
return expected == signature # $ Alert[py/possible-timing-attack-against-hash]
def sign(pre_key, msg, alg):
return hmac.new(pre_key, msg, alg).digest()
return hmac.new(pre_key, msg, alg).digest() # $ Source[py/possible-timing-attack-against-hash]
def verifyGood(msg, sig):
return constant_time_string_compare(sig, sign(key, msg, hashlib.sha256)) #good
def verifyBad(msg, sig):
key = "e179017a-62b0-4996-8a38-e91aa9f1"
return sig == sign(key, msg, hashlib.sha256) #bad
return sig == sign(key, msg, hashlib.sha256) # $ Alert[py/possible-timing-attack-against-hash] #bad
def constant_time_string_compare(a, b):
if len(a) != len(b):

View File

@@ -1 +1,2 @@
experimental/Security/CWE-208/TimingAttackAgainstHash/TimingAttackAgainstHash.ql
query: experimental/Security/CWE-208/TimingAttackAgainstHash/TimingAttackAgainstHash.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -11,7 +11,7 @@ app = Flask(__name__)
@app.route('/bad')
def bad():
if not request.headers.get('X-Auth-Token') == "token":
if not request.headers.get('X-Auth-Token') == "token": # $ Alert
raise Exception('bad token')
return 'bad'

View File

@@ -1 +1,2 @@
experimental/Security/CWE-208/TimingAttackAgainstHeaderValue/TimingAttackAgainstHeaderValue.ql
query: experimental/Security/CWE-208/TimingAttackAgainstHeaderValue/TimingAttackAgainstHeaderValue.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1 +1,2 @@
experimental/Security/CWE-208/TimingAttackAgainstSensitiveInfo/PossibleTimingAttackAgainstSensitiveInfo.ql
query: experimental/Security/CWE-208/TimingAttackAgainstSensitiveInfo/PossibleTimingAttackAgainstSensitiveInfo.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -12,8 +12,8 @@ app = Flask(__name__)
@app.route('/bad', methods = ['POST', 'GET'])
def bad():
if request.method == 'POST':
password = request.form['pwd']
return password == "1234"
password = request.form['pwd'] # $ Source
return password == "1234" # $ Alert
@app.route('/good', methods = ['POST', 'GET'])
def good():

View File

@@ -1 +1,2 @@
experimental/Security/CWE-287-ConstantSecretKey/WebAppConstantSecretKey.ql
query: experimental/Security/CWE-287-ConstantSecretKey/WebAppConstantSecretKey.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -2,7 +2,7 @@ from flask import Flask, session
from flask_session import Session
app = Flask(__name__)
app.config['SECRET_KEY'] = 'CHANGEME'
app.config['SECRET_KEY'] = 'CHANGEME' # $ Alert
Session(app)

View File

@@ -1,11 +1,11 @@
from flask import Flask, session
app = Flask(__name__)
aConstant = 'CHANGEME1'
app.config['SECRET_KEY'] = aConstant
app.secret_key = aConstant
app.config.update(SECRET_KEY=aConstant)
app.config.from_mapping(SECRET_KEY=aConstant)
aConstant = 'CHANGEME1' # $ Source
app.config['SECRET_KEY'] = aConstant # $ Alert
app.secret_key = aConstant # $ Alert
app.config.update(SECRET_KEY=aConstant) # $ Alert
app.config.from_mapping(SECRET_KEY=aConstant) # $ Alert
app.config.from_pyfile("config.py")
app.config.from_object('config.Config')

View File

@@ -4,16 +4,16 @@ import os
import random
FLASK_DEBUG = True
aConstant = 'CHANGEME2'
aConstant = 'CHANGEME2' # $ Source
class Config:
SECRET_KEY = environ.get("envKey")
SECRET_KEY = aConstant
SECRET_KEY = aConstant # $ Alert
SECRET_KEY = os.getenv('envKey')
SECRET_KEY = os.environ.get('envKey')
SECRET_KEY = os.environ.get('envKey', random.randint)
SECRET_KEY = os.getenv('envKey', random.randint)
SECRET_KEY = os.getenv('envKey', aConstant)
SECRET_KEY = os.environ.get('envKey', aConstant)
SECRET_KEY = os.getenv('envKey', aConstant) # $ Alert
SECRET_KEY = os.environ.get('envKey', aConstant) # $ Alert
SECRET_KEY = os.environ['envKey']

View File

@@ -2,4 +2,4 @@
# General Config
FLASK_DEBUG = True
SECRET_KEY = "CHANGEME5"
SECRET_KEY = "CHANGEME5" # $ Alert

View File

@@ -1 +1,2 @@
experimental/Security/CWE-287/ImproperLdapAuth.ql
query: experimental/Security/CWE-287/ImproperLdapAuth.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -16,7 +16,7 @@ def simple_bind_example():
search_filter = "(user={})".format(ldap.filter.escape_filter_chars(request.args['search']))
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
ldap_connection.simple_bind('cn=root')
ldap_connection.simple_bind('cn=root') # $ Alert
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
@@ -30,7 +30,7 @@ def simple_bind_s_example():
search_filter = "(user={})".format(ldap.filter.escape_filter_chars(request.args['search']))
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
ldap_connection.simple_bind_s('cn=root')
ldap_connection.simple_bind_s('cn=root') # $ Alert
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
@@ -44,7 +44,7 @@ def bind_s_example():
search_filter = "(user={})".format(ldap.filter.escape_filter_chars(request.args['search']))
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
ldap_connection.bind_s('cn=root', None)
ldap_connection.bind_s('cn=root', None) # $ Alert
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
@app.route("/bind_s_example")
@@ -57,7 +57,7 @@ def bind_s_example_kwargs():
search_filter = "(user={})".format(ldap.filter.escape_filter_chars(request.args['search']))
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
ldap_connection.bind_s(who='cn=root', cred=None)
ldap_connection.bind_s(who='cn=root', cred=None) # $ Alert
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
@app.route("/bind_example")
@@ -70,7 +70,7 @@ def bind_example():
search_filter = "(user={})".format(ldap.filter.escape_filter_chars(request.args['search']))
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
ldap_connection.bind('cn=root', "")
ldap_connection.bind('cn=root', "") # $ Alert
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
@@ -84,7 +84,7 @@ def bind_example():
search_filter = "(user={})".format(ldap.filter.escape_filter_chars(request.args['search']))
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
ldap_connection.bind(who='cn=root', cred="")
ldap_connection.bind(who='cn=root', cred="") # $ Alert
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)

View File

@@ -16,7 +16,7 @@ def passwordNone():
search_filter = "(user={})".format(escape_filter_chars(request.args['search']))
srv = Server('servername', get_info=ALL)
conn = Connection(srv, 'user_dn', None)
conn = Connection(srv, 'user_dn', None) # $ Alert
status, result, response, _ = conn.search(dn, search_filter)
@@ -30,7 +30,7 @@ def passwordNoneKwargs():
search_filter = "(user={})".format(escape_filter_chars(request.args['search']))
srv = Server('servername', get_info=ALL)
conn = Connection(srv, user='user_dn', password=None)
conn = Connection(srv, user='user_dn', password=None) # $ Alert
status, result, response, _ = conn.search(dn, search_filter)
@app.route("/passwordEmpty")
@@ -43,7 +43,7 @@ def passwordEmpty():
search_filter = "(user={})".format(escape_filter_chars(request.args['search']))
srv = Server('servername', get_info=ALL)
conn = Connection(srv, user='user_dn', password="")
conn = Connection(srv, user='user_dn', password="") # $ Alert
status, result, response, _ = conn.search(dn, search_filter)
@@ -57,7 +57,7 @@ def notPassword():
search_filter = "(user={})".format(escape_filter_chars(request.args['search']))
srv = Server('servername', get_info=ALL)
conn = Connection(srv, user='user_dn')
conn = Connection(srv, user='user_dn') # $ Alert
status, result, response, _ = conn.search(dn, search_filter)

View File

@@ -1,3 +1,9 @@
#select
| test.py:11:9:11:19 | ControlFlowNode for blob_client | test.py:3:7:3:51 | ControlFlowNode for Attribute() | test.py:11:9:11:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:21:9:21:19 | ControlFlowNode for blob_client | test.py:15:27:15:71 | ControlFlowNode for Attribute() | test.py:21:9:21:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:31:9:31:19 | ControlFlowNode for blob_client | test.py:25:24:25:66 | ControlFlowNode for Attribute() | test.py:31:9:31:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:43:9:43:19 | ControlFlowNode for blob_client | test.py:3:7:3:51 | ControlFlowNode for Attribute() | test.py:43:9:43:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:75:9:75:10 | ControlFlowNode for bc | test.py:3:7:3:51 | ControlFlowNode for Attribute() | test.py:75:9:75:10 | ControlFlowNode for bc | Unsafe usage of v1 version of Azure Storage client-side encryption |
edges
| test.py:3:1:3:3 | ControlFlowNode for BSC | test.py:7:19:7:21 | ControlFlowNode for BSC | provenance | |
| test.py:3:1:3:3 | ControlFlowNode for BSC | test.py:35:19:35:21 | ControlFlowNode for BSC | provenance | |
@@ -86,9 +92,3 @@ nodes
| test.py:73:10:73:33 | ControlFlowNode for get_unsafe_blob_client() | semmle.label | ControlFlowNode for get_unsafe_blob_client() |
| test.py:75:9:75:10 | ControlFlowNode for bc | semmle.label | ControlFlowNode for bc |
subpaths
#select
| test.py:11:9:11:19 | ControlFlowNode for blob_client | test.py:3:7:3:51 | ControlFlowNode for Attribute() | test.py:11:9:11:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:21:9:21:19 | ControlFlowNode for blob_client | test.py:15:27:15:71 | ControlFlowNode for Attribute() | test.py:21:9:21:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:31:9:31:19 | ControlFlowNode for blob_client | test.py:25:24:25:66 | ControlFlowNode for Attribute() | test.py:31:9:31:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:43:9:43:19 | ControlFlowNode for blob_client | test.py:3:7:3:51 | ControlFlowNode for Attribute() | test.py:43:9:43:19 | ControlFlowNode for blob_client | Unsafe usage of v1 version of Azure Storage client-side encryption |
| test.py:75:9:75:10 | ControlFlowNode for bc | test.py:3:7:3:51 | ControlFlowNode for Attribute() | test.py:75:9:75:10 | ControlFlowNode for bc | Unsafe usage of v1 version of Azure Storage client-side encryption |

View File

@@ -1 +1,2 @@
experimental/Security/CWE-327/Azure/UnsafeUsageOfClientSideEncryptionVersion.ql
query: experimental/Security/CWE-327/Azure/UnsafeUsageOfClientSideEncryptionVersion.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1,6 +1,6 @@
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient
BSC = BlobServiceClient.from_connection_string(...)
BSC = BlobServiceClient.from_connection_string(...) # $ Source
def unsafe():
# does not set encryption_version to 2.0, default is unsafe
@@ -8,27 +8,27 @@ def unsafe():
blob_client.require_encryption = True
blob_client.key_encryption_key = ...
with open("decryptedcontentfile.txt", "rb") as stream:
blob_client.upload_blob(stream) # BAD
blob_client.upload_blob(stream) # $ Alert # BAD
def unsafe_setting_on_blob_service_client():
blob_service_client = BlobServiceClient.from_connection_string(...)
blob_service_client = BlobServiceClient.from_connection_string(...) # $ Source
blob_service_client.require_encryption = True
blob_service_client.key_encryption_key = ...
blob_client = blob_service_client.get_blob_client(...)
with open("decryptedcontentfile.txt", "rb") as stream:
blob_client.upload_blob(stream)
blob_client.upload_blob(stream) # $ Alert
def unsafe_setting_on_container_client():
container_client = ContainerClient.from_connection_string(...)
container_client = ContainerClient.from_connection_string(...) # $ Source
container_client.require_encryption = True
container_client.key_encryption_key = ...
blob_client = container_client.get_blob_client(...)
with open("decryptedcontentfile.txt", "rb") as stream:
blob_client.upload_blob(stream)
blob_client.upload_blob(stream) # $ Alert
def potentially_unsafe(use_new_version=False):
@@ -40,7 +40,7 @@ def potentially_unsafe(use_new_version=False):
blob_client.encryption_version = '2.0'
with open("decryptedcontentfile.txt", "rb") as stream:
blob_client.upload_blob(stream) # BAD
blob_client.upload_blob(stream) # $ Alert # BAD
def safe():
@@ -72,7 +72,7 @@ def get_unsafe_blob_client():
def unsafe_with_calls():
bc = get_unsafe_blob_client()
with open("decryptedcontentfile.txt", "rb") as stream:
bc.upload_blob(stream) # BAD
bc.upload_blob(stream) # $ Alert # BAD
def get_safe_blob_client():

View File

@@ -2,4 +2,4 @@ import random
def generatePassword():
# BAD: the random is not cryptographically secure
return random.random()
return random.random() # $ Alert

View File

@@ -1 +1,2 @@
experimental/Security/CWE-338/InsecureRandomness.ql
query: experimental/Security/CWE-338/InsecureRandomness.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -4,8 +4,8 @@ def bad():
request = cherrypy.request
validCors = "domain.com"
if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:
origin = request.headers.get('Origin', None)
if origin.startswith(validCors):
origin = request.headers.get('Origin', None) # $ Source
if origin.startswith(validCors): # $ Alert
print("Origin Valid")
def good():

View File

@@ -1 +1,2 @@
experimental/Security/CWE-346/CorsBypass.ql
query: experimental/Security/CWE-346/CorsBypass.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1 +1,2 @@
experimental/Security/CWE-347/JWTEmptyKeyOrAlgorithm.ql
query: experimental/Security/CWE-347/JWTEmptyKeyOrAlgorithm.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1 +1,2 @@
experimental/Security/CWE-347/JWTMissingSecretOrPublicKeyVerification.ql
query: experimental/Security/CWE-347/JWTMissingSecretOrPublicKeyVerification.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -8,8 +8,8 @@ jwt.encode({"alg": "HS256"}, token, "key")
JsonWebToken().encode({"alg": "HS256"}, token, "key")
# bad - empty key
jwt.encode({"alg": "HS256"}, token, "")
JsonWebToken().encode({"alg": "HS256"}, token, "")
jwt.encode({"alg": "HS256"}, token, "") # $ Alert[py/jwt-empty-secret-or-algorithm]
JsonWebToken().encode({"alg": "HS256"}, token, "") # $ Alert[py/jwt-empty-secret-or-algorithm]
# Decoding

View File

@@ -7,11 +7,11 @@ jwt.encode(token, "key", "HS256")
jwt.encode(token, key="key", algorithm="HS256")
# bad - both key and algorithm set to None
jwt.encode(token, None, None)
jwt.encode(token, None, None) # $ Alert[py/jwt-empty-secret-or-algorithm]
# bad - empty key
jwt.encode(token, "", algorithm="HS256")
jwt.encode(token, key="", algorithm="HS256")
jwt.encode(token, "", algorithm="HS256") # $ Alert[py/jwt-empty-secret-or-algorithm]
jwt.encode(token, key="", algorithm="HS256") # $ Alert[py/jwt-empty-secret-or-algorithm]
# Decoding
@@ -19,8 +19,8 @@ jwt.encode(token, key="", algorithm="HS256")
jwt.decode(token, "key", "HS256")
# bad - unverified decoding
jwt.decode(token, verify=False)
jwt.decode(token, key, options={"verify_signature": False})
jwt.decode(token, verify=False) # $ Alert[py/jwt-missing-verification]
jwt.decode(token, key, options={"verify_signature": False}) # $ Alert[py/jwt-missing-verification]
# good - verified decoding
jwt.decode(token, verify=True)

View File

@@ -7,8 +7,8 @@ jwt.encode(token, "key", "HS256")
jwt.encode(token, key="key", algorithm="HS256")
# bad - empty key
jwt.encode(token, "", algorithm="HS256")
jwt.encode(token, key="", algorithm="HS256")
jwt.encode(token, "", algorithm="HS256") # $ Alert[py/jwt-empty-secret-or-algorithm]
jwt.encode(token, key="", algorithm="HS256") # $ Alert[py/jwt-empty-secret-or-algorithm]
# Decoding
@@ -16,7 +16,7 @@ jwt.encode(token, key="", algorithm="HS256")
jwt.decode(token, "key", "HS256")
# bad - unverified decoding
jwt.decode(token, key, options={"verify_signature": False})
jwt.decode(token, key, options={"verify_signature": False}) # $ Alert[py/jwt-missing-verification]
# good - verified decoding
jwt.decode(token, key, options={"verify_signature": True})

View File

@@ -11,4 +11,4 @@ def good(token):
def bad(token):
python_jwt.process_jwt(token)
python_jwt.process_jwt(token) # $ Alert[py/jwt-missing-verification]

View File

@@ -1 +1,2 @@
experimental/Security/CWE-348/ClientSuppliedIpUsedInSecurityCheck.ql
query: experimental/Security/CWE-348/ClientSuppliedIpUsedInSecurityCheck.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -10,15 +10,15 @@ app = Flask(__name__)
@app.route('/bad1')
def bad1():
client_ip = request.headers.get('x-forwarded-for')
if not client_ip.startswith('192.168.'):
client_ip = request.headers.get('x-forwarded-for') # $ Source
if not client_ip.startswith('192.168.'): # $ Alert
raise Exception('ip illegal')
return 'bad1'
@app.route('/bad2')
def bad2():
client_ip = request.headers.get('x-forwarded-for')
if not client_ip == '127.0.0.1':
client_ip = request.headers.get('x-forwarded-for') # $ Source
if not client_ip == '127.0.0.1': # $ Alert
raise Exception('ip illegal')
return 'bad2'

View File

@@ -19,8 +19,8 @@ class IndexHandler(tornado.web.RequestHandler):
if client_ip:
client_ip = client_ip.split(',')[len(client_ip.split(',')) - 1]
else:
client_ip = self.request.headers.get('REMOTE_ADDR', None)
if not client_ip == '127.0.0.1':
client_ip = self.request.headers.get('REMOTE_ADDR', None) # $ Source
if not client_ip == '127.0.0.1': # $ Alert
raise Exception('ip illegal')
self.write("hello.")

View File

@@ -1 +1,2 @@
experimental/Security/CWE-522/LdapInsecureAuth.ql
query: experimental/Security/CWE-522/LdapInsecureAuth.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1 +1,2 @@
experimental/Security/CWE-522/LdapInsecureAuth.ql
query: experimental/Security/CWE-522/LdapInsecureAuth.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -42,7 +42,7 @@ def two():
@app.route("/one_bad")
def one_bad():
ldap_connection_7 = ldap.initialize(schema + remote_host)
ldap_connection_7 = ldap.initialize(schema + remote_host) # $ Alert
ldap_connection_7.set_option(ldap.OPT_X_TLS_DEMAND, False)
ldap_connection_7.simple_bind_s('', '')
user = ldap_connection_7.search_s(
@@ -53,7 +53,7 @@ def one_bad():
@app.route("/one_bad_2")
def one_bad_2():
ldap_connection_8 = ldap.initialize(schema + remote_host)
ldap_connection_8 = ldap.initialize(schema + remote_host) # $ Alert
ldap_connection_8.set_option(ldap.OPT_X_TLS_NEVER, True)
ldap_connection_8.simple_bind_s('', '')
user = ldap_connection_8.search_s(

View File

@@ -1,5 +1,5 @@
from ldap3 import Server, Connection, ALL
from flask import request, Flask
from flask import request, Flask # $ Source
app = Flask(__name__)
@@ -98,8 +98,8 @@ def seven():
@app.route("/eight")
def eight():
host = schema + "somethingon.theinternet.com"
srv = Server(host, port=1337)
host = schema + "somethingon.theinternet.com" # $ Source
srv = Server(host, port=1337) # $ Alert
conn = Connection(srv, "dn", "password")
conn.start_tls()
conn.search("dn", "search_filter")
@@ -111,8 +111,8 @@ def eight():
@app.route("/nine")
def nine():
host = schema + "somethingon.theinternet.com"
srv = Server(host, 1337, False)
host = schema + "somethingon.theinternet.com" # $ Source
srv = Server(host, 1337, False) # $ Alert
conn = Connection(srv, "dn", "password")
conn.search("dn", "search_filter")
return conn.response
@@ -123,8 +123,8 @@ def nine():
@app.route("/ten")
def ten():
host = schema + remote_host
srv = Server(host, port=1337, use_ssl=False)
host = schema + remote_host # $ Source
srv = Server(host, port=1337, use_ssl=False) # $ Alert
conn = Connection(srv, "dn", "password")
conn.search("dn", "search_filter")
return conn.response
@@ -136,7 +136,7 @@ def ten():
@app.route("/eleven")
def eleven():
host = schema + request.args['host']
srv = Server(host, port=1337)
srv = Server(host, port=1337) # $ Alert
conn = Connection(srv, "dn", "password")
conn.search("dn", "search_filter")
return conn.response

View File

@@ -1 +1,2 @@
experimental/Security/CWE-611/SimpleXmlRpcServer.ql
query: experimental/Security/CWE-611/SimpleXmlRpcServer.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -4,7 +4,7 @@ def foo(n: str):
print("foo called with arg:", n, type(n))
return "ok"
server = SimpleXMLRPCServer(("127.0.0.1", 8000))
server = SimpleXMLRPCServer(("127.0.0.1", 8000)) # $ Alert
server.register_function(foo, "foo")
server.serve_forever()

View File

@@ -1 +1,2 @@
experimental/Security/CWE-770/UnicodeDoS.ql
query: experimental/Security/CWE-770/UnicodeDoS.ql
postprocess: utils/test/InlineExpectationsTestQuery.ql

View File

@@ -1,4 +1,4 @@
from flask import Flask, jsonify, request
from flask import Flask, jsonify, request # $ Source
import unicodedata
app = Flask(__name__)
@@ -13,7 +13,7 @@ def bad_1():
# Normalize the file path using NFKC Unicode normalization
return (
unicodedata.normalize("NFKC", file_path),
unicodedata.normalize("NFKC", file_path), # $ Alert
200,
{"Content-Type": "application/octet-stream"},
)
@@ -25,7 +25,7 @@ def bad_2():
if len(r) >= 10:
# Normalize the r using NFKD Unicode normalization
r = unicodedata.normalize("NFKD", r)
r = unicodedata.normalize("NFKD", r) # $ Alert
return r, 200, {"Content-Type": "application/octet-stream"}
else:
return jsonify({"error": "File not found"}), 404
@@ -37,7 +37,7 @@ def bad_3():
length = len(r)
if length >= 1_000:
# Normalize the r using NFKD Unicode normalization
r = unicodedata.normalize("NFKD", r)
r = unicodedata.normalize("NFKD", r) # $ Alert
return r, 200, {"Content-Type": "application/octet-stream"}
else:
return jsonify({"error": "File not found"}), 404
@@ -49,7 +49,7 @@ def bad_4():
length = len(r)
if 1_000 <= length:
# Normalize the r using NFKD Unicode normalization
r = unicodedata.normalize("NFKD", r)
r = unicodedata.normalize("NFKD", r) # $ Alert
return r, 200, {"Content-Type": "application/octet-stream"}
else:
return jsonify({"error": "File not found"}), 404
@@ -61,7 +61,7 @@ def bad_5():
length = len(r)
if not length < 1_000:
# Normalize the r using NFKD Unicode normalization
r = unicodedata.normalize("NFKD", r)
r = unicodedata.normalize("NFKD", r) # $ Alert
return r, 200, {"Content-Type": "application/octet-stream"}
else:
return jsonify({"error": "File not found"}), 404
@@ -73,7 +73,7 @@ def bad_6():
length = len(r)
if not 1_000 > length:
# Normalize the r using NFKD Unicode normalization
r = unicodedata.normalize("NFKD", r)
r = unicodedata.normalize("NFKD", r) # $ Alert
return r, 200, {"Content-Type": "application/octet-stream"}
else:
return jsonify({"error": "File not found"}), 404