python: Inline expectation should have space after $

This was a regex-find-replace from `# \$(?! )` (using a negative lookahead) to `# $ `.
This commit is contained in:
Owen Mansel-Chan
2026-03-04 11:42:07 +00:00
parent 0eccd902c2
commit 5a97348e78
61 changed files with 784 additions and 785 deletions

View File

@@ -1,6 +1,6 @@
import requests
import shutil
import os
import os
from flask import Flask, request
app = Flask(__name__)
@@ -16,8 +16,8 @@ def download_from_url():
with open(tarpath, "wb") as f:
f.write(response.raw.read())
untarredpath = "/tmp/tmp123"
shutil.unpack_archive(tarpath, untarredpath) # $result=BAD
shutil.unpack_archive(tarpath, untarredpath) # $ result=BAD
# A source catching an S3 filename download
# see boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.download_file
@@ -31,7 +31,7 @@ bucket_name = "mybucket"
s3 = boto3.client('s3')
s3.download_file(bucket_name, remote_ziped_name, local_ziped_path)
shutil.unpack_archive(local_ziped_path, base_dir) # $result=BAD
shutil.unpack_archive(local_ziped_path, base_dir) # $ result=BAD
# wget
@@ -45,11 +45,11 @@ base_dir = "/tmp/basedir"
# download(url, out, bar) contains out parameter
wget.download(url, compressed_file)
shutil.unpack_archive(compressed_file, base_dir) # $result=BAD
shutil.unpack_archive(compressed_file, base_dir) # $ result=BAD
# download(url) returns filename
compressed_file = wget.download(url)
shutil.unpack_archive(compressed_file, base_dir) # $result=BAD
shutil.unpack_archive(compressed_file, base_dir) # $ result=BAD
# A source coming from a CLI argparse module
@@ -63,7 +63,7 @@ parser.add_argument('filename', help='filename to be provided')
args = parser.parse_args()
compressed_file = args.filename
shutil.unpack_archive(compressed_file, base_dir) # $result=BAD
shutil.unpack_archive(compressed_file, base_dir) # $ result=BAD
# A source coming from a CLI and downloaded
@@ -83,8 +83,8 @@ response = requests.get(url_filename, stream=True)
tarpath = "/tmp/tmp456/tarball.tar.gz"
with open(tarpath, "wb") as f:
f.write(response.raw.read())
shutil.unpack_archive(tarpath, base_dir) # $result=BAD
shutil.unpack_archive(tarpath, base_dir) # $ result=BAD
# the django upload functionality
# see HttpRequest.FILES: https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.HttpRequest.FILES
@@ -97,19 +97,19 @@ def simple_upload(request):
base_dir = "/tmp/baase_dir"
if request.method == 'POST':
# Read uploaded files by chunks of data
# see chunks(): https://docs.djangoproject.com/en/4.1/ref/files/uploads/#django.core.files.uploadedfile.UploadedFile.chunks
# see chunks(): https://docs.djangoproject.com/en/4.1/ref/files/uploads/#django.core.files.uploadedfile.UploadedFile.chunks
savepath = os.path.join(base_dir, "tarball_compressed.tar.gz")
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES["ufile1"].chunks():
wfile.write(chunk)
shutil.unpack_archive(savepath, base_dir) # $result=BAD
shutil.unpack_archive(savepath, base_dir) # $ result=BAD
# Write in binary the uploaded tarball
myfile = request.FILES.get("ufile1")
file_path = os.path.join(base_dir, "tarball.tar")
with file_path.open('wb') as f:
f.write(myfile.read())
shutil.unpack_archive(file_path, base_dir) # $result=BAD
shutil.unpack_archive(file_path, base_dir) # $ result=BAD
# Save uploaded files using FileSystemStorage Django API
# see FileSystemStorage: https://docs.djangoproject.com/en/4.1/ref/files/storage/#django.core.files.storage.FileSystemStorage
@@ -117,8 +117,8 @@ def simple_upload(request):
fs = FileSystemStorage()
filename = fs.save(ufile.name, ufile)
uploaded_file_path = fs.path(filename)
shutil.unpack_archive(uploaded_file_path, base_dir) # $result=BAD
shutil.unpack_archive(uploaded_file_path, base_dir) # $ result=BAD
return render(request, 'simple_upload.html')
elif request.method == 'GET':
@@ -126,7 +126,7 @@ def simple_upload(request):
import shutil
import os
import os
import tarfile
import tempfile
import argparse
@@ -139,8 +139,8 @@ parser.add_argument('filename', help='filename to be provided')
args = parser.parse_args()
unsafe_filename_tar = args.filename
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar:
tar.extractall(path="/tmp/unpack/", members=tar) # $result=BAD
tar = tarfile.open(unsafe_filename_tar)
tar.extractall(path="/tmp/unpack/", members=tar) # $ result=BAD
tar = tarfile.open(unsafe_filename_tar)
from django.shortcuts import render
@@ -152,7 +152,7 @@ def simple_upload(request):
base_dir = "/tmp/baase_dir"
if request.method == 'POST':
# Read uploaded files by chunks of data
# see chunks(): https://docs.djangoproject.com/en/4.1/ref/files/uploads/#django.core.files.uploadedfile.UploadedFile.chunks
# see chunks(): https://docs.djangoproject.com/en/4.1/ref/files/uploads/#django.core.files.uploadedfile.UploadedFile.chunks
savepath = os.path.join(base_dir, "tarball_compressed.tar.gz")
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES["ufile1"].chunks():
@@ -160,11 +160,11 @@ def simple_upload(request):
tar = tarfile.open(savepath)
result = []
for member in tar:
if member.issym():
raise ValueError("But it is a symlink")
result.append(member)
tar.extractall(path=tempfile.mkdtemp(), members=result) # $result=BAD
for member in tar:
if member.issym():
raise ValueError("But it is a symlink")
result.append(member)
tar.extractall(path=tempfile.mkdtemp(), members=result) # $ result=BAD
tar.close()
@@ -173,7 +173,7 @@ tarpath = "/tmp/tmp456/tarball.tar.gz"
with open(tarpath, "wb") as f:
f.write(response.raw.read())
target_dir = "/tmp/unpack"
tarfile.TarFile(tarpath, mode="r").extractall(path=target_dir) # $result=BAD
tarfile.TarFile(tarpath, mode="r").extractall(path=target_dir) # $ result=BAD
from pathlib import Path
@@ -183,7 +183,7 @@ import boto3
def default_session() -> boto3.Session:
_SESSION = None
if _SESSION is None:
_SESSION = boto3.Session()
_SESSION = boto3.Session()
return _SESSION
cache = False
@@ -198,4 +198,4 @@ with tempfile.NamedTemporaryFile(suffix=".tar.gz") as tmp:
target = cache_dir
else:
target = Path(tempfile.mkdtemp())
shutil.unpack_archive(tmp.name, target) # $result=BAD
shutil.unpack_archive(tmp.name, target) # $ result=BAD

View File

@@ -6,7 +6,7 @@ app = Flask(__name__)
def get_input1():
input = request.args.get("input")
agent = Agent(name="Assistant", instructions="This prompt is customized for " + input) # $Alert[py/prompt-injection]
agent = Agent(name="Assistant", instructions="This prompt is customized for " + input) # $ Alert[py/prompt-injection]
result = Runner.run_sync(agent, "This is a user message.")
print(result.final_output)
@@ -22,9 +22,9 @@ def get_input2():
input=[
{
"role": "user",
"content": input, # $Alert[py/prompt-injection]
"content": input, # $ Alert[py/prompt-injection]
}
]
]
)
result2 = Runner.run_sync(
@@ -32,7 +32,7 @@ def get_input2():
[
{
"role": "user",
"content": input, # $Alert[py/prompt-injection]
"content": input, # $ Alert[py/prompt-injection]
}
]
]
)

View File

@@ -7,7 +7,7 @@ app = Flask(__name__)
@app.route("/unsafe1")
def unsafe1():
user_input = escape(request.args.get("ui"))
normalized_user_input = unicodedata.normalize("NFKC", user_input) # $result=BAD
normalized_user_input = unicodedata.normalize("NFKC", user_input) # $ result=BAD
return render_template("result.html", normalized_user_input=normalized_user_input)
@@ -17,7 +17,7 @@ def unsafe1bis():
if user_input.isascii():
normalized_user_input = user_input
else:
normalized_user_input = unicodedata.normalize("NFC", user_input) # $result=BAD
normalized_user_input = unicodedata.normalize("NFC", user_input) # $ result=BAD
return render_template("result.html", normalized_user_input=normalized_user_input)
@@ -25,6 +25,6 @@ def unsafe1bis():
def safe1():
normalized_user_input = unicodedata.normalize(
"NFKC", request.args.get("ui")
) # $result=OK
) # $ result=OK
user_input = escape(normalized_user_input)
return render_template("result.html", normalized_user_input=user_input)

View File

@@ -2,15 +2,15 @@
# Simple assignment
g = [5] # $writes=g
g = [5] # $ writes=g
# Multiple assignment
g1, g2 = [6], [7] # $writes=g1 writes=g2
g1, g2 = [6], [7] # $ writes=g1 writes=g2
# Assignment that's only referenced in this scope.
unreferenced_g = [8] # $writes=unreferenced_g
unreferenced_g = [8] # $ writes=unreferenced_g
print(unreferenced_g)
# Testing modifications of globals
@@ -24,49 +24,49 @@ g_mod = []
# but currently our analysis thinks `g_mod` might be used in the `print` call
g_mod = [10] # $ SPURIOUS: writes=g_mod
print("foo")
g_mod = [100] # $writes=g_mod
g_mod = [100] # $ writes=g_mod
# Modification by mutation
g_ins = [50] # $writes=g_ins
g_ins = [50] # $ writes=g_ins
print(g_ins)
g_ins.append(75)
# A global with multiple potential definitions
import unknown_module # $writes=unknown_module
import unknown_module # $ writes=unknown_module
if unknown_module.attr:
g_mult = [200] # $writes=g_mult
g_mult = [200] # $ writes=g_mult
else:
g_mult = [300] # $writes=g_mult
g_mult = [300] # $ writes=g_mult
# A global variable that may be redefined depending on some unknown value
g_redef = [400] # $writes=g_redef
g_redef = [400] # $ writes=g_redef
if unknown_module.attr:
g_redef = [500] # $writes=g_redef
g_redef = [500] # $ writes=g_redef
def global_access(): # $writes=global_access
def global_access(): # $ writes=global_access
l = 5
print(g) # $reads=g
print(g1) # $reads=g1
print(g2) # $reads=g2
print(g_mod) # $reads=g_mod
print(g_ins) # $reads=g_ins
print(g_mult) # $reads=g_mult
print(g_redef) # $reads=g_redef
print(g) # $ reads=g
print(g1) # $ reads=g1
print(g2) # $ reads=g2
print(g_mod) # $ reads=g_mod
print(g_ins) # $ reads=g_ins
print(g_mult) # $ reads=g_mult
print(g_redef) # $ reads=g_redef
def print_g_mod(): # $writes=print_g_mod
print(g_mod) # $reads=g_mod
def print_g_mod(): # $ writes=print_g_mod
print(g_mod) # $ reads=g_mod
def global_mod(): # $writes=global_mod
def global_mod(): # $ writes=global_mod
global g_mod
g_mod += [150] # $reads,writes=g_mod
print_g_mod() # $reads=print_g_mod
g_mod += [150] # $ reads,writes=g_mod
print_g_mod() # $ reads=print_g_mod
def global_inside_local_function(): # $writes=global_inside_local_function
def global_inside_local_function(): # $ writes=global_inside_local_function
def local_function():
print(g) # $reads=g
print(g) # $ reads=g
local_function()
## Imports
@@ -74,24 +74,24 @@ def global_inside_local_function(): # $writes=global_inside_local_function
# Direct imports
import foo_module # $writes=foo_module
import foo_module # $ writes=foo_module
def use_foo(): # $writes=use_foo
print(foo_module.attr) # $reads=foo_module
def use_foo(): # $ writes=use_foo
print(foo_module.attr) # $ reads=foo_module
# Partial imports
from bar import baz_attr, quux_attr # $writes=baz_attr writes=quux_attr
from bar import baz_attr, quux_attr # $ writes=baz_attr writes=quux_attr
def use_partial_import(): # $writes=use_partial_import
print(baz_attr, quux_attr) # $reads=baz_attr reads=quux_attr
def use_partial_import(): # $ writes=use_partial_import
print(baz_attr, quux_attr) # $ reads=baz_attr reads=quux_attr
# Aliased imports
from spam_module import ham_attr as eggs_attr # $writes=eggs_attr
from spam_module import ham_attr as eggs_attr # $ writes=eggs_attr
def use_aliased_import(): # $writes=use_aliased_import
print(eggs_attr) # $reads=eggs_attr
def use_aliased_import(): # $ writes=use_aliased_import
print(eggs_attr) # $ reads=eggs_attr
# Import star (unlikely to work unless we happen to extract/model the referenced module)
@@ -99,23 +99,23 @@ def use_aliased_import(): # $writes=use_aliased_import
from unknown import *
def secretly_use_unknown(): # $writes=secretly_use_unknown
print(unknown_attr) # $reads=unknown_attr
def secretly_use_unknown(): # $ writes=secretly_use_unknown
print(unknown_attr) # $ reads=unknown_attr
# Known modules
from known import *
def secretly_use_known(): # $writes=secretly_use_known
print(known_attr) # $reads=known_attr
def secretly_use_known(): # $ writes=secretly_use_known
print(known_attr) # $ reads=known_attr
# Local import in function
def imports_locally(): # $writes=imports_locally
def imports_locally(): # $ writes=imports_locally
import mod1
# Global import hidden in function
def imports_stuff(): # $writes=imports_stuff
def imports_stuff(): # $ writes=imports_stuff
global mod2
import mod2 # $writes=mod2
import mod2 # $ writes=mod2

View File

@@ -1,4 +1,4 @@
import threading
import threading
import time
# Test 1
@@ -7,11 +7,11 @@ foo1 = None
def bar1():
time.sleep(1)
ensure_tainted(foo1) # $tainted
ensure_tainted(foo1) # $ tainted
# The intent of these tests is to test how dataflow is handled through shared state accessed by different threads;
# The intent of these tests is to test how dataflow is handled through shared state accessed by different threads;
# but the presense or absense of the actual call to start a thread does not affect the results (there is no special modelling for Thread)
# threading.Thread(target=bar).start()
# threading.Thread(target=bar).start()
foo1 = TAINTED_STRING
@@ -21,7 +21,7 @@ foo2 = []
def bar2():
time.sleep(1)
ensure_tainted(foo2[0]) # $MISSING:tainted
ensure_tainted(foo2[0]) # $ MISSING:tainted
threading.Thread(target=bar2).start()
@@ -33,7 +33,7 @@ foo3 = []
def bar3():
time.sleep(1)
ensure_tainted(foo2[0]) # $MISSING:tainted
ensure_tainted(foo2[0]) # $ MISSING:tainted
foo3.append(TAINTED_STRING)
bar3()
@@ -42,16 +42,16 @@ bar3()
# TP - Sanity check: Flow is found through a ListElement directly without a call
foo4 = []
foo4.append(TAINTED_STRING)
ensure_tainted(foo4[0]) # $tainted
ensure_tainted(foo4[0]) # $ tainted
# Test 5
# FN - Flow is *not* tracked through a shared captured but non-global variable
def test5():
foo5 = None
foo5 = None
def bar5():
time.sleep(1)
ensure_tainted(foo5) # $MISSING:tainted
ensure_tainted(foo5) # $ MISSING:tainted
threading.Thread(target=bar5).start() # Only the presense of this thread call makes this an FN rather than a TN
@@ -64,7 +64,7 @@ def test6():
def bar6():
time.sleep(1)
ensure_tainted(foo6[0]) # $tainted
ensure_tainted(foo6[0]) # $ tainted
foo6.append(TAINTED_STRING)
bar6()
@@ -78,7 +78,7 @@ foo7 = []
def bar7():
time.sleep(1)
ensure_tainted(foo7[0]) # $MISSING: tainted
ensure_tainted(foo7[0]) # $ MISSING: tainted
def baz7(loc_foo):
loc_foo.append(TAINTED_STRING)
@@ -88,13 +88,13 @@ threading.Thread(target=bar7).start()
baz7(foo7)
# Test 8
# FN - Flow is also *not* found in the above case through a direct call
# FN - Flow is also *not* found in the above case through a direct call
foo8 = []
def bar8():
time.sleep(1)
ensure_tainted(foo8[0]) # $MISSING: tainted
ensure_tainted(foo8[0]) # $ MISSING: tainted
def baz8(loc_foo):
loc_foo.append(TAINTED_STRING)
@@ -109,10 +109,10 @@ def test9():
foo9 = []
def bar9():
time.sleep(1)
ensure_tainted(foo9[0]) # $tainted
ensure_tainted(foo9[0]) # $ tainted
def baz9(loc_foo):
loc_foo.append(TAINTED_STRING)
baz9(foo9)
bar9()
bar9()

View File

@@ -182,7 +182,7 @@ SINK(instance.instance_method(SOURCE)[1]) # $ flow="SOURCE -> instance.instance
returned_class = get_class()
SINK(returned_class(SOURCE).config) # $ flow="SOURCE -> returned_class(..).config"
SINK(returned_class().instance_method(SOURCE)[1]) # $flow="SOURCE -> returned_class().instance_method(..)[1]"
SINK(returned_class().instance_method(SOURCE)[1]) # $ flow="SOURCE -> returned_class().instance_method(..)[1]"
fatory_instance = MS_Factory.get_instance()
SINK(fatory_instance.instance_method(SOURCE)[1]) # $ flow="SOURCE -> fatory_instance.instance_method(..)[1]"

View File

@@ -2,32 +2,32 @@ class SomeClass:
pass
def simple_read_write():
x = SomeClass() # $tracked=foo
x.foo = tracked # $tracked tracked=foo
y = x.foo # $tracked=foo tracked
do_stuff(y) # $tracked
x = SomeClass() # $ tracked=foo
x.foo = tracked # $ tracked tracked=foo
y = x.foo # $ tracked=foo tracked
do_stuff(y) # $ tracked
def foo():
x = SomeClass() # $tracked=attr
bar(x) # $tracked=attr
x.attr = tracked # $tracked=attr tracked
baz(x) # $tracked=attr
x = SomeClass() # $ tracked=attr
bar(x) # $ tracked=attr
x.attr = tracked # $ tracked=attr tracked
baz(x) # $ tracked=attr
def bar(x): # $tracked=attr
z = x.attr # $tracked tracked=attr
do_stuff(z) # $tracked
def bar(x): # $ tracked=attr
z = x.attr # $ tracked tracked=attr
do_stuff(z) # $ tracked
def expects_int(x): # $int=field SPURIOUS: str=field
do_int_stuff(x.field) # $int int=field SPURIOUS: str str=field
def expects_int(x): # $ int=field SPURIOUS: str=field
do_int_stuff(x.field) # $ int int=field SPURIOUS: str str=field
def expects_string(x): # $ str=field SPURIOUS: int=field
do_string_stuff(x.field) # $str str=field SPURIOUS: int int=field
do_string_stuff(x.field) # $ str str=field SPURIOUS: int int=field
def test_incompatible_types():
x = SomeClass() # $int,str=field
x.field = int(5) # $int=field int SPURIOUS: str=field
expects_int(x) # $int=field SPURIOUS: str=field
x.field = str("Hello") # $str=field str SPURIOUS: int=field
x = SomeClass() # $ int,str=field
x.field = int(5) # $ int=field int SPURIOUS: str=field
expects_int(x) # $ int=field SPURIOUS: str=field
x.field = str("Hello") # $ str=field str SPURIOUS: int=field
expects_string(x) # $ str=field SPURIOUS: int=field
# set in different function
@@ -58,7 +58,7 @@ def test_global_attribute_read():
def test_local_attribute_assignment():
# Same as `test_global_attribute_assignment`, but the assigned variable is not global
# In this case, we don't want flow going to the `ModuleVariableNode` for `local_var`
# In this case, we don't want flow going to the `ModuleVariableNode` for `local_var`
# (which is referenced in `test_local_attribute_read`).
local_var = object() # $ tracked=foo
local_var.foo = tracked # $ tracked tracked=foo
@@ -71,11 +71,11 @@ def test_local_attribute_read():
# Attributes assigned statically to a class
# ------------------------------------------------------------------------------
class MyClass: # $tracked=field
field = tracked # $tracked
class MyClass: # $ tracked=field
field = tracked # $ tracked
lookup = MyClass.field # $tracked tracked=field
instance = MyClass() # $tracked=field
lookup = MyClass.field # $ tracked tracked=field
instance = MyClass() # $ tracked=field
lookup2 = instance.field # MISSING: tracked
# ------------------------------------------------------------------------------
@@ -85,56 +85,56 @@ lookup2 = instance.field # MISSING: tracked
# Via `getattr`/`setattr`
def setattr_immediate_write():
x = SomeClass() # $tracked=foo
setattr(x,"foo", tracked) # $tracked tracked=foo
y = x.foo # $tracked tracked=foo
do_stuff(y) # $tracked
x = SomeClass() # $ tracked=foo
setattr(x,"foo", tracked) # $ tracked tracked=foo
y = x.foo # $ tracked tracked=foo
do_stuff(y) # $ tracked
def getattr_immediate_read():
x = SomeClass() # $tracked=foo
x.foo = tracked # $tracked tracked=foo
y = getattr(x,"foo") # $tracked tracked=foo
do_stuff(y) # $tracked
x = SomeClass() # $ tracked=foo
x.foo = tracked # $ tracked tracked=foo
y = getattr(x,"foo") # $ tracked tracked=foo
do_stuff(y) # $ tracked
def setattr_indirect_write():
attr = "foo"
x = SomeClass() # $tracked=foo
setattr(x, attr, tracked) # $tracked tracked=foo
y = x.foo # $tracked tracked=foo
do_stuff(y) # $tracked
x = SomeClass() # $ tracked=foo
setattr(x, attr, tracked) # $ tracked tracked=foo
y = x.foo # $ tracked tracked=foo
do_stuff(y) # $ tracked
def getattr_indirect_read():
attr = "foo"
x = SomeClass() # $tracked=foo
x.foo = tracked # $tracked tracked=foo
x = SomeClass() # $ tracked=foo
x.foo = tracked # $ tracked tracked=foo
y = getattr(x, attr) #$tracked tracked=foo
do_stuff(y) # $tracked
do_stuff(y) # $ tracked
# Via `__dict__` -- not currently implemented.
def dunder_dict_immediate_write():
x = SomeClass() # $ MISSING: tracked=foo
x.__dict__["foo"] = tracked # $tracked MISSING: tracked=foo
x.__dict__["foo"] = tracked # $ tracked MISSING: tracked=foo
y = x.foo # $ MISSING: tracked tracked=foo
do_stuff(y) # $ MISSING: tracked
def dunder_dict_immediate_read():
x = SomeClass() # $tracked=foo
x.foo = tracked # $tracked tracked=foo
x = SomeClass() # $ tracked=foo
x.foo = tracked # $ tracked tracked=foo
y = x.__dict__["foo"] # $ tracked=foo MISSING: tracked
do_stuff(y) # $ MISSING: tracked
def dunder_dict_indirect_write():
attr = "foo"
x = SomeClass() # $ MISSING: tracked=foo
x.__dict__[attr] = tracked # $tracked MISSING: tracked=foo
x.__dict__[attr] = tracked # $ tracked MISSING: tracked=foo
y = x.foo # $ MISSING: tracked tracked=foo
do_stuff(y) # $ MISSING: tracked
def dunder_dict_indirect_read():
attr = "foo"
x = SomeClass() # $tracked=foo
x.foo = tracked # $tracked tracked=foo
x = SomeClass() # $ tracked=foo
x.foo = tracked # $ tracked tracked=foo
y = x.__dict__[attr] # $ tracked=foo MISSING: tracked
do_stuff(y) # $ MISSING: tracked

View File

@@ -1,12 +1,12 @@
# test of other content types than attributes
def test_tuple(index_arg):
tup = (tracked, other) # $tracked
tup = (tracked, other) # $ tracked
tup[0] # $ tracked
tup[1]
a,b = tup # $tracked
a,b = tup # $ tracked
a # $ tracked
b
@@ -22,7 +22,7 @@ def test_tuple(index_arg):
# nested tuples
nested_tuples = ((tracked, other), (other, tracked)) # $tracked
nested_tuples = ((tracked, other), (other, tracked)) # $ tracked
nested_tuples[0][0] # $ MISSING: tracked
nested_tuples[0][1]
@@ -44,7 +44,7 @@ def test_tuple(index_arg):
def test_dict(key_arg):
d1 = {"t": tracked, "o": other} # $tracked
d1 = {"t": tracked, "o": other} # $ tracked
d1["t"] # $ tracked
d1.get("t") # $ MISSING: tracked
d1.setdefault("t") # $ MISSING: tracked
@@ -84,7 +84,7 @@ def test_dict(key_arg):
def test_list(index_arg):
l = [tracked, other] # $tracked
l = [tracked, other] # $ tracked
l[0] # $ MISSING: tracked
l[1]

View File

@@ -1,9 +1,9 @@
def foo(foo_x): # $tracked
print("foo", foo_x) # $tracked
def foo(foo_x): # $ tracked
print("foo", foo_x) # $ tracked
def bar(bar_x): # $tracked
print("bar", bar_x) # $tracked
def bar(bar_x): # $ tracked
print("bar", bar_x) # $ tracked
if len(__file__) % 2 == 0:
@@ -11,5 +11,5 @@ if len(__file__) % 2 == 0:
else:
f = bar
x = tracked # $tracked
f(x) # $tracked
x = tracked # $ tracked
f(x) # $ tracked

View File

@@ -1,7 +1,7 @@
x = tracked # $tracked
x = tracked # $ tracked
def func():
return tracked # $tracked
return tracked # $ tracked
z = tracked # $tracked
some_func(z) # $tracked
z = tracked # $ tracked
some_func(z) # $ tracked

View File

@@ -1,64 +1,64 @@
def get_tracked():
x = tracked # $tracked
return x # $tracked
x = tracked # $ tracked
return x # $ tracked
def use_tracked_foo(x): # $tracked
do_stuff(x) # $tracked
def use_tracked_foo(x): # $ tracked
do_stuff(x) # $ tracked
def foo():
use_tracked_foo(
get_tracked() # $tracked
get_tracked() # $ tracked
)
def use_tracked_bar(x): # $tracked
do_stuff(x) # $tracked
def use_tracked_bar(x): # $ tracked
do_stuff(x) # $ tracked
def bar():
x = get_tracked() # $tracked
use_tracked_bar(x) # $tracked
x = get_tracked() # $ tracked
use_tracked_bar(x) # $ tracked
def use_tracked_baz(x): # $tracked
do_stuff(x) # $tracked
def use_tracked_baz(x): # $ tracked
do_stuff(x) # $ tracked
def baz():
x = tracked # $tracked
use_tracked_baz(x) # $tracked
x = tracked # $ tracked
use_tracked_baz(x) # $ tracked
def id(x): # $tracked
return x # $tracked
def id(x): # $ tracked
return x # $ tracked
def use_tracked_quux(x): # $ MISSING: tracked
do_stuff(y) # call after return -- not tracked in here.
def quux():
x = tracked # $tracked
y = id(x) # $tracked
x = tracked # $ tracked
y = id(x) # $ tracked
use_tracked_quux(y) # not tracked out of call to id.
g = None
def write_g(x): # $tracked
def write_g(x): # $ tracked
global g
g = x # $tracked
g = x # $ tracked
def use_g():
do_stuff(g) # $tracked
do_stuff(g) # $ tracked
def global_var_write_test():
x = tracked # $tracked
write_g(x) # $tracked
x = tracked # $ tracked
write_g(x) # $ tracked
use_g()
def test_import():
import mymodule
mymodule.x # $tracked
y = mymodule.func() # $tracked
y # $tracked
mymodule.z # $tracked
mymodule.x # $ tracked
y = mymodule.func() # $ tracked
y # $ tracked
mymodule.z # $ tracked
def to_inner_scope():
x = tracked # $tracked
x = tracked # $ tracked
def foo():
y = x # $ tracked
return y # $ tracked
@@ -67,15 +67,15 @@ def to_inner_scope():
def from_parameter_default():
x_alias = tracked # $tracked
def outer(x=tracked): # $tracked
print(x) # $tracked
x_alias = tracked # $ tracked
def outer(x=tracked): # $ tracked
print(x) # $ tracked
def inner():
print(x) # $ tracked
print(x_alias) # $tracked
return x # $tracked
also_x = outer() # $tracked
print(also_x) # $tracked
print(x_alias) # $ tracked
return x # $ tracked
also_x = outer() # $ tracked
print(also_x) # $ tracked
# ------------------------------------------------------------------------------
@@ -86,7 +86,7 @@ def my_decorator(func):
# This part doesn't make any sense in a normal decorator, but just shows how we
# handle type-tracking
func() # $tracked
func() # $ tracked
def wrapper():
print("before function call")
@@ -97,7 +97,7 @@ def my_decorator(func):
@my_decorator
def get_tracked2():
return tracked # $tracked
return tracked # $ tracked
@my_decorator
def unrelated_func():
@@ -109,17 +109,17 @@ def use_funcs_with_decorators():
# ------------------------------------------------------------------------------
def expects_int(x): # $int
do_int_stuff(x) # $int
def expects_int(x): # $ int
do_int_stuff(x) # $ int
def expects_string(x): # $str
do_string_stuff(x) # $str
def expects_string(x): # $ str
do_string_stuff(x) # $ str
def redefine_test():
x = int(5) # $int
expects_int(x) # $int
x = str("Hello") # $str
expects_string(x) # $str
x = int(5) # $ int
expects_int(x) # $ int
x = str("Hello") # $ str
expects_string(x) # $ str
# ------------------------------------------------------------------------------
# Tracking of self in methods

View File

@@ -92,12 +92,12 @@ def only_az(value):
class CommentValidatorNotUsed(models.Model):
text = models.CharField(max_length=256, validators=[only_az])
def save_comment_validator_not_used(request): # $requestHandler
def save_comment_validator_not_used(request): # $ requestHandler
comment = CommentValidatorNotUsed(text=request.POST["text"])
comment.save()
return HttpResponse("ok")
def display_comment_validator_not_used(request): # $requestHandler
def display_comment_validator_not_used(request): # $ requestHandler
comment = CommentValidatorNotUsed.objects.last()
return HttpResponse(comment.text) # NOT OK
@@ -111,12 +111,12 @@ http http://127.0.0.1:8000/display_comment_validator_not_used/
class CommentValidatorUsed(models.Model):
text = models.CharField(max_length=256, validators=[only_az])
def save_comment_validator_used(request): # $requestHandler
def save_comment_validator_used(request): # $ requestHandler
comment = CommentValidatorUsed(text=request.POST["text"])
comment.full_clean()
comment.save()
def display_comment_validator_used(request): # $requestHandler
def display_comment_validator_used(request): # $ requestHandler
comment = CommentValidatorUsed.objects.last()
return HttpResponse(comment.text) # sort of OK

View File

@@ -4,21 +4,21 @@ from django.http.response import HttpResponse
from django.views.generic import View
def url_match_xss(request, foo, bar, no_taint=None): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $HttpResponse
def url_match_xss(request, foo, bar, no_taint=None): # $ requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $ HttpResponse
def get_params_xss(request): # $requestHandler
return HttpResponse(request.GET.get("untrusted")) # $HttpResponse
def get_params_xss(request): # $ requestHandler
return HttpResponse(request.GET.get("untrusted")) # $ HttpResponse
def post_params_xss(request): # $requestHandler
return HttpResponse(request.POST.get("untrusted")) # $HttpResponse
def post_params_xss(request): # $ requestHandler
return HttpResponse(request.POST.get("untrusted")) # $ HttpResponse
def http_resp_write(request): # $requestHandler
rsp = HttpResponse() # $HttpResponse
rsp.write(request.GET.get("untrusted")) # $HttpResponse
def http_resp_write(request): # $ requestHandler
rsp = HttpResponse() # $ HttpResponse
rsp.write(request.GET.get("untrusted")) # $ HttpResponse
return rsp
@@ -27,53 +27,53 @@ class Foo(object):
def post(self, request, untrusted): # $ MISSING: requestHandler routedParameter=untrusted
return HttpResponse('Foo post: {}'.format(untrusted)) # $HttpResponse
return HttpResponse('Foo post: {}'.format(untrusted)) # $ HttpResponse
class ClassView(View, Foo):
def get(self, request, untrusted): # $ requestHandler routedParameter=untrusted
return HttpResponse('ClassView get: {}'.format(untrusted)) # $HttpResponse
return HttpResponse('ClassView get: {}'.format(untrusted)) # $ HttpResponse
def show_articles(request, page_number=1): # $requestHandler routedParameter=page_number
def show_articles(request, page_number=1): # $ requestHandler routedParameter=page_number
page_number = int(page_number)
return HttpResponse('articles page: {}'.format(page_number)) # $HttpResponse
return HttpResponse('articles page: {}'.format(page_number)) # $ HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $ requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $ HttpResponse
urlpatterns = [
url(r"^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)", url_match_xss), # $routeSetup="^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)"
url(r"^get_params", get_params_xss), # $routeSetup="^get_params"
url(r"^post_params", post_params_xss), # $routeSetup="^post_params"
url(r"^http_resp_write", http_resp_write), # $routeSetup="^http_resp_write"
url(r"^class_view/(?P<untrusted>.+)", ClassView.as_view()), # $routeSetup="^class_view/(?P<untrusted>.+)"
url(r"^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)", url_match_xss), # $ routeSetup="^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)"
url(r"^get_params", get_params_xss), # $ routeSetup="^get_params"
url(r"^post_params", post_params_xss), # $ routeSetup="^post_params"
url(r"^http_resp_write", http_resp_write), # $ routeSetup="^http_resp_write"
url(r"^class_view/(?P<untrusted>.+)", ClassView.as_view()), # $ routeSetup="^class_view/(?P<untrusted>.+)"
# one pattern to support `articles/page-<n>` and ensuring that articles/ goes to page-1
url(r"articles/^(?:page-(?P<page_number>\d+)/)?", show_articles), # $routeSetup="articles/^(?:page-(?P<page_number>\d+)/)?"
url(r"articles/^(?:page-(?P<page_number>\d+)/)?", show_articles), # $ routeSetup="articles/^(?:page-(?P<page_number>\d+)/)?"
# passing as positional argument is not the recommended way of doing things, but it is certainly
# possible
url(r"^([^/]+)/(?:foo|bar)/([^/]+)", xxs_positional_arg, name='xxs_positional_arg'), # $routeSetup="^([^/]+)/(?:foo|bar)/([^/]+)"
url(r"^([^/]+)/(?:foo|bar)/([^/]+)", xxs_positional_arg, name='xxs_positional_arg'), # $ routeSetup="^([^/]+)/(?:foo|bar)/([^/]+)"
]
################################################################################
# Using patterns() for routing
def show_user(request, username): # $requestHandler routedParameter=username
return HttpResponse('show_user {}'.format(username)) # $HttpResponse
def show_user(request, username): # $ requestHandler routedParameter=username
return HttpResponse('show_user {}'.format(username)) # $ HttpResponse
urlpatterns = patterns(url(r"^users/(?P<username>[^/]+)", show_user)) # $routeSetup="^users/(?P<username>[^/]+)"
urlpatterns = patterns(url(r"^users/(?P<username>[^/]+)", show_user)) # $ routeSetup="^users/(?P<username>[^/]+)"
################################################################################
# Show we understand the keyword arguments to django.conf.urls.url
def kw_args(request): # $requestHandler
return HttpResponse('kw_args') # $HttpResponse
def kw_args(request): # $ requestHandler
return HttpResponse('kw_args') # $ HttpResponse
urlpatterns = [
url(view=kw_args, regex=r"^kw_args") # $routeSetup="^kw_args"
url(view=kw_args, regex=r"^kw_args") # $ routeSetup="^kw_args"
]

View File

@@ -8,12 +8,12 @@ import json
# FP reported in https://github.com/github/codeql-python-team/issues/38
def safe__json_response(request):
# implicitly sets Content-Type to "application/json"
return JsonResponse({"foo": request.GET.get("foo")}) # $HttpResponse mimetype=application/json responseBody=Dict
return JsonResponse({"foo": request.GET.get("foo")}) # $ HttpResponse mimetype=application/json responseBody=Dict
# Not an XSS sink, since the Content-Type is not "text/html"
def safe__manual_json_response(request):
json_data = '{"json": "{}"}'.format(request.GET.get("foo"))
return HttpResponse(json_data, content_type="application/json") # $HttpResponse mimetype=application/json responseBody=json_data
return HttpResponse(json_data, content_type="application/json") # $ HttpResponse mimetype=application/json responseBody=json_data
# reproduction of FP seen here:
# Usage: https://github.com/edx/edx-platform/blob/d70ebe6343a1573c694d6cf68f92c1ad40b73d7d/lms/djangoapps/commerce/api/v0/views.py#L106
@@ -27,25 +27,25 @@ class MyJsonResponse(HttpResponse):
# Not an XSS sink, since the Content-Type is not "text/html"
def safe__custom_json_response(request):
json_data = '{"json": "{}"}'.format(request.GET.get("foo"))
return MyJsonResponse(json_data) # $HttpResponse responseBody=json_data SPURIOUS: mimetype=text/html MISSING: mimetype=application/json
return MyJsonResponse(json_data) # $ HttpResponse responseBody=json_data SPURIOUS: mimetype=text/html MISSING: mimetype=application/json
# Not an XSS sink, since the Content-Type is not "text/html"
def safe__manual_content_type(request):
return HttpResponse('<img src="0" onerror="alert(1)">', content_type="text/plain") # $HttpResponse mimetype=text/plain responseBody='<img src="0" onerror="alert(1)">'
return HttpResponse('<img src="0" onerror="alert(1)">', content_type="text/plain") # $ HttpResponse mimetype=text/plain responseBody='<img src="0" onerror="alert(1)">'
# XSS FP reported in https://github.com/github/codeql/issues/3466
# Note: This should be an open-redirect sink, but not an XSS sink.
def or__redirect(request):
next = request.GET.get("next")
return HttpResponseRedirect(next) # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation=next
return HttpResponseRedirect(next) # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation=next
def information_exposure_through_redirect(request, as_kw=False):
# This is a contrived example, but possible
private = "private"
next = request.GET.get("next")
if as_kw:
return HttpResponseRedirect(next, content=private) # $HttpResponse mimetype=text/html responseBody=private HttpRedirectResponse redirectLocation=next
return HttpResponseRedirect(next, content=private) # $ HttpResponse mimetype=text/html responseBody=private HttpRedirectResponse redirectLocation=next
else:
return HttpResponseRedirect(next, private) # $ HttpResponse mimetype=text/html responseBody=private HttpRedirectResponse redirectLocation=next
@@ -91,20 +91,20 @@ class CustomRedirectView(RedirectView):
# Ensure that simple subclasses are still vuln to XSS
def xss__not_found(request):
return HttpResponseNotFound(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=request.GET.get(..)
return HttpResponseNotFound(request.GET.get("name")) # $ HttpResponse mimetype=text/html responseBody=request.GET.get(..)
# Ensure we still have an XSS sink when manually setting the content_type to HTML
def xss__manual_response_type(request):
return HttpResponse(request.GET.get("name"), content_type="text/html; charset=utf-8") # $HttpResponse mimetype=text/html responseBody=request.GET.get(..)
return HttpResponse(request.GET.get("name"), content_type="text/html; charset=utf-8") # $ HttpResponse mimetype=text/html responseBody=request.GET.get(..)
def xss__write(request):
response = HttpResponse() # $HttpResponse mimetype=text/html
response.write(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=request.GET.get(..)
response = HttpResponse() # $ HttpResponse mimetype=text/html
response.write(request.GET.get("name")) # $ HttpResponse mimetype=text/html responseBody=request.GET.get(..)
# This is safe but probably a bug if the argument to `write` is not a result of `json.dumps` or similar.
def safe__write_json(request):
response = JsonResponse() # $HttpResponse mimetype=application/json
response.write(request.GET.get("name")) # $HttpResponse mimetype=application/json responseBody=request.GET.get(..)
response = JsonResponse() # $ HttpResponse mimetype=application/json
response.write(request.GET.get("name")) # $ HttpResponse mimetype=application/json responseBody=request.GET.get(..)
# Ensure manual subclasses are vulnerable
class CustomResponse(HttpResponse):
@@ -112,15 +112,15 @@ class CustomResponse(HttpResponse):
super().__init__(content, *args, content_type="text/html", **kwargs)
def xss__custom_response(request):
return CustomResponse("ACME Responses", request.GET("name")) # $HttpResponse MISSING: mimetype=text/html responseBody=request.GET.get(..) SPURIOUS: responseBody="ACME Responses"
return CustomResponse("ACME Responses", request.GET("name")) # $ HttpResponse MISSING: mimetype=text/html responseBody=request.GET.get(..) SPURIOUS: responseBody="ACME Responses"
class CustomJsonResponse(JsonResponse):
def __init__(self, banner, content, *args, **kwargs):
super().__init__(content, *args, content_type="text/html", **kwargs)
@csrf_protect # $CsrfLocalProtectionEnabled=safe__custom_json_response
@csrf_protect # $ CsrfLocalProtectionEnabled=safe__custom_json_response
def safe__custom_json_response(request):
return CustomJsonResponse("ACME Responses", {"foo": request.GET.get("foo")}) # $HttpResponse mimetype=application/json MISSING: responseBody=Dict SPURIOUS: responseBody="ACME Responses"
return CustomJsonResponse("ACME Responses", {"foo": request.GET.get("foo")}) # $ HttpResponse mimetype=application/json MISSING: responseBody=Dict SPURIOUS: responseBody="ACME Responses"
################################################################################
# Cookies
@@ -135,7 +135,7 @@ def setting_cookie(request):
resp.delete_cookie("key4") # $ CookieWrite CookieName="key4"
resp.delete_cookie(key="key4") # $ CookieWrite CookieName="key4"
resp["Set-Cookie"] = "key5=value5" # $ headerWriteName="Set-Cookie" headerWriteValue="key5=value5" CookieWrite CookieRawHeader="key5=value5" CookieSecure=false CookieHttpOnly=false CookieSameSite=Lax
resp.set_cookie(key="key6", value="value6", secure=True, httponly=False, samesite="None") # $ CookieWrite CookieName="key6" CookieValue="value6" CookieSecure=true CookieHttpOnly=false CookieSameSite=None
resp.set_cookie(key="key6", value="value6", secure=True, httponly=False, samesite="None") # $ CookieWrite CookieName="key6" CookieValue="value6" CookieSecure=true CookieHttpOnly=false CookieSameSite=None
kwargs = {'secure': True}
resp.set_cookie(key="key7", value="value7", **kwargs) # $ CookieWrite CookieName="key7" CookieValue="value7"
return resp

View File

@@ -6,21 +6,21 @@ import django.views.generic.base
from django.views.decorators.http import require_GET
def url_match_xss(request, foo, bar, no_taint=None): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $HttpResponse
def url_match_xss(request, foo, bar, no_taint=None): # $ requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $ HttpResponse
def get_params_xss(request): # $requestHandler
return HttpResponse(request.GET.get("untrusted")) # $HttpResponse
def get_params_xss(request): # $ requestHandler
return HttpResponse(request.GET.get("untrusted")) # $ HttpResponse
def post_params_xss(request): # $requestHandler
return HttpResponse(request.POST.get("untrusted")) # $HttpResponse
def post_params_xss(request): # $ requestHandler
return HttpResponse(request.POST.get("untrusted")) # $ HttpResponse
def http_resp_write(request): # $requestHandler
rsp = HttpResponse() # $HttpResponse
rsp.write(request.GET.get("untrusted")) # $HttpResponse
def http_resp_write(request): # $ requestHandler
rsp = HttpResponse() # $ HttpResponse
rsp.write(request.GET.get("untrusted")) # $ HttpResponse
return rsp
@@ -29,13 +29,13 @@ class Foo(object):
def post(self, request, untrusted): # $ MISSING: requestHandler routedParameter=untrusted
return HttpResponse('Foo post: {}'.format(untrusted)) # $HttpResponse
return HttpResponse('Foo post: {}'.format(untrusted)) # $ HttpResponse
class ClassView(View, Foo):
def get(self, request, untrusted): # $ requestHandler routedParameter=untrusted
return HttpResponse('ClassView get: {}'.format(untrusted)) # $HttpResponse
return HttpResponse('ClassView get: {}'.format(untrusted)) # $ HttpResponse
# direct import with full path to `View` class
@@ -44,38 +44,38 @@ class ClassView2(django.views.generic.base.View):
pass
def show_articles(request, page_number=1): # $requestHandler routedParameter=page_number
def show_articles(request, page_number=1): # $ requestHandler routedParameter=page_number
page_number = int(page_number)
return HttpResponse('articles page: {}'.format(page_number)) # $HttpResponse
return HttpResponse('articles page: {}'.format(page_number)) # $ HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $ requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $ HttpResponse
urlpatterns = [
re_path(r"^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)", url_match_xss), # $routeSetup="^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)"
re_path(r"^get_params", get_params_xss), # $routeSetup="^get_params"
re_path(r"^post_params", post_params_xss), # $routeSetup="^post_params"
re_path(r"^http_resp_write", http_resp_write), # $routeSetup="^http_resp_write"
re_path(r"^class_view/(?P<untrusted>.+)", ClassView.as_view()), # $routeSetup="^class_view/(?P<untrusted>.+)"
re_path(r"^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)", url_match_xss), # $ routeSetup="^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)"
re_path(r"^get_params", get_params_xss), # $ routeSetup="^get_params"
re_path(r"^post_params", post_params_xss), # $ routeSetup="^post_params"
re_path(r"^http_resp_write", http_resp_write), # $ routeSetup="^http_resp_write"
re_path(r"^class_view/(?P<untrusted>.+)", ClassView.as_view()), # $ routeSetup="^class_view/(?P<untrusted>.+)"
# one pattern to support `articles/page-<n>` and ensuring that articles/ goes to page-1
re_path(r"articles/^(?:page-(?P<page_number>\d+)/)?", show_articles), # $routeSetup="articles/^(?:page-(?P<page_number>\d+)/)?"
re_path(r"articles/^(?:page-(?P<page_number>\d+)/)?", show_articles), # $ routeSetup="articles/^(?:page-(?P<page_number>\d+)/)?"
# passing as positional argument is not the recommended way of doing things, but it is certainly
# possible
re_path(r"^([^/]+)/(?:foo|bar)/([^/]+)", xxs_positional_arg, name='xxs_positional_arg'), # $routeSetup="^([^/]+)/(?:foo|bar)/([^/]+)"
re_path(r"^([^/]+)/(?:foo|bar)/([^/]+)", xxs_positional_arg, name='xxs_positional_arg'), # $ routeSetup="^([^/]+)/(?:foo|bar)/([^/]+)"
]
# Show we understand the keyword arguments to django.urls.re_path
def re_path_kwargs(request): # $requestHandler
return HttpResponse('re_path_kwargs') # $HttpResponse
def re_path_kwargs(request): # $ requestHandler
return HttpResponse('re_path_kwargs') # $ HttpResponse
urlpatterns = [
re_path(view=re_path_kwargs, route=r"^specifying-as-kwargs-is-not-a-problem") # $routeSetup="^specifying-as-kwargs-is-not-a-problem"
re_path(view=re_path_kwargs, route=r"^specifying-as-kwargs-is-not-a-problem") # $ routeSetup="^specifying-as-kwargs-is-not-a-problem"
]
################################################################################
@@ -83,27 +83,27 @@ urlpatterns = [
################################################################################
# saying page_number is an externally controlled *string* is a bit strange, when we have an int converter :O
def page_number(request, page_number=1): # $requestHandler routedParameter=page_number
return HttpResponse('page_number: {}'.format(page_number)) # $HttpResponse
def page_number(request, page_number=1): # $ requestHandler routedParameter=page_number
return HttpResponse('page_number: {}'.format(page_number)) # $ HttpResponse
def foo_bar_baz(request, foo, bar, baz): # $requestHandler routedParameter=foo routedParameter=bar routedParameter=baz
return HttpResponse('foo_bar_baz: {} {} {}'.format(foo, bar, baz)) # $HttpResponse
def foo_bar_baz(request, foo, bar, baz): # $ requestHandler routedParameter=foo routedParameter=bar routedParameter=baz
return HttpResponse('foo_bar_baz: {} {} {}'.format(foo, bar, baz)) # $ HttpResponse
def path_kwargs(request, foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('path_kwargs: {} {} {}'.format(foo, bar)) # $HttpResponse
def path_kwargs(request, foo, bar): # $ requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('path_kwargs: {} {} {}'.format(foo, bar)) # $ HttpResponse
def not_valid_identifier(request): # $requestHandler
return HttpResponse('<foo!>') # $HttpResponse
def not_valid_identifier(request): # $ requestHandler
return HttpResponse('<foo!>') # $ HttpResponse
urlpatterns = [
path("articles/", page_number), # $routeSetup="articles/"
path("articles/page-<int:page_number>", page_number), # $routeSetup="articles/page-<int:page_number>"
path("<int:foo>/<str:bar>/<baz>", foo_bar_baz, name='foo-bar-baz'), # $routeSetup="<int:foo>/<str:bar>/<baz>"
path("articles/", page_number), # $ routeSetup="articles/"
path("articles/page-<int:page_number>", page_number), # $ routeSetup="articles/page-<int:page_number>"
path("<int:foo>/<str:bar>/<baz>", foo_bar_baz, name='foo-bar-baz'), # $ routeSetup="<int:foo>/<str:bar>/<baz>"
path(view=path_kwargs, route="<foo>/<bar>"), # $routeSetup="<foo>/<bar>"
path(view=path_kwargs, route="<foo>/<bar>"), # $ routeSetup="<foo>/<bar>"
# We should not report there is a request parameter called `not_valid!`
path("not_valid/<not_valid!>", not_valid_identifier), # $routeSetup="not_valid/<not_valid!>"
path("not_valid/<not_valid!>", not_valid_identifier), # $ routeSetup="not_valid/<not_valid!>"
]
################################################################################
@@ -113,11 +113,11 @@ urlpatterns = [
# This version 1.x way of defining urls is deprecated in Django 3.1, but still works
from django.conf.urls import url
def deprecated(request): # $requestHandler
return HttpResponse('deprecated') # $HttpResponse
def deprecated(request): # $ requestHandler
return HttpResponse('deprecated') # $ HttpResponse
urlpatterns = [
url(r"^deprecated/", deprecated), # $routeSetup="^deprecated/"
url(r"^deprecated/", deprecated), # $ routeSetup="^deprecated/"
]
@@ -130,7 +130,7 @@ class PossiblyNotRouted(View):
# consider it to be a handle incoming HTTP requests
def get(self, request, possibly_not_routed=42): # $ requestHandler routedParameter=possibly_not_routed
return HttpResponse('PossiblyNotRouted get: {}'.format(possibly_not_routed)) # $HttpResponse
return HttpResponse('PossiblyNotRouted get: {}'.format(possibly_not_routed)) # $ HttpResponse
@require_GET

View File

@@ -4,7 +4,7 @@ from django.http import HttpRequest
from django.views import View
def test_taint(request: HttpRequest, foo, bar, baz=None): # $requestHandler routedParameter=foo routedParameter=bar
def test_taint(request: HttpRequest, foo, bar, baz=None): # $ requestHandler routedParameter=foo routedParameter=bar
ensure_tainted(foo, bar) # $ tainted
ensure_not_tainted(baz)

View File

@@ -3,19 +3,19 @@ from django.urls import path, re_path
from . import views
urlpatterns = [
path("foo/", views.foo), # $routeSetup="foo/"
path("foo/", views.foo), # $ routeSetup="foo/"
# TODO: Doesn't include standard `$` to mark end of string, due to problems with
# inline expectation tests (which thinks the `$` would mark the beginning of a new
# line)
re_path(r"^ba[rz]/", views.bar_baz), # $routeSetup="^ba[rz]/"
re_path(r"^ba[rz]/", views.bar_baz), # $ routeSetup="^ba[rz]/"
path("basic-view-handler/", views.MyBasicViewHandler.as_view()), # $routeSetup="basic-view-handler/"
path("custom-inheritance-view-handler/", views.MyViewHandlerWithCustomInheritance.as_view()), # $routeSetup="custom-inheritance-view-handler/"
path("basic-view-handler/", views.MyBasicViewHandler.as_view()), # $ routeSetup="basic-view-handler/"
path("custom-inheritance-view-handler/", views.MyViewHandlerWithCustomInheritance.as_view()), # $ routeSetup="custom-inheritance-view-handler/"
path("CustomRedirectView/<foo>", views.CustomRedirectView.as_view()), # $routeSetup="CustomRedirectView/<foo>"
path("CustomRedirectView2/<foo>", views.CustomRedirectView2.as_view()), # $routeSetup="CustomRedirectView2/<foo>"
path("CustomRedirectView/<foo>", views.CustomRedirectView.as_view()), # $ routeSetup="CustomRedirectView/<foo>"
path("CustomRedirectView2/<foo>", views.CustomRedirectView2.as_view()), # $ routeSetup="CustomRedirectView2/<foo>"
path("file-test/", views.file_test), # $routeSetup="file-test/"
path("file-test/", views.file_test), # $ routeSetup="file-test/"
]
from django import __version__ as django_version
@@ -30,7 +30,7 @@ if django_version[0] == "3":
# we need this assignment to get our logic working... maybe it should be more
# sophisticated?
urlpatterns = [
url(r"^deprecated/", views.deprecated), # $routeSetup="^deprecated/"
url(r"^deprecated/", views.deprecated), # $ routeSetup="^deprecated/"
]
urlpatterns += old_urlpatterns

View File

@@ -4,16 +4,16 @@ from django.views.decorators.csrf import csrf_exempt
from .models import MyModel
def foo(request: HttpRequest): # $requestHandler
return HttpResponse("foo") # $HttpResponse
def foo(request: HttpRequest): # $ requestHandler
return HttpResponse("foo") # $ HttpResponse
def bar_baz(request: HttpRequest): # $requestHandler
return HttpResponse("bar_baz") # $HttpResponse
def bar_baz(request: HttpRequest): # $ requestHandler
return HttpResponse("bar_baz") # $ HttpResponse
def deprecated(request: HttpRequest): # $requestHandler
return HttpResponse("deprecated") # $HttpResponse
def deprecated(request: HttpRequest): # $ requestHandler
return HttpResponse("deprecated") # $ HttpResponse
class MyBasicViewHandler(View):

View File

@@ -48,7 +48,7 @@ MIDDLEWARE = [
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
] # $CsrfProtectionSetting=false
] # $ CsrfProtectionSetting=false
ROOT_URLCONF = 'testproj.urls'

View File

@@ -17,6 +17,6 @@ from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path("admin/", admin.site.urls), # $routeSetup="admin/"
path("app/", include("testapp.urls")), # $routeSetup="app/"
path("admin/", admin.site.urls), # $ routeSetup="admin/"
path("app/", include("testapp.urls")), # $ routeSetup="app/"
]

View File

@@ -4,13 +4,13 @@ from django.db.models.expressions import RawSQL
def test_plain():
cursor = connection.cursor()
cursor.execute("some sql") # $getSql="some sql"
cursor.execute("some sql") # $ getSql="some sql"
def test_context():
with connection.cursor() as cursor:
cursor.execute("some sql") # $getSql="some sql"
cursor.execute(sql="some sql") # $getSql="some sql"
cursor.execute("some sql") # $ getSql="some sql"
cursor.execute(sql="some sql") # $ getSql="some sql"
class User(models.Model):
@@ -18,20 +18,20 @@ class User(models.Model):
def test_model():
User.objects.raw("some sql") # $getSql="some sql"
User.objects.raw("some sql") # $ getSql="some sql"
User.objects.annotate(RawSQL("some sql")) # $getSql="some sql"
User.objects.annotate(RawSQL("foo"), RawSQL("bar")) # $getSql="foo" getSql="bar"
User.objects.annotate(val=RawSQL("some sql")) # $getSql="some sql"
User.objects.annotate(RawSQL("some sql")) # $ getSql="some sql"
User.objects.annotate(RawSQL("foo"), RawSQL("bar")) # $ getSql="foo" getSql="bar"
User.objects.annotate(val=RawSQL("some sql")) # $ getSql="some sql"
User.objects.alias(RawSQL("foo"), RawSQL("bar")) # $getSql="foo" getSql="bar"
User.objects.alias(val=RawSQL("some sql")) # $getSql="some sql"
User.objects.alias(RawSQL("foo"), RawSQL("bar")) # $ getSql="foo" getSql="bar"
User.objects.alias(val=RawSQL("some sql")) # $ getSql="some sql"
User.objects.extra("some sql") # $getSql="some sql"
User.objects.extra(select="select", where="where", tables="tables", order_by="order_by") # $getSql="select" getSql="where" getSql="tables" getSql="order_by"
User.objects.extra("some sql") # $ getSql="some sql"
User.objects.extra(select="select", where="where", tables="tables", order_by="order_by") # $ getSql="select" getSql="where" getSql="tables" getSql="order_by"
raw = RawSQL("so raw")
User.objects.annotate(val=raw) # $getSql="so raw"
User.objects.annotate(val=raw) # $ getSql="so raw"
# chaining QuerySet calls
User.objects.using("db-name").exclude(username="admin").extra("some sql") # $ getSql="some sql"

View File

@@ -5,15 +5,15 @@ See http://docs.fabfile.org/en/1.14/tutorial.html
from fabric.api import run, local, sudo
local("cmd1; cmd2") # $getCommand="cmd1; cmd2"
run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
local("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
sudo("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
local(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
local(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
run(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
sudo(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
from fabric import operations
operations.local("cmd1; cmd2") # $getCommand="cmd1; cmd2"
operations.local(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
operations.local("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
operations.local(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"

View File

@@ -9,41 +9,41 @@ from fabric import connection, Connection, group, SerialGroup, ThreadingGroup, t
# Connection
################################################################################
c = Connection("web1")
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.local("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c.local("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c.sudo("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c.local(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.local(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c.run(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c.sudo(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# fully qualified usage
c2 = connection.Connection("web2")
c2.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c2.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# ssh proxy_command command injection with gateway parameter,
# we need to call some of the following functions to run the proxy command
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c.run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.run(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.get("afs")
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c.sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.sudo(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.open_gateway()
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.open()
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.create_session()
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.forward_local("80")
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.forward_remote("80")
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.put(local="local")
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.shell()
c = Connection("web1", gateway="cmd") # $getCommand="cmd"
c = Connection("web1", gateway="cmd") # $ getCommand="cmd"
c.sftp()
# no call to desired methods so it is safe
c = Connection("web1", gateway="cmd")
@@ -51,26 +51,26 @@ c = Connection("web1", gateway="cmd")
################################################################################
# SerialGroup
################################################################################
results = SerialGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
results = SerialGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
pool = SerialGroup("web1", "web2", "web3")
pool.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
pool.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
pool.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
pool.sudo("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# fully qualified usage
group.SerialGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
group.SerialGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
################################################################################
# ThreadingGroup
################################################################################
results = ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
results = ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
pool = ThreadingGroup("web1", "web2", "web3")
pool.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
pool.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
pool.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
pool.sudo("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# fully qualified usage
group.ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
group.ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
################################################################################
@@ -81,11 +81,11 @@ group.ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="c
@task
def foo(c):
# 'c' is a fabric.connection.Connection
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# fully qualified usage
@tasks.task
def bar(c):
# 'c' is a fabric.connection.Connection
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"

View File

@@ -3,9 +3,9 @@ import flask
from flask import Flask, request, make_response
app = Flask(__name__)
@app.route("/") # $routeSetup="/"
def hello_world(): # $requestHandler
return "Hello World!" # $HttpResponse
@app.route("/") # $ routeSetup="/"
def hello_world(): # $ requestHandler
return "Hello World!" # $ HttpResponse
from flask.views import MethodView
@@ -22,46 +22,46 @@ class MyView(MethodView):
the_view = MyView.as_view('my_view')
app.add_url_rule('/the/', defaults={'user_id': None},
view_func=the_view, methods=['GET',]) # $routeSetup="/the/"
view_func=the_view, methods=['GET',]) # $ routeSetup="/the/"
@app.route("/dangerous") # $routeSetup="/dangerous"
def dangerous(): # $requestHandler
return request.args.get('payload') # $HttpResponse
@app.route("/dangerous") # $ routeSetup="/dangerous"
def dangerous(): # $ requestHandler
return request.args.get('payload') # $ HttpResponse
@app.route("/dangerous-with-cfg-split") # $routeSetup="/dangerous-with-cfg-split"
def dangerous2(): # $requestHandler
@app.route("/dangerous-with-cfg-split") # $ routeSetup="/dangerous-with-cfg-split"
def dangerous2(): # $ requestHandler
x = request.form['param0']
if request.method == "POST":
return request.form['param1'] # $HttpResponse
return request.form['param1'] # $ HttpResponse
return None # $ SPURIOUS: HttpResponse
@app.route("/unsafe") # $routeSetup="/unsafe"
def unsafe(): # $requestHandler
@app.route("/unsafe") # $ routeSetup="/unsafe"
def unsafe(): # $ requestHandler
first_name = request.args.get('name', '')
return make_response("Your name is " + first_name) # $HttpResponse
return make_response("Your name is " + first_name) # $ HttpResponse
@app.route("/safe") # $routeSetup="/safe"
def safe(): # $requestHandler
@app.route("/safe") # $ routeSetup="/safe"
def safe(): # $ requestHandler
first_name = request.args.get('name', '')
return make_response("Your name is " + escape(first_name)) # $HttpResponse
return make_response("Your name is " + escape(first_name)) # $ HttpResponse
@app.route("/hello/<name>") # $routeSetup="/hello/<name>"
def hello(name): # $requestHandler routedParameter=name
return make_response("Your name is " + name) # $HttpResponse
@app.route("/hello/<name>") # $ routeSetup="/hello/<name>"
def hello(name): # $ requestHandler routedParameter=name
return make_response("Your name is " + name) # $ HttpResponse
@app.route("/foo/<path:subpath>") # $routeSetup="/foo/<path:subpath>"
def foo(subpath): # $requestHandler routedParameter=subpath
return make_response("The subpath is " + subpath) # $HttpResponse
@app.route("/foo/<path:subpath>") # $ routeSetup="/foo/<path:subpath>"
def foo(subpath): # $ requestHandler routedParameter=subpath
return make_response("The subpath is " + subpath) # $ HttpResponse
@app.route("/multiple/") # $routeSetup="/multiple/"
@app.route("/multiple/foo/<foo>") # $routeSetup="/multiple/foo/<foo>"
@app.route("/multiple/bar/<bar>") # $routeSetup="/multiple/bar/<bar>"
def multiple(foo=None, bar=None): # $requestHandler routedParameter=foo routedParameter=bar
return make_response("foo={!r} bar={!r}".format(foo, bar)) # $HttpResponse
@app.route("/multiple/") # $ routeSetup="/multiple/"
@app.route("/multiple/foo/<foo>") # $ routeSetup="/multiple/foo/<foo>"
@app.route("/multiple/bar/<bar>") # $ routeSetup="/multiple/bar/<bar>"
def multiple(foo=None, bar=None): # $ requestHandler routedParameter=foo routedParameter=bar
return make_response("foo={!r} bar={!r}".format(foo, bar)) # $ HttpResponse
@app.route("/complex/<string(length=2):lang_code>") # $routeSetup="/complex/<string(length=2):lang_code>"
def complex(lang_code): # $requestHandler routedParameter=lang_code
return make_response("lang_code {}".format(lang_code)) # $HttpResponse
@app.route("/complex/<string(length=2):lang_code>") # $ routeSetup="/complex/<string(length=2):lang_code>"
def complex(lang_code): # $ requestHandler routedParameter=lang_code
return make_response("lang_code {}".format(lang_code)) # $ HttpResponse
if __name__ == "__main__":
app.run(debug=True)

View File

@@ -6,23 +6,23 @@ from werkzeug.datastructures import Headers
app = Flask(__name__)
@app.route("/html1") # $routeSetup="/html1"
def html1(): # $requestHandler
return "<h1>hello</h1>" # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html1") # $ routeSetup="/html1"
def html1(): # $ requestHandler
return "<h1>hello</h1>" # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html2") # $routeSetup="/html2"
def html2(): # $requestHandler
@app.route("/html2") # $ routeSetup="/html2"
def html2(): # $ requestHandler
# note that response saved in a variable intentionally -- we wan the annotations to
# show that we recognize the response creation, and not the return (hopefully). (and
# do the same in the following of the file)
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp = make_response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html3") # $routeSetup="/html3"
def html3(): # $requestHandler
resp = app.make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html3") # $ routeSetup="/html3"
def html3(): # $ requestHandler
resp = app.make_response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -30,50 +30,50 @@ def html3(): # $requestHandler
# https://flask.palletsprojects.com/en/1.1.x/api/#flask.Flask.make_response
@app.route("/html4") # $routeSetup="/html4"
def html4(): # $requestHandler
resp = Response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html4") # $ routeSetup="/html4"
def html4(): # $ requestHandler
resp = Response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html5") # $routeSetup="/html5"
def html5(): # $requestHandler
resp = Response(response="<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html5") # $ routeSetup="/html5"
def html5(): # $ requestHandler
resp = Response(response="<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html6") # $routeSetup="/html6"
def html6(): # $requestHandler
@app.route("/html6") # $ routeSetup="/html6"
def html6(): # $ requestHandler
# note: flask.Flask.response_class is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = Flask.response_class("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp = Flask.response_class("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html7") # $routeSetup="/html7"
def html7(): # $requestHandler
@app.route("/html7") # $ routeSetup="/html7"
def html7(): # $ requestHandler
# note: app.response_class (flask.Flask.response_class) is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = app.response_class("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp = app.response_class("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html8") # $routeSetup="/html8"
def html8(): # $requestHandler
resp = make_response() # $HttpResponse mimetype=text/html
@app.route("/html8") # $ routeSetup="/html8"
def html8(): # $ requestHandler
resp = make_response() # $ HttpResponse mimetype=text/html
resp.set_data("<h1>hello</h1>") # $ MISSING: responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/jsonify") # $routeSetup="/jsonify"
def jsonify_route(): # $requestHandler
@app.route("/jsonify") # $ routeSetup="/jsonify"
def jsonify_route(): # $ requestHandler
x = "x"; y = "y"; z = "z"
if True:
import flask.json
resp = flask.json.jsonify(x, y, z=z) # $HttpResponse mimetype=application/json responseBody=x responseBody=y responseBody=z
resp = flask.json.jsonify(x, y, z=z) # $ HttpResponse mimetype=application/json responseBody=x responseBody=y responseBody=z
assert resp.mimetype == "application/json"
resp = app.json.response(x, y, z=z) # $HttpResponse mimetype=application/json responseBody=x responseBody=y responseBody=z
resp = app.json.response(x, y, z=z) # $ HttpResponse mimetype=application/json responseBody=x responseBody=y responseBody=z
assert resp.mimetype == "application/json"
resp = jsonify(x, y, z=z) # $ HttpResponse mimetype=application/json responseBody=x responseBody=y responseBody=z
@@ -83,24 +83,24 @@ def jsonify_route(): # $requestHandler
# Tricky return handling
################################################################################
@app.route("/tricky-return1") # $routeSetup="/tricky-return1"
def tricky_return1(): # $requestHandler
@app.route("/tricky-return1") # $ routeSetup="/tricky-return1"
def tricky_return1(): # $ requestHandler
if "raw" in request.args:
resp = "<h1>hellu</h1>"
else:
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $HttpResponse mimetype=text/html responseBody=resp
resp = make_response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ HttpResponse mimetype=text/html responseBody=resp
def helper():
if "raw" in request.args:
return "<h1>hellu</h1>"
else:
return make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return make_response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/tricky-return2") # $routeSetup="/tricky-return2"
def tricky_return2(): # $requestHandler
@app.route("/tricky-return2") # $ routeSetup="/tricky-return2"
def tricky_return2(): # $ requestHandler
resp = helper()
return resp # $HttpResponse mimetype=text/html responseBody=resp
return resp # $ HttpResponse mimetype=text/html responseBody=resp
################################################################################
@@ -108,16 +108,16 @@ def tricky_return2(): # $requestHandler
################################################################################
@app.route("/content-type/response-modification1") # $routeSetup="/content-type/response-modification1"
def response_modification1(): # $requestHandler
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/content-type/response-modification1") # $ routeSetup="/content-type/response-modification1"
def response_modification1(): # $ requestHandler
resp = make_response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp.content_type = "text/plain" # $ MISSING: HttpResponse mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/response-modification2") # $routeSetup="/content-type/response-modification2"
def response_modification2(): # $requestHandler
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/content-type/response-modification2") # $ routeSetup="/content-type/response-modification2"
def response_modification2(): # $ requestHandler
resp = make_response("<h1>hello</h1>") # $ HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp.headers["content-type"] = "text/plain" # $ headerWriteNameUnsanitized="content-type" headerWriteValue="text/plain" MISSING: HttpResponse mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -126,61 +126,61 @@ def response_modification2(): # $requestHandler
# see https://werkzeug.palletsprojects.com/en/1.0.x/wrappers/#werkzeug.wrappers.Response
@app.route("/content-type/Response1") # $routeSetup="/content-type/Response1"
def Response1(): # $requestHandler
resp = Response("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
@app.route("/content-type/Response1") # $ routeSetup="/content-type/Response1"
def Response1(): # $ requestHandler
resp = Response("<h1>hello</h1>", mimetype="text/plain") # $ HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response2") # $routeSetup="/content-type/Response2"
def Response2(): # $requestHandler
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
@app.route("/content-type/Response2") # $ routeSetup="/content-type/Response2"
def Response2(): # $ requestHandler
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8") # $ HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response3") # $routeSetup="/content-type/Response3"
def Response3(): # $requestHandler
@app.route("/content-type/Response3") # $ routeSetup="/content-type/Response3"
def Response3(): # $ requestHandler
# content_type argument takes priority (and result is text/plain)
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8", mimetype="text/html") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8", mimetype="text/html") # $ HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response4") # $routeSetup="/content-type/Response4"
def Response4(): # $requestHandler
@app.route("/content-type/Response4") # $ routeSetup="/content-type/Response4"
def Response4(): # $ requestHandler
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/plain"}) # $ headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="Content-TYPE" headerWriteValue="text/plain" HttpResponse responseBody="<h1>hello</h1>" SPURIOUS: mimetype=text/html MISSING: mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response5") # $routeSetup="/content-type/Response5"
def Response5(): # $requestHandler
@app.route("/content-type/Response5") # $ routeSetup="/content-type/Response5"
def Response5(): # $ requestHandler
# content_type argument takes priority (and result is text/plain)
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/html"}, content_type="text/plain; charset=utf-8") # $ headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="Content-TYPE" headerWriteValue="text/html" HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response6") # $routeSetup="/content-type/Response6"
def Response6(): # $requestHandler
@app.route("/content-type/Response6") # $ routeSetup="/content-type/Response6"
def Response6(): # $ requestHandler
# mimetype argument takes priority over header (and result is text/plain)
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/html"}, mimetype="text/plain") # $ headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="Content-TYPE" headerWriteValue="text/html" HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Flask-response-class") # $routeSetup="/content-type/Flask-response-class"
def Flask_response_class(): # $requestHandler
@app.route("/content-type/Flask-response-class") # $ routeSetup="/content-type/Flask-response-class"
def Flask_response_class(): # $ requestHandler
# note: flask.Flask.response_class is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = Flask.response_class("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
resp = Flask.response_class("<h1>hello</h1>", mimetype="text/plain") # $ HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/app-response-class") # $routeSetup="/content-type/app-response-class"
def app_response_class(): # $requestHandler
@app.route("/content-type/app-response-class") # $ routeSetup="/content-type/app-response-class"
def app_response_class(): # $ requestHandler
# note: app.response_class (flask.Flask.response_class) is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = app.response_class("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
resp = app.response_class("<h1>hello</h1>", mimetype="text/plain") # $ HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -192,8 +192,8 @@ def app_response_class(): # $requestHandler
################################################################################
@app.route("/redirect-simple") # $routeSetup="/redirect-simple"
def redirect_simple(): # $requestHandler
@app.route("/redirect-simple") # $ routeSetup="/redirect-simple"
def redirect_simple(): # $ requestHandler
next = request.args['next']
resp = redirect(next) # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation=next
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -206,13 +206,13 @@ def redirect_simple(): # $requestHandler
def unk():
return
@app.route("/setting_cookie") # $routeSetup="/setting_cookie"
def setting_cookie(): # $requestHandler
@app.route("/setting_cookie") # $ routeSetup="/setting_cookie"
def setting_cookie(): # $ requestHandler
resp = make_response() # $ HttpResponse mimetype=text/html
resp.set_cookie("key", "value") # $ CookieWrite CookieName="key" CookieValue="value" CookieSecure=false CookieHttpOnly=false CookieSameSite=Lax
resp.set_cookie(key="key", value="value") # $ CookieWrite CookieName="key" CookieValue="value" CookieSecure=false CookieHttpOnly=false CookieSameSite=Lax
resp.set_cookie(key="key", value="value", secure=True, httponly=True, samesite="Strict") # $ CookieWrite CookieName="key" CookieValue="value" CookieSecure=true CookieHttpOnly=true CookieSameSite=Strict
resp.set_cookie(key="key", value="value", secure=unk(), httponly=unk(), samesite=unk()) # $ CookieWrite CookieName="key" CookieValue="value"
resp.set_cookie(key="key", value="value", secure=unk(), httponly=unk(), samesite=unk()) # $ CookieWrite CookieName="key" CookieValue="value"
resp.headers.add("Set-Cookie", "key2=value2") # $ headerWriteNameUnsanitized="Set-Cookie" headerWriteValue="key2=value2" CookieWrite CookieRawHeader="key2=value2" CookieSecure=false CookieHttpOnly=false CookieSameSite=Lax
resp.delete_cookie("key3") # $ CookieWrite CookieName="key3"
resp.delete_cookie(key="key3") # $ CookieWrite CookieName="key3"
@@ -222,33 +222,33 @@ def setting_cookie(): # $requestHandler
# Headers
################################################################################
@app.route("/headers") # $routeSetup="/headers"
def headers(): # $requestHandler
@app.route("/headers") # $ routeSetup="/headers"
def headers(): # $ requestHandler
resp1 = Response() # $ HttpResponse mimetype=text/html
resp1.headers["X-MyHeader"] = "a" # $ headerWriteNameUnsanitized="X-MyHeader" headerWriteValue="a"
resp2 = make_response() # $ HttpResponse mimetype=text/html
resp2.headers["X-MyHeader"] = "aa" # $ headerWriteNameUnsanitized="X-MyHeader" headerWriteValue="aa"
resp2.headers.extend({"X-MyHeader2": "b"}) # $ headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="X-MyHeader2" headerWriteValue="b"
resp2.headers.extend({"X-MyHeader2": "b"}) # $ headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="X-MyHeader2" headerWriteValue="b"
resp3 = make_response("hello", 200, {"X-MyHeader3": "c"}) # $ HttpResponse mimetype=text/html responseBody="hello" headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="X-MyHeader3" headerWriteValue="c"
resp4 = make_response("hello", {"X-MyHeader4": "d"}) # $ HttpResponse mimetype=text/html responseBody="hello" headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="X-MyHeader4" headerWriteValue="d"
resp5 = Response(headers={"X-MyHeader5":"e"}) # $ HttpResponse mimetype=text/html headerWriteBulk=Dict headerWriteBulkUnsanitized=name headerWriteBulkUnsanitized=name headerWriteNameUnsanitized="X-MyHeader5" headerWriteValue="e"
return resp5 # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp5
@app.route("/werkzeug-headers") # $routeSetup="/werkzeug-headers"
def werkzeug_headers(): # $requestHandler
@app.route("/werkzeug-headers") # $ routeSetup="/werkzeug-headers"
def werkzeug_headers(): # $ requestHandler
response = Response() # $ HttpResponse mimetype=text/html
headers = Headers()
headers.add("X-MyHeader1", "a") # $ headerWriteNameUnsanitized="X-MyHeader1" headerWriteValue="a"
headers.add_header("X-MyHeader2", "b") # $ headerWriteNameUnsanitized="X-MyHeader2" headerWriteValue="b"
headers.set("X-MyHeader3", "c") # $ headerWriteNameUnsanitized="X-MyHeader3" headerWriteValue="c"
headers.setdefault("X-MyHeader4", "d") # $ headerWriteNameUnsanitized="X-MyHeader4" headerWriteValue="d"
headers.__setitem__("X-MyHeader5", "e") # $ headerWriteNameUnsanitized="X-MyHeader5" headerWriteValue="e"
headers["X-MyHeader6"] = "f" # $ headerWriteNameUnsanitized="X-MyHeader6" headerWriteValue="f"
headers.set("X-MyHeader3", "c") # $ headerWriteNameUnsanitized="X-MyHeader3" headerWriteValue="c"
headers.setdefault("X-MyHeader4", "d") # $ headerWriteNameUnsanitized="X-MyHeader4" headerWriteValue="d"
headers.__setitem__("X-MyHeader5", "e") # $ headerWriteNameUnsanitized="X-MyHeader5" headerWriteValue="e"
headers["X-MyHeader6"] = "f" # $ headerWriteNameUnsanitized="X-MyHeader6" headerWriteValue="f"
h1 = {"X-MyHeader7": "g"} # $ headerWriteNameUnsanitized="X-MyHeader7" headerWriteValue="g"
headers.extend(h1) # $ headerWriteBulk=h1 headerWriteBulkUnsanitized=name
headers.extend(h1) # $ headerWriteBulk=h1 headerWriteBulkUnsanitized=name
h2 = [("X-MyHeader8", "h")] # $ headerWriteNameUnsanitized="X-MyHeader8" headerWriteValue="h"
headers.extend(h2) # $ headerWriteBulk=h2 headerWriteBulkUnsanitized=name
response.headers = headers
headers.extend(h2) # $ headerWriteBulk=h2 headerWriteBulkUnsanitized=name
response.headers = headers
return response # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=response
################################################################################

View File

@@ -5,28 +5,28 @@ app = Flask(__name__)
SOME_ROUTE = "/some/route"
@app.route(SOME_ROUTE) # $routeSetup="/some/route"
def some_route(): # $requestHandler
return make_response("some_route") # $HttpResponse
@app.route(SOME_ROUTE) # $ routeSetup="/some/route"
def some_route(): # $ requestHandler
return make_response("some_route") # $ HttpResponse
def index(): # $requestHandler
return make_response("index") # $HttpResponse
app.add_url_rule('/index', 'index', index) # $routeSetup="/index"
def index(): # $ requestHandler
return make_response("index") # $ HttpResponse
app.add_url_rule('/index', 'index', index) # $ routeSetup="/index"
# We don't support this yet, and I think that's OK
def later_set(): # $ MISSING: requestHandler
return make_response("later_set") # $HttpResponse
app.add_url_rule('/later-set', 'later_set', view_func=None) # $routeSetup="/later-set"
return make_response("later_set") # $ HttpResponse
app.add_url_rule('/later-set', 'later_set', view_func=None) # $ routeSetup="/later-set"
app.view_functions['later_set'] = later_set
# We don't want to execute this at runtime (since program will crash). Just using
# `False` makes our analysis skip it, so here's a workaround :D
if __file__ == "False":
@app.route(UNKNOWN_ROUTE) # $routeSetup
def unkown_route(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
return make_response("unkown_route") # $HttpResponse
@app.route(UNKNOWN_ROUTE) # $ routeSetup
def unkown_route(foo, bar): # $ requestHandler routedParameter=foo routedParameter=bar
return make_response("unkown_route") # $ HttpResponse
# View
#
@@ -40,7 +40,7 @@ class ShowUser(View):
def dispatch_request(self, user_id): # $ requestHandler routedParameter=user_id
return "user_id: {}".format(user_id) # $ HttpResponse
app.add_url_rule("/basic/user/<int:user_id>", view_func=ShowUser.as_view('show_user')) # $routeSetup="/basic/user/<int:user_id>"
app.add_url_rule("/basic/user/<int:user_id>", view_func=ShowUser.as_view('show_user')) # $ routeSetup="/basic/user/<int:user_id>"
class WithoutKnownRoute1(View):
@@ -81,9 +81,9 @@ class UserAPI(MethodView):
user_view = UserAPI.as_view("user_api")
app.add_url_rule("/users/", defaults={"user_id": None}, view_func=user_view, methods=["GET",]) # $routeSetup="/users/"
app.add_url_rule("/users/", view_func=user_view, methods=["POST",]) # $routeSetup="/users/"
app.add_url_rule("/users/<int:user_id>", view_func=user_view, methods=["GET", "PUT", "DELETE"]) # $routeSetup="/users/<int:user_id>"
app.add_url_rule("/users/", defaults={"user_id": None}, view_func=user_view, methods=["GET",]) # $ routeSetup="/users/"
app.add_url_rule("/users/", view_func=user_view, methods=["POST",]) # $ routeSetup="/users/"
app.add_url_rule("/users/<int:user_id>", view_func=user_view, methods=["GET", "PUT", "DELETE"]) # $ routeSetup="/users/<int:user_id>"
class WithoutKnownRoute2(MethodView):

View File

@@ -1,6 +1,6 @@
from flask import Flask, request
app = Flask(__name__)
@app.route("/save-uploaded-file") # $routeSetup="/save-uploaded-file"
def test_taint(): # $requestHandler
@app.route("/save-uploaded-file") # $ routeSetup="/save-uploaded-file"
def test_taint(): # $ requestHandler
request.files['key'].save("path") # $ getAPathArgument="path"

View File

@@ -1,5 +1,5 @@
import flask
import flask
class MySessionInterface(flask.sessions.SessionInterface):
def open_session(self, app, request):
ensure_tainted(request) # $tainted
ensure_tainted(request) # $ tainted

View File

@@ -1,8 +1,8 @@
from flask import Flask, request, render_template_string, stream_template_string
app = Flask(__name__)
@app.route("/test_taint/<name>/<int:number>") # $routeSetup="/test_taint/<name>/<int:number>"
def test_taint(name = "World!", number="0", foo="foo"): # $requestHandler routedParameter=name routedParameter=number
@app.route("/test_taint/<name>/<int:number>") # $ routeSetup="/test_taint/<name>/<int:number>"
def test_taint(name = "World!", number="0", foo="foo"): # $ requestHandler routedParameter=name routedParameter=number
ensure_tainted(name, number) # $ tainted
ensure_not_tainted(foo)
@@ -245,8 +245,8 @@ def test_taint(name = "World!", number="0", foo="foo"): # $requestHandler route
ensure_not_tainted(x)
@app.route("/debug/<foo>/<bar>", methods=['GET']) # $routeSetup="/debug/<foo>/<bar>"
def debug(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
@app.route("/debug/<foo>/<bar>", methods=['GET']) # $ routeSetup="/debug/<foo>/<bar>"
def debug(foo, bar): # $ requestHandler routedParameter=foo routedParameter=bar
print("request.view_args", request.view_args)
print("request.headers {!r}".format(request.headers))
@@ -254,20 +254,20 @@ def debug(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
print("request.pragma {!r}".format(request.pragma))
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
@app.route("/stream", methods=['POST']) # $routeSetup="/stream"
def stream(): # $requestHandler
@app.route("/stream", methods=['POST']) # $ routeSetup="/stream"
def stream(): # $ requestHandler
print(request.path)
s = request.stream
print(s)
# just works :)
print(s.read())
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
@app.route("/input_stream", methods=['POST']) # $routeSetup="/input_stream"
def input_stream(): # $requestHandler
@app.route("/input_stream", methods=['POST']) # $ routeSetup="/input_stream"
def input_stream(): # $ requestHandler
print(request.path)
s = request.input_stream
print(s)
@@ -275,38 +275,38 @@ def input_stream(): # $requestHandler
# be handled manually
print(s.read())
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
@app.route("/form", methods=['POST']) # $routeSetup="/form"
def form(): # $requestHandler
@app.route("/form", methods=['POST']) # $ routeSetup="/form"
def form(): # $ requestHandler
print(request.path)
print("request.form", request.form)
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
@app.route("/cache_control", methods=['POST']) # $routeSetup="/cache_control"
def cache_control(): # $requestHandler
@app.route("/cache_control", methods=['POST']) # $ routeSetup="/cache_control"
def cache_control(): # $ requestHandler
print(request.path)
print("request.cache_control.max_age", request.cache_control.max_age, type(request.cache_control.max_age))
print("request.cache_control.max_stale", request.cache_control.max_stale, type(request.cache_control.max_stale))
print("request.cache_control.min_fresh", request.cache_control.min_fresh, type(request.cache_control.min_fresh))
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
@app.route("/file_upload", methods=['POST']) # $routeSetup="/file_upload"
def file_upload(): # $requestHandler
@app.route("/file_upload", methods=['POST']) # $ routeSetup="/file_upload"
def file_upload(): # $ requestHandler
print(request.path)
for k,v in request.files.items():
print(k, v, v.name, v.filename, v.stream)
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
@app.route("/args", methods=['GET']) # $routeSetup="/args"
def args(): # $requestHandler
@app.route("/args", methods=['GET']) # $ routeSetup="/args"
def args(): # $ requestHandler
print(request.path)
print("request.args", request.args)
return 'ok' # $HttpResponse
return 'ok' # $ HttpResponse
# curl --header "My-Header: some-value" http://localhost:5000/debug/fooval/barval
# curl --header "Pragma: foo, bar" --header "Pragma: stuff, foo" http://localhost:5000/debug/fooval/barval

View File

@@ -1,16 +1,16 @@
from flask import Flask, Response, stream_with_context, render_template_string, stream_template_string
app = Flask(__name__)
@app.route("/a") # $routeSetup="/a"
@app.route("/a") # $ routeSetup="/a"
def a(): # $ requestHandler
r = render_template_string("abc") # $ templateConstruction="abc"
return r # $ HttpResponse
@app.route("/b") # $routeSetup="/b"
@app.route("/b") # $ routeSetup="/b"
def b(): # $ requestHandler
s = stream_template_string("abc") # $ templateConstruction="abc"
r = Response(stream_with_context(s)) # $ HttpResponse
return r # $ HttpResponse
if __name__ == "__main__":
app.run(debug=True)
app.run(debug=True)

View File

@@ -5,35 +5,35 @@ see https://www.pyinvoke.org/
import invoke
invoke.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
invoke.run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
invoke.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
invoke.run(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
def with_sudo():
invoke.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
invoke.sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
invoke.sudo("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
invoke.sudo(command="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
def manual_context():
c = invoke.Context()
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
c.sudo("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# invoke.Context is just an alias for invoke.context.Context
c2 = invoke.context.Context()
c2.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c2.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
manual_context()
def foo_helper(c):
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# for use with the 'invoke' command-line tool
@invoke.task
def foo(c):
# 'c' is a invoke.context.Context
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
foo_helper(c)

View File

@@ -18,7 +18,7 @@ add_salary = ("INSERT INTO salaries "
data_employee = ('Geert', 'Vanderkelen', tomorrow, 'M', date(1977, 6, 14))
# Insert new employee
cursor.execute(add_employee, data_employee) # $getSql=add_employee
cursor.execute(add_employee, data_employee) # $ getSql=add_employee
emp_no = cursor.lastrowid
# Insert salary information
@@ -28,10 +28,10 @@ data_salary = {
'from_date': tomorrow,
'to_date': date(9999, 1, 1),
}
cursor.execute(add_salary, data_salary) # $getSql=add_salary
cursor.execute(add_salary, data_salary) # $ getSql=add_salary
# Make sure data is committed to the database
cnx.commit()
cursor.close()
cnx.close()
cnx.close()

View File

@@ -4,4 +4,4 @@ db=MySQLdb.connect(passwd="moonpie",db="thangs")
c=db.cursor()
max_price=5
c.execute("some sql", (max_price,)) # $getSql="some sql"
c.execute("some sql", (max_price,)) # $ getSql="some sql"

View File

@@ -2,79 +2,79 @@ import pandas as pd
import sqlite3
df = pd.DataFrame({'temp_c': [17.0, 25.0]}, index=['Portland', 'Berkeley'])
df.sample().query("query") # $getCode="query"
df.mod().query("query") # $getCode="query"
pd.eval("pythonExpr", target=df) # $getCode="pythonExpr"
df.sample().query("query") # $ getCode="query"
df.mod().query("query") # $ getCode="query"
pd.eval("pythonExpr", target=df) # $ getCode="pythonExpr"
df = pd.read_csv("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.copy().query("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df.copy().query("query") # $ getCode="query"
df = pd.read_fwf("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_pickle("filepath") # $ decodeInput="filepath" decodeOutput=pd.read_pickle(..) decodeFormat=pickle decodeMayExecuteInput
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_table("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_clipboard("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_excel("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_html("filepath")
df[0].query("query") # $getCode="query"
df[0].query("query") # $ getCode="query"
df = pd.read_xml("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_parquet("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_orc("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_spss("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_sql_table("filepath", 'postgres:///db_name')
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
connection = sqlite3.connect("pets.db")
df = pd.read_sql_query("sql query", connection) # $getSql="sql query"
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df = pd.read_sql_query("sql query", connection) # $ getSql="sql query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_sql("sql query", connection) # $getSql="sql query"
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df = pd.read_sql("sql query", connection) # $ getSql="sql query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_gbq("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_stata("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_sas("filepath")
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"
df = pd.read_sas("filepath", iterator=True, chunksize=1)
df.query("query")
df = pd.read_sas("filepath", iterator=False, chunksize=1)
@@ -82,5 +82,5 @@ df.query("query")
df = pd.read_sas("filepath", iterator=True, chunksize=None)
df.query("query")
df = pd.read_sas("filepath", iterator=False, chunksize=None)
df.query("query") # $getCode="query"
df.eval("query") # $getCode="query"
df.query("query") # $ getCode="query"
df.eval("query") # $ getCode="query"

View File

@@ -8,4 +8,4 @@ paramiko_ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
paramiko_ssh_client.connect(hostname="127.0.0.1", port="22", username="ssh_user_name", pkey="k", timeout=11, banner_timeout=200)
cmd = "cmd"
paramiko_ssh_client.connect('hostname', username='user', password='yourpassword', sock=paramiko.ProxyCommand(cmd)) # $getCommand=cmd
paramiko_ssh_client.connect('hostname', username='user', password='yourpassword', sock=paramiko.ProxyCommand(cmd)) # $ getCommand=cmd

View File

@@ -8,7 +8,7 @@ def ignore(*args, **kwargs): pass
ensure_tainted = ensure_not_tainted = ignore
@view_config(route_name="test1") # $ routeSetup
def test1(request): # $ requestHandler
def test1(request): # $ requestHandler
ensure_tainted(
request, # $ tainted
@@ -26,7 +26,7 @@ def test1(request): # $ requestHandler
request.host_url, # $ tainted
request.if_match, # $ tainted
request.if_none_match, # $ tainted
request.if_range, # $ tainted
request.if_range, # $ tainted
request.pragma, # $ tainted
request.range, # $ tainted
request.referer, # $ tainted
@@ -71,11 +71,11 @@ def test1(request): # $ requestHandler
request.copy_get(), # $ tainted
request.copy().GET['a'], # $ tainted
request.copy_get().body # $ tainted
)
)
return Response("Ok") # $ HttpResponse responseBody="Ok" mimetype=text/html
def test2(request): # $ requestHandler
def test2(request): # $ requestHandler
ensure_tainted(request) # $ tainted
resp = Response("Ok", content_type="text/plain") # $ HttpResponse responseBody="Ok" mimetype=text/plain
@@ -83,7 +83,7 @@ def test2(request): # $ requestHandler
return resp
@view_config(route_name="test3", renderer="string") # $ routeSetup
def test3(ctx, req): # $ requestHandler
def test3(ctx, req): # $ requestHandler
ensure_tainted(req) # $ tainted
resp = req.response # $ HttpResponse mimetype=text/html
resp.set_cookie("hi", "there") # $ CookieWrite CookieName="hi" CookieValue="there" CookieSecure=false CookieHttpOnly=false CookieSameSite=Lax
@@ -93,13 +93,13 @@ def test3(ctx, req): # $ requestHandler
@view_config(route_name="test4", renderer="string") # $ routeSetup
def test4(request): # $ requestHandler
a = HTTPMultipleChoices("redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
b = HTTPMovedPermanently(location="redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
c = HTTPFound(location="redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
d = HTTPSeeOther(location="redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
e = HTTPUseProxy(location="redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
f = HTTPTemporaryRedirect(location="redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
g = HTTPPermanentRedirect(location="redirect") # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
a = HTTPMultipleChoices("redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
b = HTTPMovedPermanently(location="redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
c = HTTPFound(location="redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
d = HTTPSeeOther(location="redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
e = HTTPUseProxy(location="redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
f = HTTPTemporaryRedirect(location="redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
g = HTTPPermanentRedirect(location="redirect") # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation="redirect"
raise a
# Unsupported cases
@@ -121,7 +121,7 @@ class Test6:
self.req = request
def test6method(self): # $ MISSING: requestHandler
ensure_not_tainted(self)
ensure_not_tainted(self)
ensure_tainted(self.req) # $ MISSING: tainted
return "Ok" # $ MISSING: HttpResponse mimetype=text/html responseBody="Ok"
@@ -132,7 +132,7 @@ class Test6:
self.req = request
def __call__(self): # $ MISSING: requestHandler
ensure_not_tainted(self)
ensure_not_tainted(self)
ensure_tainted(self.req) # $ MISSING: tainted
return "Ok" # $ MISSING: HttpResponse mimetype=text/html responseBody="Ok"

View File

@@ -18,5 +18,5 @@ from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls), # $ routeSetup="admin/"
path("", include("testapp.urls")), # $routeSetup=""
path("", include("testapp.urls")), # $ routeSetup=""
]

View File

@@ -1,2 +1,2 @@
# exec statement is Python 2 specific
exec "print(42)" # $getCode="print(42)"
exec "print(42)" # $ getCode="print(42)"

View File

@@ -1,14 +1,14 @@
########################################
import os
os.popen2("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen3("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen4("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen2("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
os.popen3("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
os.popen4("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
os.popen2(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen3(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen4(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen2(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
os.popen3(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
os.popen4(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
# os.popen does not support keyword arguments, so this is a TypeError
os.popen(cmd="cmd1; cmd2")
@@ -16,21 +16,21 @@ os.popen(cmd="cmd1; cmd2")
########################################
import platform
platform.popen("cmd1; cmd2") # $getCommand="cmd1; cmd2"
platform.popen(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
platform.popen("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
platform.popen(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
########################################
# popen2 was deprecated in Python 2.6, but still available in Python 2.7
import popen2
popen2.popen2("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen3("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen4("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen3("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen4("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen2("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.popen3("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.popen4("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.Popen3("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.Popen4("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.popen2(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen3(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen4(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen3(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen4(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen2(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.popen3(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.popen4(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.Popen3(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"
popen2.Popen4(cmd="cmd1; cmd2") # $ getCommand="cmd1; cmd2"

View File

@@ -1,4 +1,4 @@
import builtins
# exec being part of builtins is Python 3 only
builtins.exec("print(42)") # $getCode="print(42)"
builtins.exec("print(42)") # $ getCode="print(42)"

View File

@@ -9,16 +9,16 @@ if sys.version_info[0] == 3:
if sys.version_info[0] == 2:
import __builtin__ as builtins
exec("print(42)") # $getCode="print(42)"
eval("print(42)") # $getCode="print(42)"
exec("print(42)") # $ getCode="print(42)"
eval("print(42)") # $ getCode="print(42)"
builtins.eval("print(42)") # $getCode="print(42)"
builtins.eval("print(42)") # $ getCode="print(42)"
cmd = compile("print(42)", "<filename>", "exec")
exec(cmd) # $getCode=cmd
exec(cmd) # $ getCode=cmd
cmd = builtins.compile("print(42)", "<filename>", "exec")
exec(cmd) # $getCode=cmd
exec(cmd) # $ getCode=cmd
# ------------------------------------------------------------------------------
# taint related

View File

@@ -2,17 +2,17 @@ import os.path
path = "un\\normalized/path"
p1 = os.path.normpath(path) # $pathNormalization
p2 = os.path.normpath(path=path) # $pathNormalization
p1 = os.path.normpath(path) # $ pathNormalization
p2 = os.path.normpath(path=path) # $ pathNormalization
np = os.path.normpath
p3 = np(path) # $pathNormalization
p4 = np(path=path) # $pathNormalization
p3 = np(path) # $ pathNormalization
p4 = np(path=path) # $ pathNormalization
def normalize(path):
return os.path.normpath(path) # $pathNormalization
return os.path.normpath(path) # $ pathNormalization
p5 = normalize(path)

View File

@@ -1,7 +1,7 @@
s = "taintedString"
if s.startswith("tainted"):
s2 = s # $SafeAccessCheck=s
s2 = s # $ SafeAccessCheck=s
pass
sw = s.startswith

View File

@@ -13,8 +13,8 @@ import os
# can't use a string literal with spaces in the tags of an InlineExpectationsTest, so using variables :|
os.popen("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.system("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
os.system("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
def os_members():
@@ -24,8 +24,8 @@ def os_members():
# :|
from os import popen, system
popen("cmd1; cmd2") # $getCommand="cmd1; cmd2"
system("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
system("cmd1; cmd2") # $ getCommand="cmd1; cmd2"
########################################
@@ -85,62 +85,62 @@ os.posix_spawnp(path="path", argv=["<progname>", "arg0"], env=env) # $ getComma
import subprocess
subprocess.Popen("cmd1; cmd2", shell=True) # $getCommand="cmd1; cmd2"
subprocess.Popen("cmd1; cmd2", shell="truthy string") # $getCommand="cmd1; cmd2"
subprocess.Popen(["cmd1; cmd2", "shell-arg"], shell=True) # $getCommand="cmd1; cmd2"
subprocess.Popen("cmd1; cmd2", shell=True, executable="/bin/bash") # $getCommand="cmd1; cmd2" getCommand="/bin/bash"
subprocess.Popen("cmd1; cmd2", shell=True) # $ getCommand="cmd1; cmd2"
subprocess.Popen("cmd1; cmd2", shell="truthy string") # $ getCommand="cmd1; cmd2"
subprocess.Popen(["cmd1; cmd2", "shell-arg"], shell=True) # $ getCommand="cmd1; cmd2"
subprocess.Popen("cmd1; cmd2", shell=True, executable="/bin/bash") # $ getCommand="cmd1; cmd2" getCommand="/bin/bash"
subprocess.Popen("executable") # $getCommand="executable"
subprocess.Popen(["executable", "arg0"]) # $getCommand="executable"
subprocess.Popen("<progname>", executable="executable") # $getCommand="executable"
subprocess.Popen(["<progname>", "arg0"], executable="executable") # $getCommand="executable"
subprocess.Popen("executable") # $ getCommand="executable"
subprocess.Popen(["executable", "arg0"]) # $ getCommand="executable"
subprocess.Popen("<progname>", executable="executable") # $ getCommand="executable"
subprocess.Popen(["<progname>", "arg0"], executable="executable") # $ getCommand="executable"
# call/check_call/check_output/run all work like Popen from a command execution point of view
subprocess.call(["executable", "arg0"]) # $getCommand="executable"
subprocess.check_call(["executable", "arg0"]) # $getCommand="executable"
subprocess.check_output(["executable", "arg0"]) # $getCommand="executable"
subprocess.run(["executable", "arg0"]) # $getCommand="executable"
subprocess.call(["executable", "arg0"]) # $ getCommand="executable"
subprocess.check_call(["executable", "arg0"]) # $ getCommand="executable"
subprocess.check_output(["executable", "arg0"]) # $ getCommand="executable"
subprocess.run(["executable", "arg0"]) # $ getCommand="executable"
########################################
# actively using known shell as the executable
subprocess.Popen(["/bin/sh", "-c", "vuln"]) # $getCommand="/bin/sh" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/bash", "-c", "vuln"]) # $getCommand="/bin/bash" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/dash", "-c", "vuln"]) # $getCommand="/bin/dash" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/zsh", "-c", "vuln"]) # $getCommand="/bin/zsh" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/sh", "-c", "vuln"]) # $ getCommand="/bin/sh" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/bash", "-c", "vuln"]) # $ getCommand="/bin/bash" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/dash", "-c", "vuln"]) # $ getCommand="/bin/dash" MISSING: getCommand="vuln"
subprocess.Popen(["/bin/zsh", "-c", "vuln"]) # $ getCommand="/bin/zsh" MISSING: getCommand="vuln"
subprocess.Popen(["sh", "-c", "vuln"]) # $getCommand="sh" MISSING: getCommand="vuln"
subprocess.Popen(["bash", "-c", "vuln"]) # $getCommand="bash" MISSING: getCommand="vuln"
subprocess.Popen(["dash", "-c", "vuln"]) # $getCommand="dash" MISSING: getCommand="vuln"
subprocess.Popen(["zsh", "-c", "vuln"]) # $getCommand="zsh" MISSING: getCommand="vuln"
subprocess.Popen(["sh", "-c", "vuln"]) # $ getCommand="sh" MISSING: getCommand="vuln"
subprocess.Popen(["bash", "-c", "vuln"]) # $ getCommand="bash" MISSING: getCommand="vuln"
subprocess.Popen(["dash", "-c", "vuln"]) # $ getCommand="dash" MISSING: getCommand="vuln"
subprocess.Popen(["zsh", "-c", "vuln"]) # $ getCommand="zsh" MISSING: getCommand="vuln"
# Check that we don't consider ANY argument a command injection sink
subprocess.Popen(["sh", "/bin/python"]) # $getCommand="sh"
subprocess.Popen(["sh", "/bin/python"]) # $ getCommand="sh"
subprocess.Popen(["cmd.exe", "/c", "vuln"]) # $getCommand="cmd.exe" MISSING: getCommand="vuln"
subprocess.Popen(["cmd.exe", "/C", "vuln"]) # $getCommand="cmd.exe" MISSING: getCommand="vuln"
subprocess.Popen(["cmd", "/c", "vuln"]) # $getCommand="cmd" MISSING: getCommand="vuln"
subprocess.Popen(["cmd", "/C", "vuln"]) # $getCommand="cmd" MISSING: getCommand="vuln"
subprocess.Popen(["cmd.exe", "/c", "vuln"]) # $ getCommand="cmd.exe" MISSING: getCommand="vuln"
subprocess.Popen(["cmd.exe", "/C", "vuln"]) # $ getCommand="cmd.exe" MISSING: getCommand="vuln"
subprocess.Popen(["cmd", "/c", "vuln"]) # $ getCommand="cmd" MISSING: getCommand="vuln"
subprocess.Popen(["cmd", "/C", "vuln"]) # $ getCommand="cmd" MISSING: getCommand="vuln"
subprocess.Popen(["<progname>", "-c", "vuln"], executable="/bin/bash") # $getCommand="/bin/bash" MISSING: getCommand="vuln"
subprocess.Popen(["<progname>", "-c", "vuln"], executable="/bin/bash") # $ getCommand="/bin/bash" MISSING: getCommand="vuln"
if UNKNOWN:
os.execl("/bin/sh", "<progname>", "-c", "vuln") # $getCommand="/bin/sh" getAPathArgument="/bin/sh" MISSING: getCommand="vuln"
os.execl("/bin/sh", "<progname>", "-c", "vuln") # $ getCommand="/bin/sh" getAPathArgument="/bin/sh" MISSING: getCommand="vuln"
os.spawnl(os.P_WAIT, "/bin/sh", "<progname>", "-c", "vuln") # $getCommand="/bin/sh" getAPathArgument="/bin/sh" MISSING: getCommand="vuln"
os.spawnl(os.P_WAIT, "/bin/sh", "<progname>", "-c", "vuln") # $ getCommand="/bin/sh" getAPathArgument="/bin/sh" MISSING: getCommand="vuln"
########################################
# Passing arguments by reference
args = ["/bin/sh", "-c", "vuln"]
subprocess.Popen(args) # $getCommand=args
subprocess.Popen(args) # $ getCommand=args
args = "<progname>"
use_shell = False
exe = "executable"
subprocess.Popen(args, shell=use_shell, executable=exe) # $getCommand=exe
subprocess.Popen(args, shell=use_shell, executable=exe) # $ getCommand=exe
################################################################################
@@ -165,14 +165,14 @@ cmd = "sh -c " + wrong_use
import asyncio
from asyncio import subprocess
asyncio.run(asyncio.create_subprocess_exec("executable", "arg0")) # $getCommand="executable" getAPathArgument="executable"
asyncio.run(subprocess.create_subprocess_exec("executable", "arg0")) # $getCommand="executable" getAPathArgument="executable"
asyncio.run(asyncio.create_subprocess_exec("executable", "arg0")) # $ getCommand="executable" getAPathArgument="executable"
asyncio.run(subprocess.create_subprocess_exec("executable", "arg0")) # $ getCommand="executable" getAPathArgument="executable"
loop = asyncio.new_event_loop()
loop.run_until_complete(loop.subprocess_exec(asyncio.SubprocessProtocol, "executable", "arg0")) # $getCommand="executable" getAPathArgument="executable"
loop.run_until_complete(loop.subprocess_exec(asyncio.SubprocessProtocol, "executable", "arg0")) # $ getCommand="executable" getAPathArgument="executable"
asyncio.run(asyncio.create_subprocess_shell("shell_command")) # $getCommand="shell_command" getAPathArgument="shell_command"
asyncio.run(subprocess.create_subprocess_shell("shell_command")) # $getCommand="shell_command" getAPathArgument="shell_command"
asyncio.run(asyncio.create_subprocess_shell("shell_command")) # $ getCommand="shell_command" getAPathArgument="shell_command"
asyncio.run(subprocess.create_subprocess_shell("shell_command")) # $ getCommand="shell_command" getAPathArgument="shell_command"
loop = asyncio.get_running_loop()
loop.run_until_complete(loop.subprocess_shell(asyncio.SubprocessProtocol, "shell_command")) # $getCommand="shell_command" getAPathArgument="shell_command"
loop.run_until_complete(loop.subprocess_shell(asyncio.SubprocessProtocol, "shell_command")) # $ getCommand="shell_command" getAPathArgument="shell_command"

View File

@@ -1,16 +1,16 @@
import yaml
# Unsafe:
yaml.load(payload) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(stream=payload) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.Loader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(stream=payload) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.Loader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.unsafe_load(payload) # $ decodeInput=payload decodeOutput=yaml.unsafe_load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.full_load(payload) # $ decodeInput=payload decodeOutput=yaml.full_load(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
yaml.load(payload, yaml.SafeLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, Loader=yaml.SafeLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.BaseLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, Loader=yaml.SafeLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.BaseLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.safe_load(payload) # $ decodeInput=payload decodeOutput=yaml.safe_load(..) decodeFormat=YAML
################################################################################
@@ -34,5 +34,5 @@ yaml.load(payload, yaml.CLoader) # $ decodeInput=payload decodeOutput=yaml.load
yaml.load(payload, yaml.CFullLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
yaml.load(payload, yaml.CSafeLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.CBaseLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.CSafeLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.CBaseLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML

View File

@@ -10,7 +10,7 @@ import re
# Comments indicate the found locations relative to the call to `compile`.
# plain string
re.compile( # $location=1:2
re.compile( # $ location=1:2
'[this] is a test'
)
@@ -97,4 +97,4 @@ r'\A[this] is a test'
re.compile( # $ location=1:2 SPURIOUS:location=1:23 MISSING:location=2:6
'[this] is a test\
and [this] is another test'
)
)

View File

@@ -61,35 +61,35 @@ try:
val.attr
except Exception:
print (2)
except AttributeError: # $Alert[py/unreachable-except]
except AttributeError: # $ Alert[py/unreachable-except]
print (3)
class MyExc(ValueError):
pass
pass
try:
pass
except ValueError:
pass
except MyExc: # $MISSING:Alert[py/unreachable-except] # Missing due to dataflow limitiation preventing MyExc from being tracked here.
pass
pass
except MyExc: # $ MISSING:Alert[py/unreachable-except] # Missing due to dataflow limitiation preventing MyExc from being tracked here.
pass
class MyBaseExc(Exception):
pass
pass
class MySubExc(MyBaseExc):
pass
pass
try:
pass
except MyBaseExc:
pass
except MySubExc: # $MISSING:Alert[py/unreachable-except] # Missing due to dataflow limitation preventing MyExc from being tracked here.
pass
pass
except MySubExc: # $ MISSING:Alert[py/unreachable-except] # Missing due to dataflow limitation preventing MyExc from being tracked here.
pass
except Exception:
pass
#Catch BaseException
def catch_base_exception():
try:
@@ -97,13 +97,13 @@ def catch_base_exception():
except BaseException:
#Consumes KeyboardInterrupt
pass
def catch_base_exception_ok():
try:
illegal_raise()
except BaseException:
raise
def legal_handler1():
try:
illegal_raise()

View File

@@ -14,17 +14,17 @@ class Normal(object):
# not ok
@classmethod
def n_cmethod(self): # $shouldBeCls
def n_cmethod(self): # $ shouldBeCls
pass
# not ok
@classmethod
def n_cmethod2(): # $shouldBeCls
def n_cmethod2(): # $ shouldBeCls
pass
@classmethod
@id
def n_dec(any_name): # $shouldBeCls
def n_dec(any_name): # $ shouldBeCls
pass
@@ -34,13 +34,13 @@ class Class(type):
def __init__(cls):
pass
def c_method(y): # $shouldBeCls
def c_method(y): # $ shouldBeCls
pass
def c_ok(cls):
pass
# technically we could alert on mixing self for metaclasses with cls for metaclasses in the same codebase,
# technically we could alert on mixing self for metaclasses with cls for metaclasses in the same codebase,
# but it's probably not too significant.
def c_self_ok(self):
pass
@@ -48,13 +48,13 @@ class Class(type):
class NonSelf(object):
def __init__(x): # $shouldBeSelf
def __init__(x): # $ shouldBeSelf
pass
def s_method(y): # $shouldBeSelf
def s_method(y): # $ shouldBeSelf
pass
def s_method2(): # $shouldBeSelf
def s_method2(): # $ shouldBeSelf
pass
def s_ok(self):
@@ -68,7 +68,7 @@ class NonSelf(object):
def s_cmethod(cls):
pass
# we allow methods that are used in class initialization, but only detect this case when they are called.
# we allow methods that are used in class initialization, but only detect this case when they are called.
def s_smethod2(ok): # $ SPURIOUS: shouldBeSelf
pass
s_smethod2 = staticmethod(s_smethod2)
@@ -123,7 +123,7 @@ Z().meth(0)
def weird_decorator(f):
def g(self):
return f()
return g
return g
class B:
@weird_decorator
@@ -152,7 +152,7 @@ class SpecialMethodNames(object):
from dataclasses import dataclass, field
@dataclass
@dataclass
class A:
# Lambdas used in initilisation aren't methods.
x: int = field(default_factory = lambda: 2)
x: int = field(default_factory = lambda: 2)

View File

@@ -21,13 +21,13 @@ class Base(object):
class Derived(Base):
def meth1(self, spam): # $Alert[py/inheritance/signature-mismatch] # Has 1 more arg, base called in Base.foo
def meth1(self, spam): # $ Alert[py/inheritance/signature-mismatch] # Has 1 more arg, base called in Base.foo
pass
def meth2(self): # $Alert[py/inheritance/signature-mismatch] # Has 1 fewer arg, base called in Base.foo
def meth2(self): # $ Alert[py/inheritance/signature-mismatch] # Has 1 fewer arg, base called in Base.foo
pass
def meth3(self, eggs): # $Alert[py/inheritance/signature-mismatch] # Has 1 more arg. Method is not called.
def meth3(self, eggs): # $ Alert[py/inheritance/signature-mismatch] # Has 1 more arg. Method is not called.
pass
def bar(self):
@@ -66,12 +66,12 @@ class BlameBase(object):
class Correct1(BlameBase):
def meth(self, arg): # $Alert[py/inheritance/signature-mismatch] # Has 1 more arg. The incorrect-overridden-method query would alert for the base method in this case.
def meth(self, arg): # $ Alert[py/inheritance/signature-mismatch] # Has 1 more arg. The incorrect-overridden-method query would alert for the base method in this case.
pass
class Correct2(BlameBase):
def meth(self, arg): # $Alert[py/inheritance/signature-mismatch] # Has 1 more arg
def meth(self, arg): # $ Alert[py/inheritance/signature-mismatch] # Has 1 more arg
pass
c = Correct2()
@@ -122,28 +122,28 @@ class Base2:
class Derrived2(Base2):
def meth1(self): pass # $Alert[py/inheritance/signature-mismatch] # Weak mismatch (base may be called with 2 args. only alert if mismatching call exists)
def meth1(self): pass # $ Alert[py/inheritance/signature-mismatch] # Weak mismatch (base may be called with 2 args. only alert if mismatching call exists)
def meth2(self): pass # No alert (weak mismatch, but not called)
def meth3(self, x=1): pass # No alert (no mismatch - all base calls are valid for sub)
def meth4(self, x, y, z=1): pass # $Alert[py/inheritance/signature-mismatch] # sub min > base max (strong mismatch)
def meth4(self, x, y, z=1): pass # $ Alert[py/inheritance/signature-mismatch] # sub min > base max (strong mismatch)
def meth5(self, x, y=1): pass # $Alert[py/inheritance/signature-mismatch]
def meth5(self, x, y=1): pass # $ Alert[py/inheritance/signature-mismatch]
def meth6(self, x): pass # $Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with 3+ args)
def meth6(self, x): pass # $ Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with 3+ args)
def meth7(self, x, *ys): pass # $Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with 1 arg only)
def meth7(self, x, *ys): pass # $ Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with 1 arg only)
def meth8(self, x, z): pass # $Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with arg named y)
def meth8(self, x, z): pass # $ Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with arg named y)
def meth9(self, x, z): pass # No alert (never called with wrong keyword arg)
def meth10(self, x, **kwargs): pass # No alert (y is kw-only arg in base, calls that use it are valid for sub)
def meth11(self, x, z, **kwargs): pass # $MISSING:Alert[py/inheritance/signature-mismatch] # call using y kw-arg is invalid due to not specifying z, but this is not detected. Likely a fairly niche situation.
def meth11(self, x, z, **kwargs): pass # $ MISSING:Alert[py/inheritance/signature-mismatch] # call using y kw-arg is invalid due to not specifying z, but this is not detected. Likely a fairly niche situation.
def meth12(self): pass # $Alert[py/inheritance/signature-mismatch] # call including extra kwarg invalid
def meth12(self): pass # $ Alert[py/inheritance/signature-mismatch] # call including extra kwarg invalid
def meth13(self, /, y): pass # $Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with arg named x), however meth13 is incorrectly detected as having 2 minimum positional arguments, whereas x is kw-only; resulting in the witness call not being detected as a valid call to Base2.meth13.
def meth13(self, /, y): pass # $ Alert[py/inheritance/signature-mismatch] # weak mismatch (base may be called with arg named x), however meth13 is incorrectly detected as having 2 minimum positional arguments, whereas x is kw-only; resulting in the witness call not being detected as a valid call to Base2.meth13.

View File

@@ -3,10 +3,10 @@
def not_close1():
f1 = open("filename") # $ Alert # not closed on exception
f1.write("Error could occur")
f1.close()
f1.close()
def not_close2():
f2 = open("filename") # $ Alert
f2 = open("filename") # $ Alert
def closed3():
f3 = open("filename")
@@ -46,7 +46,7 @@ def closed7():
def not_closed8():
f8 = None
try:
f8 = open("filename") # $ MISSING:Alert # not closed on exception
f8 = open("filename") # $ MISSING:Alert # not closed on exception
f8.write("Error could occur")
finally:
if f8 is None: # We don't precisely consider this condition, so this result is MISSING. However, this seems uncommon.
@@ -88,7 +88,7 @@ def doesnt_raise(*args):
pass
def mostly_closed12():
f12 = open("filename")
f12 = open("filename")
try:
f12.write("IOError could occur")
f12.write("IOError could occur")
@@ -105,7 +105,7 @@ def opener_func2(name):
return t1
def not_closed13(name):
f13 = open(name) # $ Alert
f13 = open(name) # $ Alert
f13.write("Hello")
def may_not_be_closed14(name):
@@ -120,7 +120,7 @@ def closer2(t3):
closer1(t3)
def closed15():
f15 = opener_func2() # $ SPURIOUS:Alert
f15 = opener_func2() # $ SPURIOUS:Alert
closer2(f15) # We don't detect that this call closes the file, so this result is SPURIOUS.
@@ -151,11 +151,11 @@ def not_closed17():
#With statement will close the fp
def closed18(path):
try:
f18 = open(path)
f18 = open(path)
except IOError as ex:
print(ex)
raise ex
with f18:
with f18:
f18.read()
class Closed19(object):
@@ -245,7 +245,7 @@ def not_closed22(path):
f22.close()
def not_closed23(path):
f23 = open(path, "w") # $ Alert
f23 = open(path, "w") # $ Alert
wr = FileWrapper(f23)
def closed24(path):
@@ -253,11 +253,11 @@ def closed24(path):
try:
f24.write("hi")
except:
pass
pass
f24.close()
def closed25(path):
from django.http import FileResponse
from django.http import FileResponse
return FileResponse(open(path))
import os
@@ -266,13 +266,13 @@ def closed26(path):
os.close(fd)
def not_closed27(path):
fd = os.open(path, "w") # $Alert # not closed on exception
fd = os.open(path, "w") # $ Alert # not closed on exception
f27 = os.fdopen(fd, "w")
f27.write("hi")
f27.close()
def closed28(path):
fd = os.open(path, os.O_WRONLY)
fd = os.open(path, os.O_WRONLY)
f28 = os.fdopen(fd, "w")
try:
f28.write("hi")
@@ -282,9 +282,9 @@ def closed28(path):
def closed29(path):
# Due to an approximation in CFG reachability for performance, it is not detected that the `write` call that may raise occurs after the file has already been closed.
# We presume this case to be uncommon.
f28 = open(path) # $SPURIOUS:Alert # not closed on exception
f28 = open(path) # $ SPURIOUS:Alert # not closed on exception
f28.close()
f28.write("already closed")
f28.write("already closed")
# False positive in a previous implementation:
@@ -294,11 +294,11 @@ class NotWrapper:
fp.close()
def do_something(self):
pass
pass
def closed30(path):
# Combination of approximations resulted in this FP:
# - NotWrapper is treated as a wrapper class as a file handle is passed to it
# - NotWrapper is treated as a wrapper class as a file handle is passed to it
# - thing.do_something() is treated as a call that can raise an exception while a file is open
# - this call is treated as occurring after the open but not as being guarded by the with statement, as it is in the same basic block
# - - this behavior has been changed fixing the FP
@@ -306,11 +306,11 @@ def closed30(path):
with open(path) as fp: # No longer spurious alert here.
thing = NotWrapper(fp)
thing.do_something()
thing.do_something()
def closed31(path):
with open(path) as fp:
with open(path) as fp:
data = fp.readline()
data2 = fp.readline()
@@ -328,7 +328,7 @@ class Wrapper():
def closed32(path):
with open(path, "rb") as f: # No longer spurious alert here.
wrap = Wrapper(f)
# This resulted in an FP in a previous implementation,
# This resulted in an FP in a previous implementation,
# due to a check that an operation is lexically contained within a `with` block (with `expr.getParent*()`)
# not detecting this case.
return list(wrap.read())
return list(wrap.read())

View File

@@ -1,4 +1,4 @@
from flask import Flask, request, send_from_directory # $Source
from flask import Flask, request, send_from_directory # $ Source
app = Flask(__name__)
@@ -10,7 +10,7 @@ STATIC_DIR = "/server/static/"
def download_file():
filename = request.args.get('filename', '')
# ok since `send_from_directory` ensure this stays within `STATIC_DIR`
return send_from_directory(STATIC_DIR, filename) # $result=OK
return send_from_directory(STATIC_DIR, filename) # $ result=OK
# see https://flask.palletsprojects.com/en/1.1.x/api/#flask.send_from_directory

View File

@@ -37,7 +37,7 @@ def safe_path_normpath():
filename = request.args.get('filename', '')
npath = os.path.normpath(os.path.join(STATIC_DIR, filename))
if npath.startswith(STATIC_DIR):
f = open(npath) # $result=OK
f = open(npath) # $ result=OK
@app.route("/path5")
@@ -54,7 +54,7 @@ def safe_path_realpath():
filename = request.args.get('filename', '')
npath = os.path.realpath(os.path.join(STATIC_DIR, filename))
if npath.startswith(STATIC_DIR):
f = open(npath) # $result=OK
f = open(npath) # $ result=OK
@app.route("/path6")
@@ -71,7 +71,7 @@ def safe_path_abspath():
filename = request.args.get('filename', '')
npath = os.path.abspath(os.path.join(STATIC_DIR, filename))
if npath.startswith(STATIC_DIR):
f = open(npath) # $result=OK
f = open(npath) # $ result=OK
@app.route("/abspath_tricky")

View File

@@ -38,7 +38,7 @@ def normalize_then_check():
x = source()
y = normalize(x)
if y.startswith("subfolder/"):
open(y) # $result=OK
open(y) # $ result=OK
@app.route("/path")

View File

@@ -20,7 +20,7 @@ def normalize_then_check():
x = source()
y = normalize(x) # <--- this call...
if y.startswith("subfolder/"):
open(y) # $result=OK
open(y) # $ result=OK
@app.route("/path")
@@ -29,7 +29,7 @@ def normalize_check_normalize():
y = normalize(x) # (...or this call...)
if y.startswith("subfolder/"):
z = normalize(y) # <--- ...can jump to here, resulting in FP
open(z) # $result=OK
open(z) # $ result=OK
# The problem does not manifest if we simply define an alias
@@ -42,4 +42,4 @@ def normalize_check_normalize_alias():
y = normpath(x)
if y.startswith("subfolder/"):
z = normpath(y)
open(z) # $result=OK
open(z) # $ result=OK

View File

@@ -10,27 +10,27 @@ app = Flask(__name__)
def command_injection1():
files = request.args.get('files', '')
# Don't let files be `; rm -rf /`
os.system("ls " + files) # $result=BAD
os.system("ls " + files) # $ result=BAD
@app.route("/command2")
def command_injection2():
files = request.args.get('files', '')
# Don't let files be `; rm -rf /`
subprocess.Popen("ls " + files, shell=True) # $result=BAD
subprocess.Popen("ls " + files, shell=True) # $ result=BAD
@app.route("/command3")
def first_arg_injection():
cmd = request.args.get('cmd', '')
subprocess.Popen([cmd, "param1"]) # $result=BAD
subprocess.Popen([cmd, "param1"]) # $ result=BAD
@app.route("/other_cases")
def others():
files = request.args.get('files', '')
# Don't let files be `; rm -rf /`
os.popen("ls " + files) # $result=BAD
os.popen("ls " + files) # $ result=BAD
@app.route("/multiple")
@@ -38,8 +38,8 @@ def multiple():
command = request.args.get('command', '')
# We should mark flow to both calls here, which conflicts with removing flow out of
# a sink due to use-use flow.
os.system(command) # $result=BAD
os.system(command) # $result=BAD
os.system(command) # $ result=BAD
os.system(command) # $ result=BAD
@app.route("/not-into-sink-impl")
@@ -52,11 +52,11 @@ def not_into_sink_impl():
subprocess.call implementation: https://github.com/python/cpython/blob/fa7ce080175f65d678a7d5756c94f82887fc9803/Lib/subprocess.py#L341
"""
command = request.args.get('command', '')
os.system(command) # $result=BAD
os.popen(command) # $result=BAD
subprocess.call(command) # $result=BAD
subprocess.check_call(command) # $result=BAD
subprocess.run(command) # $result=BAD
os.system(command) # $ result=BAD
os.popen(command) # $ result=BAD
subprocess.call(command) # $ result=BAD
subprocess.check_call(command) # $ result=BAD
subprocess.run(command) # $ result=BAD
@app.route("/path-exists-not-sanitizer")
@@ -70,11 +70,11 @@ def path_exists_not_sanitizer():
"""
path = request.args.get('path', '')
if os.path.exists(path):
os.system("ls " + path) # $result=BAD
os.system("ls " + path) # $ result=BAD
@app.route("/restricted-characters")
def restricted_characters():
path = request.args.get('path', '')
if re.match(r'^[a-zA-Z0-9_-]+$', path):
os.system("ls " + path) # $SPURIOUS: result=BAD
os.system("ls " + path) # $ SPURIOUS: result=BAD

View File

@@ -1,4 +1,4 @@
import os
import os
def unsafe_setup(name):
os.system("ping " + name) # $result=OK - this is inside a setyp script, so it's fine.
os.system("ping " + name) # $ result=OK - this is inside a setyp script, so it's fine.

View File

@@ -2,52 +2,52 @@ import os
import subprocess
def unsafe_shell_one(name):
os.system("ping " + name) # $result=BAD
os.system("ping " + name) # $ result=BAD
# f-strings
os.system(f"ping {name}") # $result=BAD
os.system(f"ping {name}") # $ result=BAD
# array.join
os.system("ping " + " ".join(name)) # $result=BAD
os.system("ping " + " ".join(name)) # $ result=BAD
# array.join, with a list
os.system("ping " + " ".join([name])) # $result=BAD
os.system("ping " + " ".join([name])) # $ result=BAD
# format, using .format
os.system("ping {}".format(name)) # $result=BAD
os.system("ping {}".format(name)) # $ result=BAD
# format, using %
os.system("ping %s" % name) # $result=BAD
os.system("ping %s" % name) # $ result=BAD
os.system(name) # OK - seems intentional.
import fabric
def facbric_stuff (name):
def facbric_stuff (name):
fabric.api.run("ping " + name, shell=False) # OK
fabric.api.run("ping " + name, shell=True) # $result=BAD
fabric.api.run("ping " + name, shell=True) # $ result=BAD
def indirect(flag):
def indirect(flag):
fabric.api.run("ping " + name, shell=flag) # OK
indirect(False)
def subprocess_flag (name):
def subprocess_flag (name):
subprocess.run("ping " + name, shell=False) # OK - and nonsensical
subprocess.run("ping " + name, shell=True) # $result=BAD
subprocess.run("ping " + name, shell=True) # $ result=BAD
def indirect(flag, x):
subprocess.run("ping " + x, shell=flag) # $result=BAD
def indirect(flag, x):
subprocess.run("ping " + x, shell=flag) # $ result=BAD
indirect(True, name)
subprocess.Popen("ping " + name, shell=unknownValue) # OK - shell assumed to be False
def intentional(command):
os.system("fish -ic " + command) # $result=OK - intentional
def intentional(command):
os.system("fish -ic " + command) # $ result=OK - intentional
import shlex
def unsafe_shell_sanitized(name):
os.system("ping " + shlex.quote(name)) # $result=OK - sanitized
def unsafe_shell_sanitized(name):
os.system("ping " + shlex.quote(name)) # $ result=OK - sanitized

View File

@@ -5,8 +5,8 @@ app = Flask(__name__)
@app.route("/test")
def test():
resp = make_response()
resp.set_cookie("oauth", "value1") # $Alert[py/client-exposed-cookie]
resp.set_cookie("oauth", "value2", secure=True) # $Alert[py/client-exposed-cookie]
resp.set_cookie("oauth", "value2", httponly=True)
resp.set_cookie("oauth", "value2", samesite="Strict") # $Alert[py/client-exposed-cookie]
resp.set_cookie("oauth", "value2", httponly=True, samesite="None")
resp.set_cookie("oauth", "value1") # $ Alert[py/client-exposed-cookie]
resp.set_cookie("oauth", "value2", secure=True) # $ Alert[py/client-exposed-cookie]
resp.set_cookie("oauth", "value2", httponly=True)
resp.set_cookie("oauth", "value2", samesite="Strict") # $ Alert[py/client-exposed-cookie]
resp.set_cookie("oauth", "value2", httponly=True, samesite="None")

View File

@@ -5,11 +5,10 @@ app = Flask(__name__)
@app.route("/test")
def test(oauth_cookie_name):
resp = make_response()
resp.set_cookie("password", "value1")
resp.set_cookie("authKey", "value2", samesite="Lax")
resp.set_cookie("session_id", "value2", samesite="None") # $Alert[py/samesite-none-cookie]
resp.set_cookie("oauth", "value2", secure=True, samesite="Strict")
resp.set_cookie("oauth", "value2", httponly=True, samesite="Strict")
resp.set_cookie(oauth_cookie_name, "value2", secure=True, samesite="None") # $Alert[py/samesite-none-cookie]
resp.set_cookie("password", "value1")
resp.set_cookie("authKey", "value2", samesite="Lax")
resp.set_cookie("session_id", "value2", samesite="None") # $ Alert[py/samesite-none-cookie]
resp.set_cookie("oauth", "value2", secure=True, samesite="Strict")
resp.set_cookie("oauth", "value2", httponly=True, samesite="Strict")
resp.set_cookie(oauth_cookie_name, "value2", secure=True, samesite="None") # $ Alert[py/samesite-none-cookie]
resp.set_cookie("not_sensitive", "value2", samesite="None")

View File

@@ -5,8 +5,8 @@ app = Flask(__name__)
@app.route("/test")
def test():
resp = make_response()
resp.set_cookie("authKey", "value1") # $Alert[py/insecure-cookie]
resp.set_cookie("authKey", "value2", secure=True)
resp.set_cookie("sessionID", "value2", httponly=True) # $Alert[py/insecure-cookie]
resp.set_cookie("password", "value2", samesite="Strict") # $Alert[py/insecure-cookie]
resp.set_cookie("notSensitive", "value3")
resp.set_cookie("authKey", "value1") # $ Alert[py/insecure-cookie]
resp.set_cookie("authKey", "value2", secure=True)
resp.set_cookie("sessionID", "value2", httponly=True) # $ Alert[py/insecure-cookie]
resp.set_cookie("password", "value2", samesite="Strict") # $ Alert[py/insecure-cookie]
resp.set_cookie("notSensitive", "value3")

View File

@@ -2,12 +2,12 @@
def bad1():
results = []
for x in range(10):
def inner(): # $capturedVar=x
def inner(): # $ capturedVar=x
return x
results.append(inner)
return results
a = [lambda: i for i in range(1, 4)] # $capturedVar=i
a = [lambda: i for i in range(1, 4)] # $ capturedVar=i
for f in a:
print(f())
@@ -46,30 +46,30 @@ def ok1():
result += inner()
return result
b = [lambda: i for i in range(1, 4) for j in range(1,5)] # $capturedVar=i
c = [lambda: j for i in range(1, 4) for j in range(1,5)] # $capturedVar=j
b = [lambda: i for i in range(1, 4) for j in range(1,5)] # $ capturedVar=i
c = [lambda: j for i in range(1, 4) for j in range(1,5)] # $ capturedVar=j
s = {lambda: i for i in range(1, 4)} # $capturedVar=i
s = {lambda: i for i in range(1, 4)} # $ capturedVar=i
for f in s:
print(f())
d = {i:lambda: i for i in range(1, 4)} # $capturedVar=i
d = {i:lambda: i for i in range(1, 4)} # $ capturedVar=i
for k, f in d.items():
print(k, f())
#Generator expressions are sometimes OK, if they evaluate the iteration
#Generator expressions are sometimes OK, if they evaluate the iteration
#When the captured variable is used.
#So technically this is a false positive, but it is extremely fragile
#code, so I (Mark) think it is fine to report it as a violation.
g = (lambda: i for i in range(1, 4)) # $capturedVar=i
g = (lambda: i for i in range(1, 4)) # $ capturedVar=i
for f in g:
print(f())
#But not if evaluated eagerly
l = list(lambda: i for i in range(1, 4)) # $capturedVar=i
l = list(lambda: i for i in range(1, 4)) # $ capturedVar=i
for f in l:
print(f())
# This result is MISSING since the lambda is not detected to escape the loop
def odasa4860(asset_ids):
return dict((asset_id, filter(lambda c : c.asset_id == asset_id, xxx)) for asset_id in asset_ids) # $MISSING: capturedVar=asset_id
return dict((asset_id, filter(lambda c : c.asset_id == asset_id, xxx)) for asset_id in asset_ids) # $ MISSING: capturedVar=asset_id