Merge branch 'main' of github.com:github/codeql into python-dataflow-unpacking-assignment

This commit is contained in:
Rasmus Lerchedahl Petersen
2021-01-20 19:27:34 +01:00
1103 changed files with 92371 additions and 36094 deletions

View File

@@ -1,4 +1,5 @@
uniqueEnclosingCallable
| test.py:239:27:239:27 | ControlFlowNode for p | Node should have one enclosing callable but has 0. |
uniqueType
uniqueNodeLocation
missingLocation

View File

@@ -235,3 +235,17 @@ def non_const_eq_preserves_taint(x):
SINK(tainted) # unsafe
if tainted == x:
SINK(tainted) # unsafe
def overflowCallee(*args, p="", **kwargs):
print("args", args)
print("p", p)
print("kwargs", kwargs)
def synth_arg_posOverflow():
overflowCallee(42)
def synth_arg_kwOverflow():
overflowCallee(foo=42)
def synth_arg_kwUnpacked():
overflowCallee(**{"p": "42"})

View File

@@ -1,27 +1,23 @@
| test1.py:1:8:1:12 | ControlFlowNode for ImportExpr | mypkg |
| test1.py:1:8:1:12 | GSSA Variable mypkg | mypkg |
| test2.py:1:6:1:10 | ControlFlowNode for ImportExpr | mypkg |
| test2.py:1:6:1:10 | ControlFlowNode for ImportExpr | mypkg |
| test2.py:1:19:1:21 | GSSA Variable foo | mypkg.foo |
| test2.py:1:24:1:26 | GSSA Variable bar | mypkg.bar |
| test2.py:1:19:1:21 | ControlFlowNode for ImportMember | mypkg.foo |
| test2.py:1:24:1:26 | ControlFlowNode for ImportMember | mypkg.bar |
| test3.py:1:8:1:16 | ControlFlowNode for ImportExpr | mypkg |
| test3.py:1:8:1:16 | ControlFlowNode for ImportExpr | mypkg.foo |
| test3.py:2:8:2:16 | ControlFlowNode for ImportExpr | mypkg |
| test3.py:2:8:2:16 | ControlFlowNode for ImportExpr | mypkg.bar |
| test3.py:2:8:2:16 | GSSA Variable mypkg | mypkg |
| test4.py:1:8:1:16 | ControlFlowNode for ImportExpr | mypkg.foo |
| test4.py:1:21:1:24 | GSSA Variable _foo | mypkg.foo |
| test4.py:2:8:2:16 | ControlFlowNode for ImportExpr | mypkg.bar |
| test4.py:2:21:2:24 | GSSA Variable _bar | mypkg.bar |
| test5.py:1:8:1:12 | ControlFlowNode for ImportExpr | mypkg |
| test5.py:1:8:1:12 | GSSA Variable mypkg | mypkg |
| test5.py:9:6:9:10 | ControlFlowNode for ImportExpr | mypkg |
| test5.py:9:26:9:29 | GSSA Variable _bar | mypkg.bar |
| test5.py:9:19:9:29 | ControlFlowNode for ImportMember | mypkg.bar |
| test6.py:1:8:1:12 | ControlFlowNode for ImportExpr | mypkg |
| test6.py:1:8:1:12 | GSSA Variable mypkg | mypkg |
| test6.py:5:8:5:16 | ControlFlowNode for ImportExpr | mypkg |
| test6.py:5:8:5:16 | ControlFlowNode for ImportExpr | mypkg.foo |
| test6.py:5:8:5:16 | GSSA Variable mypkg | mypkg |
| test7.py:1:6:1:10 | ControlFlowNode for ImportExpr | mypkg |
| test7.py:1:19:1:21 | GSSA Variable foo | mypkg.foo |
| test7.py:1:19:1:21 | ControlFlowNode for ImportMember | mypkg.foo |
| test7.py:5:8:5:16 | ControlFlowNode for ImportExpr | mypkg |
| test7.py:5:8:5:16 | ControlFlowNode for ImportExpr | mypkg.foo |
| test7.py:5:8:5:16 | GSSA Variable mypkg | mypkg |
| test7.py:9:6:9:10 | ControlFlowNode for ImportExpr | mypkg |
| test7.py:9:19:9:21 | GSSA Variable foo | mypkg.foo |
| test7.py:9:19:9:21 | ControlFlowNode for ImportMember | mypkg.foo |

View File

@@ -0,0 +1,27 @@
| test_string_const_compare.py:16 | ok | test_eq | ts |
| test_string_const_compare.py:18 | ok | test_eq | ts |
| test_string_const_compare.py:20 | ok | test_eq | ts |
| test_string_const_compare.py:27 | ok | test_eq_unsafe | ts |
| test_string_const_compare.py:29 | ok | test_eq_unsafe | ts |
| test_string_const_compare.py:35 | fail | test_eq_with_or | ts |
| test_string_const_compare.py:37 | ok | test_eq_with_or | ts |
| test_string_const_compare.py:43 | ok | test_non_eq1 | ts |
| test_string_const_compare.py:45 | ok | test_non_eq1 | ts |
| test_string_const_compare.py:51 | ok | test_non_eq2 | ts |
| test_string_const_compare.py:53 | fail | test_non_eq2 | ts |
| test_string_const_compare.py:59 | ok | test_in_list | ts |
| test_string_const_compare.py:61 | ok | test_in_list | ts |
| test_string_const_compare.py:67 | ok | test_in_tuple | ts |
| test_string_const_compare.py:69 | ok | test_in_tuple | ts |
| test_string_const_compare.py:75 | ok | test_in_set | ts |
| test_string_const_compare.py:77 | ok | test_in_set | ts |
| test_string_const_compare.py:83 | ok | test_in_unsafe1 | ts |
| test_string_const_compare.py:85 | ok | test_in_unsafe1 | ts |
| test_string_const_compare.py:91 | ok | test_in_unsafe2 | ts |
| test_string_const_compare.py:93 | ok | test_in_unsafe2 | ts |
| test_string_const_compare.py:99 | ok | test_not_in1 | ts |
| test_string_const_compare.py:101 | ok | test_not_in1 | ts |
| test_string_const_compare.py:107 | ok | test_not_in2 | ts |
| test_string_const_compare.py:109 | fail | test_not_in2 | ts |
| test_string_const_compare.py:119 | fail | test_eq_thorugh_func | ts |
| test_string_const_compare.py:121 | ok | test_eq_thorugh_func | ts |

View File

@@ -0,0 +1,8 @@
import experimental.dataflow.tainttracking.TestTaintLib
import semmle.python.dataflow.new.BarrierGuards
class CustomSanitizerOverrides extends TestTaintTrackingConfiguration {
override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
guard instanceof StringConstCompare
}
}

View File

@@ -0,0 +1,138 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
def test_eq():
ts = TAINTED_STRING
if ts == "safe":
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
# ts should still be tainted after exiting the if block
ensure_tainted(ts)
def test_eq_unsafe(x="foo"):
"""This test-case might seem strange, but it was a FP in our old points-to based analysis."""
ts = TAINTED_STRING
if ts == ts:
ensure_tainted(ts)
if ts == x:
ensure_tainted(ts)
def test_eq_with_or():
ts = TAINTED_STRING
if ts == "safe" or ts == "also_safe":
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
def test_non_eq1():
ts = TAINTED_STRING
if ts != "safe":
ensure_tainted(ts)
else:
ensure_not_tainted(ts)
def test_non_eq2():
ts = TAINTED_STRING
if not ts == "safe":
ensure_tainted(ts)
else:
ensure_not_tainted(ts)
def test_in_list():
ts = TAINTED_STRING
if ts in ["safe", "also_safe"]:
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
def test_in_tuple():
ts = TAINTED_STRING
if ts in ("safe", "also_safe"):
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
def test_in_set():
ts = TAINTED_STRING
if ts in {"safe", "also_safe"}:
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
def test_in_unsafe1(xs):
ts = TAINTED_STRING
if ts in xs:
ensure_tainted(ts)
else:
ensure_tainted(ts)
def test_in_unsafe2(x):
ts = TAINTED_STRING
if ts in ["safe", x]:
ensure_tainted(ts)
else:
ensure_tainted(ts)
def test_not_in1():
ts = TAINTED_STRING
if ts not in ["safe", "also_safe"]:
ensure_tainted(ts)
else:
ensure_not_tainted(ts)
def test_not_in2():
ts = TAINTED_STRING
if not ts in ["safe", "also_safe"]:
ensure_tainted(ts)
else:
ensure_not_tainted(ts)
def is_safe(x):
return x == "safe"
def test_eq_thorugh_func():
ts = TAINTED_STRING
if is_safe(ts):
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
# Make tests runable
test_eq()
test_eq_unsafe()
test_eq_with_or()
test_non_eq1()
test_non_eq2()
test_in_list()
test_in_tuple()
test_in_set()
test_in_unsafe1(["unsafe", "foo"])
test_in_unsafe2("unsafe")
test_not_in1()
test_not_in2()
test_eq_thorugh_func()

View File

@@ -4,8 +4,58 @@ test_taint
| test.py:38 | ok | test_custom_sanitizer_guard | s |
| test.py:40 | ok | test_custom_sanitizer_guard | s |
| test.py:51 | ok | test_escape | s2 |
| test_logical.py:30 | ok | test_basic | s |
| test_logical.py:32 | ok | test_basic | s |
| test_logical.py:35 | ok | test_basic | s |
| test_logical.py:37 | fail | test_basic | s |
| test_logical.py:45 | ok | test_or | s |
| test_logical.py:47 | ok | test_or | s |
| test_logical.py:51 | ok | test_or | s |
| test_logical.py:53 | ok | test_or | s |
| test_logical.py:57 | ok | test_or | s |
| test_logical.py:59 | ok | test_or | s |
| test_logical.py:67 | ok | test_and | s |
| test_logical.py:69 | ok | test_and | s |
| test_logical.py:73 | ok | test_and | s |
| test_logical.py:75 | ok | test_and | s |
| test_logical.py:79 | ok | test_and | s |
| test_logical.py:81 | fail | test_and | s |
| test_logical.py:89 | fail | test_tricky | s |
| test_logical.py:93 | fail | test_tricky | s_ |
| test_logical.py:100 | fail | test_nesting_not | s |
| test_logical.py:102 | ok | test_nesting_not | s |
| test_logical.py:105 | ok | test_nesting_not | s |
| test_logical.py:107 | fail | test_nesting_not | s |
| test_logical.py:116 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:118 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:121 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:123 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:126 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:128 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:137 | fail | test_with_return | s |
| test_logical.py:146 | fail | test_with_exception | s |
| test_reference.py:31 | fail | test_basic | s2 |
| test_reference.py:31 | ok | test_basic | s |
| test_reference.py:33 | ok | test_basic | s |
| test_reference.py:33 | ok | test_basic | s2 |
| test_reference.py:41 | fail | test_identical_call | s.strip() |
| test_reference.py:43 | ok | test_identical_call | s.strip() |
| test_reference.py:56 | fail | test_class_attribute_access | c.foo |
| test_reference.py:58 | ok | test_class_attribute_access | c.foo |
isSanitizer
| TestTaintTrackingConfiguration | test.py:21:39:21:39 | ControlFlowNode for s |
| TestTaintTrackingConfiguration | test.py:50:10:50:29 | ControlFlowNode for emulated_escaping() |
isSanitizerGuard
| TestTaintTrackingConfiguration | test.py:35:8:35:26 | ControlFlowNode for emulated_is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:29:8:29:17 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:44:8:44:17 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:50:12:50:21 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:66:8:66:17 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:72:12:72:21 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:92:8:92:17 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:115:12:115:21 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:120:16:120:25 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_logical.py:125:20:125:29 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_reference.py:30:8:30:17 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_reference.py:40:8:40:25 | ControlFlowNode for is_safe() |
| TestTaintTrackingConfiguration | test_reference.py:55:8:55:21 | ControlFlowNode for is_safe() |

View File

@@ -1,7 +1,9 @@
import experimental.dataflow.tainttracking.TestTaintLib
class IsSafeCheck extends DataFlow::BarrierGuard {
IsSafeCheck() { this.(CallNode).getNode().getFunc().(Name).getId() = "emulated_is_safe" }
IsSafeCheck() {
this.(CallNode).getNode().getFunc().(Name).getId() in ["is_safe", "emulated_is_safe"]
}
override predicate checks(ControlFlowNode node, boolean branch) {
node = this.(CallNode).getAnArg() and

View File

@@ -128,6 +128,23 @@ def test_nesting_not_with_and_true():
ensure_not_tainted(s)
def test_with_return():
s = TAINTED_STRING
if not is_safe(s):
return
ensure_not_tainted(s)
def test_with_exception():
s = TAINTED_STRING
if not is_safe(s):
raise Exception("unsafe")
ensure_not_tainted(s)
# Make tests runable
test_basic()
@@ -136,3 +153,8 @@ test_and()
test_tricky()
test_nesting_not()
test_nesting_not_with_and_true()
test_with_return()
try:
test_with_exception()
except:
pass

View File

@@ -0,0 +1,65 @@
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
"""Testing logical constructs not/and/or works out of the box.
"""
import random
def random_choice():
return bool(random.randint(0, 1))
def is_safe(arg):
return arg == "safe"
def test_basic():
s = TAINTED_STRING
s2 = s
if is_safe(s):
ensure_not_tainted(s, s2)
else:
ensure_tainted(s, s2)
def test_identical_call():
"""This code pattern is being used in real world code"""
s = TAINTED_STRING
if is_safe(s.strip()):
ensure_not_tainted(s.strip())
else:
ensure_tainted(s.strip())
class C(object):
def __init__(self, value):
self.foo = value
def test_class_attribute_access():
s = TAINTED_STRING
c = C(s)
if is_safe(c.foo):
ensure_not_tainted(c.foo)
else:
ensure_tainted(c.foo)
# Make tests runable
test_basic()
test_identical_call()
test_class_attribute_access()

View File

@@ -68,9 +68,9 @@
| test_json.py:27 | ok | test | json.loads(..) |
| test_json.py:34 | fail | test | tainted_filelike |
| test_json.py:35 | fail | test | json.load(..) |
| test_json.py:48 | fail | non_syntacical | dumps(..) |
| test_json.py:49 | fail | non_syntacical | dumps_alias(..) |
| test_json.py:50 | fail | non_syntacical | loads(..) |
| test_json.py:48 | ok | non_syntacical | dumps(..) |
| test_json.py:49 | ok | non_syntacical | dumps_alias(..) |
| test_json.py:50 | ok | non_syntacical | loads(..) |
| test_json.py:57 | fail | non_syntacical | tainted_filelike |
| test_json.py:58 | fail | non_syntacical | load(..) |
| test_string.py:25 | ok | str_operations | ts |

View File

@@ -1,37 +0,0 @@
| test_logical.py:30 | fail | test_basic | s |
| test_logical.py:32 | ok | test_basic | s |
| test_logical.py:35 | ok | test_basic | s |
| test_logical.py:37 | fail | test_basic | s |
| test_logical.py:45 | ok | test_or | s |
| test_logical.py:47 | ok | test_or | s |
| test_logical.py:51 | ok | test_or | s |
| test_logical.py:53 | ok | test_or | s |
| test_logical.py:57 | ok | test_or | s |
| test_logical.py:59 | ok | test_or | s |
| test_logical.py:67 | fail | test_and | s |
| test_logical.py:69 | ok | test_and | s |
| test_logical.py:73 | ok | test_and | s |
| test_logical.py:75 | fail | test_and | s |
| test_logical.py:79 | ok | test_and | s |
| test_logical.py:81 | fail | test_and | s |
| test_logical.py:89 | fail | test_tricky | s |
| test_logical.py:93 | fail | test_tricky | s_ |
| test_logical.py:100 | fail | test_nesting_not | s |
| test_logical.py:102 | ok | test_nesting_not | s |
| test_logical.py:105 | ok | test_nesting_not | s |
| test_logical.py:107 | fail | test_nesting_not | s |
| test_logical.py:116 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:118 | fail | test_nesting_not_with_and_true | s |
| test_logical.py:121 | fail | test_nesting_not_with_and_true | s |
| test_logical.py:123 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:126 | ok | test_nesting_not_with_and_true | s |
| test_logical.py:128 | fail | test_nesting_not_with_and_true | s |
| test_string_eq.py:16 | fail | const_eq_clears_taint | ts |
| test_string_eq.py:18 | ok | const_eq_clears_taint | ts |
| test_string_eq.py:20 | ok | const_eq_clears_taint | ts |
| test_string_eq.py:27 | fail | const_eq_clears_taint2 | ts |
| test_string_eq.py:33 | ok | non_const_eq_preserves_taint | ts |
| test_string_eq.py:35 | ok | non_const_eq_preserves_taint | ts |
| test_string_eq.py:45 | fail | const_eq_through_func | ts |
| test_string_eq.py:47 | ok | const_eq_through_func | ts |
| test_string_eq.py:49 | ok | const_eq_through_func | ts |

View File

@@ -1 +0,0 @@
import experimental.dataflow.tainttracking.TestTaintLib

View File

@@ -1,56 +0,0 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
def const_eq_clears_taint():
ts = TAINTED_STRING
if ts == "safe":
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
# ts should still be tainted after exiting the if block
ensure_tainted(ts)
def const_eq_clears_taint2():
ts = TAINTED_STRING
if ts != "safe":
return
ensure_not_tainted(ts)
def non_const_eq_preserves_taint(x="foo"):
ts = TAINTED_STRING
if ts == ts:
ensure_tainted(ts)
if ts == x:
ensure_tainted(ts)
def is_safe(x):
return x == "safe"
def const_eq_through_func():
ts = TAINTED_STRING
if is_safe(ts):
ensure_not_tainted(ts)
else:
ensure_tainted(ts)
# ts should still be tainted after exiting the if block
ensure_tainted(ts)
# Make tests runable
const_eq_clears_taint()
const_eq_clears_taint2()
non_const_eq_preserves_taint()

View File

@@ -4,19 +4,19 @@ from django.http.response import HttpResponse
from django.views.generic import View
def url_match_xss(request, foo, bar, no_taint=None): # $routeHandler routedParameter=foo routedParameter=bar
def url_match_xss(request, foo, bar, no_taint=None): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $HttpResponse
def get_params_xss(request): # $routeHandler
def get_params_xss(request): # $requestHandler
return HttpResponse(request.GET.get("untrusted")) # $HttpResponse
def post_params_xss(request): # $routeHandler
def post_params_xss(request): # $requestHandler
return HttpResponse(request.POST.get("untrusted")) # $HttpResponse
def http_resp_write(request): # $routeHandler
def http_resp_write(request): # $requestHandler
rsp = HttpResponse() # $HttpResponse
rsp.write(request.GET.get("untrusted")) # $HttpResponse
return rsp
@@ -26,22 +26,22 @@ class Foo(object):
# Note: since Foo is used as the super type in a class view, it will be able to handle requests.
def post(self, request, untrusted): # $ MISSING: routeHandler routedParameter=untrusted
def post(self, request, untrusted): # $ MISSING: requestHandler routedParameter=untrusted
return HttpResponse('Foo post: {}'.format(untrusted)) # $HttpResponse
class ClassView(View, Foo):
def get(self, request, untrusted): # $ MISSING: routeHandler routedParameter=untrusted
def get(self, request, untrusted): # $ requestHandler routedParameter=untrusted
return HttpResponse('ClassView get: {}'.format(untrusted)) # $HttpResponse
def show_articles(request, page_number=1): # $routeHandler routedParameter=page_number
def show_articles(request, page_number=1): # $requestHandler routedParameter=page_number
page_number = int(page_number)
return HttpResponse('articles page: {}'.format(page_number)) # $HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $routeHandler routedParameter=arg0 routedParameter=arg1
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $HttpResponse
@@ -62,7 +62,7 @@ urlpatterns = [
################################################################################
# Using patterns() for routing
def show_user(request, username): # $routeHandler routedParameter=username
def show_user(request, username): # $requestHandler routedParameter=username
return HttpResponse('show_user {}'.format(username)) # $HttpResponse
@@ -71,7 +71,7 @@ urlpatterns = patterns(url(r"^users/(?P<username>[^/]+)", show_user)) # $routeS
################################################################################
# Show we understand the keyword arguments to django.conf.urls.url
def kw_args(request): # $routeHandler
def kw_args(request): # $requestHandler
return HttpResponse('kw_args') # $HttpResponse
urlpatterns = [

View File

@@ -4,19 +4,19 @@ from django.http import HttpResponse, HttpResponseRedirect, JsonResponse, HttpRe
from django.views import View
def url_match_xss(request, foo, bar, no_taint=None): # $routeHandler routedParameter=foo routedParameter=bar
def url_match_xss(request, foo, bar, no_taint=None): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $HttpResponse
def get_params_xss(request): # $routeHandler
def get_params_xss(request): # $requestHandler
return HttpResponse(request.GET.get("untrusted")) # $HttpResponse
def post_params_xss(request): # $routeHandler
def post_params_xss(request): # $requestHandler
return HttpResponse(request.POST.get("untrusted")) # $HttpResponse
def http_resp_write(request): # $routeHandler
def http_resp_write(request): # $requestHandler
rsp = HttpResponse() # $HttpResponse
rsp.write(request.GET.get("untrusted")) # $HttpResponse
return rsp
@@ -26,22 +26,22 @@ class Foo(object):
# Note: since Foo is used as the super type in a class view, it will be able to handle requests.
def post(self, request, untrusted): # $ MISSING: routeHandler routedParameter=untrusted
def post(self, request, untrusted): # $ MISSING: requestHandler routedParameter=untrusted
return HttpResponse('Foo post: {}'.format(untrusted)) # $HttpResponse
class ClassView(View, Foo):
def get(self, request, untrusted): # $ MISSING: routeHandler routedParameter=untrusted
def get(self, request, untrusted): # $ requestHandler routedParameter=untrusted
return HttpResponse('ClassView get: {}'.format(untrusted)) # $HttpResponse
def show_articles(request, page_number=1): # $routeHandler routedParameter=page_number
def show_articles(request, page_number=1): # $requestHandler routedParameter=page_number
page_number = int(page_number)
return HttpResponse('articles page: {}'.format(page_number)) # $HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $routeHandler routedParameter=arg0 routedParameter=arg1
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $HttpResponse
@@ -62,7 +62,7 @@ urlpatterns = [
# Show we understand the keyword arguments to django.urls.re_path
def re_path_kwargs(request): # $routeHandler
def re_path_kwargs(request): # $requestHandler
return HttpResponse('re_path_kwargs') # $HttpResponse
@@ -75,16 +75,16 @@ urlpatterns = [
################################################################################
# saying page_number is an externally controlled *string* is a bit strange, when we have an int converter :O
def page_number(request, page_number=1): # $routeHandler routedParameter=page_number
def page_number(request, page_number=1): # $requestHandler routedParameter=page_number
return HttpResponse('page_number: {}'.format(page_number)) # $HttpResponse
def foo_bar_baz(request, foo, bar, baz): # $routeHandler routedParameter=foo routedParameter=bar routedParameter=baz
def foo_bar_baz(request, foo, bar, baz): # $requestHandler routedParameter=foo routedParameter=bar routedParameter=baz
return HttpResponse('foo_bar_baz: {} {} {}'.format(foo, bar, baz)) # $HttpResponse
def path_kwargs(request, foo, bar): # $routeHandler routedParameter=foo routedParameter=bar
def path_kwargs(request, foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('path_kwargs: {} {} {}'.format(foo, bar)) # $HttpResponse
def not_valid_identifier(request): # $routeHandler
def not_valid_identifier(request): # $requestHandler
return HttpResponse('<foo!>') # $HttpResponse
urlpatterns = [
@@ -101,9 +101,17 @@ urlpatterns = [
# This version 1.x way of defining urls is deprecated in Django 3.1, but still works
from django.conf.urls import url
def deprecated(request): # $routeHandler
def deprecated(request): # $requestHandler
return HttpResponse('deprecated') # $HttpResponse
urlpatterns = [
url(r"^deprecated/", deprecated), # $routeSetup="^deprecated/"
]
class PossiblyNotRouted(View):
# Even if our analysis can't find a route-setup for this class, we should still
# consider it to be a handle incoming HTTP requests
def get(self, request, possibly_not_routed=42): # $ requestHandler routedParameter=possibly_not_routed
return HttpResponse('PossiblyNotRouted get: {}'.format(possibly_not_routed)) # $HttpResponse

View File

@@ -3,7 +3,7 @@ from django.urls import path
from django.http import HttpRequest
def test_taint(request: HttpRequest, foo, bar, baz=None): # $routeHandler routedParameter=foo routedParameter=bar
def test_taint(request: HttpRequest, foo, bar, baz=None): # $requestHandler routedParameter=foo routedParameter=bar
ensure_tainted(foo, bar)
ensure_not_tainted(baz)

View File

@@ -12,4 +12,7 @@ urlpatterns = [
# line)
re_path(r"^ba[rz]/", views.bar_baz), # $routeSetup="^ba[rz]/"
url(r"^deprecated/", views.deprecated), # $routeSetup="^deprecated/"
path("basic-view-handler/", views.MyBasicViewHandler.as_view()), # $routeSetup="basic-view-handler/"
path("custom-inheritance-view-handler/", views.MyViewHandlerWithCustomInheritance.as_view()), # $routeSetup="custom-inheritance-view-handler/"
]

View File

@@ -1,10 +1,33 @@
from django.http import HttpRequest, HttpResponse
from django.views import View
from django.views.decorators.csrf import csrf_exempt
def foo(request: HttpRequest): # $routeHandler
def foo(request: HttpRequest): # $requestHandler
return HttpResponse("foo") # $HttpResponse
def bar_baz(request: HttpRequest): # $routeHandler
def bar_baz(request: HttpRequest): # $requestHandler
return HttpResponse("bar_baz") # $HttpResponse
def deprecated(request: HttpRequest): # $routeHandler
def deprecated(request: HttpRequest): # $requestHandler
return HttpResponse("deprecated") # $HttpResponse
class MyBasicViewHandler(View):
def get(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyViewHandler: GET") # $ HttpResponse
def post(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyViewHandler: POST") # $ HttpResponse
class MyCustomViewBaseClass(View):
def post(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyCustomViewBaseClass: POST") # $ HttpResponse
class MyViewHandlerWithCustomInheritance(MyCustomViewBaseClass):
def get(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyViewHandlerWithCustomInheritance: GET") # $ HttpResponse

View File

@@ -44,7 +44,7 @@ MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
# 'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',

View File

@@ -4,14 +4,14 @@ from flask import Flask, request, make_response
app = Flask(__name__)
@app.route("/") # $routeSetup="/"
def hello_world(): # $routeHandler
def hello_world(): # $requestHandler
return "Hello World!" # $HttpResponse
from flask.views import MethodView
class MyView(MethodView):
def get(self, user_id): # $ MISSING: routeHandler
def get(self, user_id): # $ requestHandler
if user_id is None:
# return a list of users
pass
@@ -25,42 +25,42 @@ app.add_url_rule('/the/', defaults={'user_id': None}, # $routeSetup="/the/"
view_func=the_view, methods=['GET',])
@app.route("/dangerous") # $routeSetup="/dangerous"
def dangerous(): # $routeHandler
def dangerous(): # $requestHandler
return request.args.get('payload') # $HttpResponse
@app.route("/dangerous-with-cfg-split") # $routeSetup="/dangerous-with-cfg-split"
def dangerous2(): # $routeHandler
def dangerous2(): # $requestHandler
x = request.form['param0']
if request.method == "POST":
return request.form['param1'] # $HttpResponse
return None # $ SPURIOUS: HttpResponse
@app.route("/unsafe") # $routeSetup="/unsafe"
def unsafe(): # $routeHandler
def unsafe(): # $requestHandler
first_name = request.args.get('name', '')
return make_response("Your name is " + first_name) # $HttpResponse
@app.route("/safe") # $routeSetup="/safe"
def safe(): # $routeHandler
def safe(): # $requestHandler
first_name = request.args.get('name', '')
return make_response("Your name is " + escape(first_name)) # $HttpResponse
@app.route("/hello/<name>") # $routeSetup="/hello/<name>"
def hello(name): # $routeHandler routedParameter=name
def hello(name): # $requestHandler routedParameter=name
return make_response("Your name is " + name) # $HttpResponse
@app.route("/foo/<path:subpath>") # $routeSetup="/foo/<path:subpath>"
def foo(subpath): # $routeHandler routedParameter=subpath
def foo(subpath): # $requestHandler routedParameter=subpath
return make_response("The subpath is " + subpath) # $HttpResponse
@app.route("/multiple/") # $routeSetup="/multiple/"
@app.route("/multiple/foo/<foo>") # $routeSetup="/multiple/foo/<foo>"
@app.route("/multiple/bar/<bar>") # $routeSetup="/multiple/bar/<bar>"
def multiple(foo=None, bar=None): # $routeHandler routedParameter=foo routedParameter=bar
def multiple(foo=None, bar=None): # $requestHandler routedParameter=foo routedParameter=bar
return make_response("foo={!r} bar={!r}".format(foo, bar)) # $HttpResponse
@app.route("/complex/<string(length=2):lang_code>") # $routeSetup="/complex/<string(length=2):lang_code>"
def complex(lang_code): # $routeHandler routedParameter=lang_code
def complex(lang_code): # $requestHandler routedParameter=lang_code
return make_response("lang_code {}".format(lang_code)) # $HttpResponse
if __name__ == "__main__":

View File

@@ -6,12 +6,12 @@ app = Flask(__name__)
@app.route("/html1") # $routeSetup="/html1"
def html1(): # $routeHandler
def html1(): # $requestHandler
return "<h1>hello</h1>" # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html2") # $routeSetup="/html2"
def html2(): # $routeHandler
def html2(): # $requestHandler
# note that response saved in a variable intentionally -- we wan the annotations to
# show that we recognize the response creation, and not the return (hopefully). (and
# do the same in the following of the file)
@@ -20,7 +20,7 @@ def html2(): # $routeHandler
@app.route("/html3") # $routeSetup="/html3"
def html3(): # $routeHandler
def html3(): # $requestHandler
resp = app.make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -30,13 +30,13 @@ def html3(): # $routeHandler
@app.route("/html4") # $routeSetup="/html4"
def html4(): # $routeHandler
def html4(): # $requestHandler
resp = Response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html5") # $routeSetup="/html5"
def html5(): # $routeHandler
def html5(): # $requestHandler
# note: flask.Flask.response_class is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = Flask.response_class("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@@ -44,7 +44,7 @@ def html5(): # $routeHandler
@app.route("/html6") # $routeSetup="/html6"
def html6(): # $routeHandler
def html6(): # $requestHandler
# note: app.response_class (flask.Flask.response_class) is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = app.response_class("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@@ -52,14 +52,14 @@ def html6(): # $routeHandler
@app.route("/html7") # $routeSetup="/html7"
def html7(): # $routeHandler
def html7(): # $requestHandler
resp = make_response() # $HttpResponse mimetype=text/html
resp.set_data("<h1>hello</h1>") # $ MISSING: responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/jsonify") # $routeSetup="/jsonify"
def jsonify_route(): # $routeHandler
def jsonify_route(): # $requestHandler
data = {"foo": "bar"}
resp = jsonify(data) # $ MISSING: HttpResponse mimetype=application/json responseBody=data
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -69,7 +69,7 @@ def jsonify_route(): # $routeHandler
################################################################################
@app.route("/tricky-return1") # $routeSetup="/tricky-return1"
def tricky_return1(): # $routeHandler
def tricky_return1(): # $requestHandler
if "raw" in request.args:
resp = "<h1>hellu</h1>"
else:
@@ -83,7 +83,7 @@ def helper():
return make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/tricky-return2") # $routeSetup="/tricky-return2"
def tricky_return2(): # $routeHandler
def tricky_return2(): # $requestHandler
resp = helper()
return resp # $HttpResponse mimetype=text/html responseBody=resp
@@ -94,14 +94,14 @@ def tricky_return2(): # $routeHandler
@app.route("/content-type/response-modification1") # $routeSetup="/content-type/response-modification1"
def response_modification1(): # $routeHandler
def response_modification1(): # $requestHandler
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp.content_type = "text/plain" # $ MISSING: HttpResponse mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/response-modification2") # $routeSetup="/content-type/response-modification2"
def response_modification2(): # $routeHandler
def response_modification2(): # $requestHandler
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp.headers["content-type"] = "text/plain" # $ MISSING: HttpResponse mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@@ -112,33 +112,33 @@ def response_modification2(): # $routeHandler
@app.route("/content-type/Response1") # $routeSetup="/content-type/Response1"
def Response1(): # $routeHandler
def Response1(): # $requestHandler
resp = Response("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response2") # $routeSetup="/content-type/Response2"
def Response2(): # $routeHandler
def Response2(): # $requestHandler
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response3") # $routeSetup="/content-type/Response3"
def Response3(): # $routeHandler
def Response3(): # $requestHandler
# content_type argument takes priority (and result is text/plain)
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8", mimetype="text/html") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response4") # $routeSetup="/content-type/Response4"
def Response4(): # $routeHandler
def Response4(): # $requestHandler
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/plain"}) # $HttpResponse responseBody="<h1>hello</h1>" SPURIOUS: mimetype=text/html MISSING: mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response5") # $routeSetup="/content-type/Response5"
def Response5(): # $routeHandler
def Response5(): # $requestHandler
# content_type argument takes priority (and result is text/plain)
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/html"}, content_type="text/plain; charset=utf-8") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
@@ -146,7 +146,7 @@ def Response5(): # $routeHandler
@app.route("/content-type/Response6") # $routeSetup="/content-type/Response6"
def Response6(): # $routeHandler
def Response6(): # $requestHandler
# mimetype argument takes priority over header (and result is text/plain)
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/html"}, mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
@@ -154,7 +154,7 @@ def Response6(): # $routeHandler
@app.route("/content-type/Flask-response-class") # $routeSetup="/content-type/Flask-response-class"
def Flask_response_class(): # $routeHandler
def Flask_response_class(): # $requestHandler
# note: flask.Flask.response_class is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = Flask.response_class("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
@@ -162,7 +162,7 @@ def Flask_response_class(): # $routeHandler
@app.route("/content-type/app-response-class") # $routeSetup="/content-type/app-response-class"
def app_response_class(): # $routeHandler
def app_response_class(): # $requestHandler
# note: app.response_class (flask.Flask.response_class) is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = app.response_class("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"

View File

@@ -6,25 +6,91 @@ app = Flask(__name__)
SOME_ROUTE = "/some/route"
@app.route(SOME_ROUTE) # $routeSetup="/some/route"
def some_route(): # $routeHandler
def some_route(): # $requestHandler
return make_response("some_route") # $HttpResponse
def index(): # $routeHandler
def index(): # $requestHandler
return make_response("index") # $HttpResponse
app.add_url_rule('/index', 'index', index) # $routeSetup="/index"
# We don't support this yet, and I think that's OK
def later_set(): # $ MISSING: routeHandler
def later_set(): # $ MISSING: requestHandler
return make_response("later_set") # $HttpResponse
app.add_url_rule('/later-set', 'later_set', view_func=None) # $routeSetup="/later-set"
app.view_functions['later_set'] = later_set
# We don't want to execute this at runtime (since program will crash). Just using
# `False` makes our analysis skip it, so here's a workaround :D
if __file__ == "False":
@app.route(UNKNOWN_ROUTE) # $routeSetup
def unkown_route(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
return make_response("unkown_route") # $HttpResponse
@app.route(UNKNOWN_ROUTE) # $routeSetup
def unkown_route(foo, bar): # $routeHandler routedParameter=foo routedParameter=bar
return make_response("unkown_route") # $HttpResponse
# View
#
# see https://flask.palletsprojects.com/en/1.1.x/views/#basic-principle
from flask.views import View
class ShowUser(View):
def dispatch_request(self, user_id): # $ requestHandler routedParameter=user_id
return "user_id: {}".format(user_id) # $ HttpResponse
app.add_url_rule("/basic/user/<int:user_id>", view_func=ShowUser.as_view('show_user')) # $routeSetup="/basic/user/<int:user_id>"
class WithoutKnownRoute1(View):
# For handler without known route, treat all parameters as routed parameters
# (accepting that there might be a few FPs)
def dispatch_request(self, foo, not_routed=42): # $ requestHandler routedParameter=foo SPURIOUS: routedParameter=not_routed
pass
# MethodView
#
# see https://flask.palletsprojects.com/en/1.1.x/views/#method-views-for-apis
from flask.views import MethodView
class UserAPI(MethodView):
def get(self, user_id): # $ requestHandler routedParameter=user_id
if user_id is None:
# return a list of users
pass
else:
# expose a single user
pass
def post(self): # $ requestHandler
# create a new user
pass
def delete(self, user_id): # $ requestHandler routedParameter=user_id
# delete a single user
pass
def put(self, user_id): # $ requestHandler routedParameter=user_id
# update a single user
pass
user_view = UserAPI.as_view("user_api")
app.add_url_rule("/users/", defaults={"user_id": None}, view_func=user_view, methods=["GET",]) # $routeSetup="/users/"
app.add_url_rule("/users/", view_func=user_view, methods=["POST",]) # $routeSetup="/users/"
app.add_url_rule("/users/<int:user_id>", view_func=user_view, methods=["GET", "PUT", "DELETE"]) # $routeSetup="/users/<int:user_id>"
class WithoutKnownRoute2(MethodView):
# For handler without known route, treat all parameters as routed parameters
# (accepting that there might be a few FPs)
def get(self, foo, not_routed=42): # $ requestHandler routedParameter=foo SPURIOUS: routedParameter=not_routed
pass
if __name__ == "__main__":

View File

@@ -2,7 +2,7 @@ from flask import Flask, request
app = Flask(__name__)
@app.route("/test_taint/<name>/<int:number>") # $routeSetup="/test_taint/<name>/<int:number>"
def test_taint(name = "World!", number="0", foo="foo"): # $routeHandler routedParameter=name routedParameter=number
def test_taint(name = "World!", number="0", foo="foo"): # $requestHandler routedParameter=name routedParameter=number
ensure_tainted(name, number)
ensure_not_tainted(foo)
@@ -192,7 +192,7 @@ def test_taint(name = "World!", number="0", foo="foo"): # $routeHandler routedP
@app.route("/debug/<foo>/<bar>", methods=['GET']) # $routeSetup="/debug/<foo>/<bar>"
def debug(foo, bar): # $routeHandler routedParameter=foo routedParameter=bar
def debug(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
print("request.view_args", request.view_args)
print("request.headers {!r}".format(request.headers))
@@ -203,7 +203,7 @@ def debug(foo, bar): # $routeHandler routedParameter=foo routedParameter=bar
return 'ok' # $HttpResponse
@app.route("/stream", methods=['POST']) # $routeSetup="/stream"
def stream(): # $routeHandler
def stream(): # $requestHandler
print(request.path)
s = request.stream
print(s)
@@ -213,7 +213,7 @@ def stream(): # $routeHandler
return 'ok' # $HttpResponse
@app.route("/input_stream", methods=['POST']) # $routeSetup="/input_stream"
def input_stream(): # $routeHandler
def input_stream(): # $requestHandler
print(request.path)
s = request.input_stream
print(s)
@@ -224,14 +224,14 @@ def input_stream(): # $routeHandler
return 'ok' # $HttpResponse
@app.route("/form", methods=['POST']) # $routeSetup="/form"
def form(): # $routeHandler
def form(): # $requestHandler
print(request.path)
print("request.form", request.form)
return 'ok' # $HttpResponse
@app.route("/cache_control", methods=['POST']) # $routeSetup="/cache_control"
def cache_control(): # $routeHandler
def cache_control(): # $requestHandler
print(request.path)
print("request.cache_control.max_age", request.cache_control.max_age, type(request.cache_control.max_age))
print("request.cache_control.max_stale", request.cache_control.max_stale, type(request.cache_control.max_stale))
@@ -240,7 +240,7 @@ def cache_control(): # $routeHandler
return 'ok' # $HttpResponse
@app.route("/file_upload", methods=['POST']) # $routeSetup="/file_upload"
def file_upload(): # $routeHandler
def file_upload(): # $requestHandler
print(request.path)
for k,v in request.files.items():
print(k, v, v.name, v.filename, v.stream)
@@ -248,7 +248,7 @@ def file_upload(): # $routeHandler
return 'ok' # $HttpResponse
@app.route("/args", methods=['GET']) # $routeSetup="/args"
def args(): # $routeHandler
def args(): # $requestHandler
print(request.path)
print("request.args", request.args)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,5 @@
import pymysql
connection = pymysql.connect(host="localhost", user="user", password="passwd")
cursor = connection.cursor()
cursor.execute("some sql", (42,)) # $ getSql="some sql"

View File

@@ -2,3 +2,36 @@
| CodeExecution.py:36 | ok | test_additional_taint | cmd1 |
| CodeExecution.py:37 | ok | test_additional_taint | cmd2 |
| CodeExecution.py:38 | ok | test_additional_taint | cmd3 |
| http_server.py:22 | ok | test_cgi_FieldStorage_taint | form |
| http_server.py:24 | ok | test_cgi_FieldStorage_taint | form['key'] |
| http_server.py:25 | ok | test_cgi_FieldStorage_taint | form['key'].value |
| http_server.py:26 | ok | test_cgi_FieldStorage_taint | form['key'].file |
| http_server.py:27 | ok | test_cgi_FieldStorage_taint | form['key'].filename |
| http_server.py:28 | ok | test_cgi_FieldStorage_taint | form['key'][0] |
| http_server.py:29 | ok | test_cgi_FieldStorage_taint | form['key'][0].value |
| http_server.py:30 | ok | test_cgi_FieldStorage_taint | form['key'][0].file |
| http_server.py:31 | ok | test_cgi_FieldStorage_taint | form['key'][0].filename |
| http_server.py:32 | fail | test_cgi_FieldStorage_taint | ListComp |
| http_server.py:34 | ok | test_cgi_FieldStorage_taint | form.getvalue(..) |
| http_server.py:35 | ok | test_cgi_FieldStorage_taint | form.getvalue(..)[0] |
| http_server.py:37 | ok | test_cgi_FieldStorage_taint | form.getfirst(..) |
| http_server.py:39 | ok | test_cgi_FieldStorage_taint | form.getlist(..) |
| http_server.py:40 | ok | test_cgi_FieldStorage_taint | form.getlist(..)[0] |
| http_server.py:41 | fail | test_cgi_FieldStorage_taint | ListComp |
| http_server.py:50 | ok | taint_sources | self |
| http_server.py:52 | ok | taint_sources | self.requestline |
| http_server.py:54 | ok | taint_sources | self.path |
| http_server.py:56 | ok | taint_sources | self.headers |
| http_server.py:57 | ok | taint_sources | self.headers['Foo'] |
| http_server.py:58 | ok | taint_sources | self.headers.get(..) |
| http_server.py:59 | fail | taint_sources | self.headers.get_all(..) |
| http_server.py:60 | fail | taint_sources | self.headers.keys() |
| http_server.py:61 | ok | taint_sources | self.headers.values() |
| http_server.py:62 | ok | taint_sources | self.headers.items() |
| http_server.py:63 | fail | taint_sources | self.headers.as_bytes() |
| http_server.py:64 | fail | taint_sources | self.headers.as_string() |
| http_server.py:65 | ok | taint_sources | str(..) |
| http_server.py:66 | ok | taint_sources | bytes(..) |
| http_server.py:68 | ok | taint_sources | self.rfile |
| http_server.py:69 | fail | taint_sources | self.rfile.read() |
| http_server.py:78 | ok | taint_sources | form |

View File

@@ -1,2 +1,9 @@
import experimental.dataflow.tainttracking.TestTaintLib
import semmle.python.dataflow.new.RemoteFlowSources
class WithRemoteFlowSources extends TestTaintTrackingConfiguration {
override predicate isSource(DataFlow::Node source) {
super.isSource(source) or
source instanceof RemoteFlowSource
}
}

View File

@@ -0,0 +1,122 @@
import sys
import os
import cgi
if sys.version_info[0] == 2:
from BaseHTTPServer import BaseHTTPRequestHandler
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from CGIHTTPServer import CGIHTTPRequestHandler
if sys.version_info[0] == 3:
from http.server import HTTPServer, BaseHTTPRequestHandler, SimpleHTTPRequestHandler, CGIHTTPRequestHandler
def test_cgi_FieldStorage_taint():
# When a python script is invoked through CGI, the default values used by
# `cgi.FieldStorage` constructor makes it handle data from incoming request.
# You _can_ also manually set the input-data, as is shown below in `MyHandler`.
form = cgi.FieldStorage()
ensure_tainted(
form,
form['key'], # will be a list, if multiple fields named "key" are provided
form['key'].value,
form['key'].file,
form['key'].filename,
form['key'][0],
form['key'][0].value,
form['key'][0].file,
form['key'][0].filename,
[field.value for field in form['key']],
form.getvalue('key'), # will be a list, if multiple fields named "key" are provided
form.getvalue('key')[0],
form.getfirst('key'),
form.getlist('key'),
form.getlist('key')[0],
[field.value for field in form.getlist('key')],
)
class MyHandler(BaseHTTPRequestHandler):
def taint_sources(self):
ensure_tainted(
self,
self.requestline,
self.path,
self.headers,
self.headers['Foo'],
self.headers.get('Foo'),
self.headers.get_all('Foo'),
self.headers.keys(),
self.headers.values(),
self.headers.items(),
self.headers.as_bytes(),
self.headers.as_string(),
str(self.headers),
bytes(self.headers),
self.rfile,
self.rfile.read(),
)
form = cgi.FieldStorage(
self.rfile,
self.headers,
environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers.get('content-type')},
)
ensure_tainted(form)
def do_GET(self): # $ requestHandler
# send_response will log a line to stderr
self.send_response(200)
self.send_header("Content-type", "text/plain; charset=utf-8")
self.end_headers()
self.wfile.write(b"Hello BaseHTTPRequestHandler\n")
self.wfile.writelines([b"1\n", b"2\n", b"3\n"])
print(self.headers)
def do_POST(self): # $ requestHandler
form = cgi.FieldStorage(
self.rfile,
self.headers,
environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers.get('content-type')},
)
if 'myfile' not in form:
self.send_response(422)
self.end_headers()
return
field = form['myfile']
field.file.seek(0, os.SEEK_END)
filesize = field.file.tell()
print("Uploaded {!r} with {} bytes".format(field.filename, filesize))
self.send_response(200)
self.end_headers()
if __name__ == "__main__":
server = HTTPServer(("127.0.0.1", 8080), MyHandler)
server.serve_forever()
# Headers works case insensitvely, so self.headers['foo'] == self.headers['FOO']
# curl localhost:8080 --header "Foo: 1" --header "foo: 2"
# To test file submission through forms, use
# curl -F myfile=@<yourfile> localhost:8080

View File

@@ -0,0 +1,8 @@
import sqlite3
db = sqlite3.connect("example.db")
# non standard
db.execute("some sql", (42,)) # $ getSql="some sql"
cursor = db.cursor()
cursor.execute("some sql", (42,)) # $ getSql="some sql"

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
//
// class DedicatedResponseTest extends HttpServerHttpResponseTest {
// DedicatedResponseTest() { file.getShortName() = "response_test.py" }
// }
//
// class OtherResponseTest extends HttpServerHttpResponseTest {
// OtherResponseTest() { not this instanceof DedicatedResponseTest }
//
// override string getARelevantTag() { result = "HttpResponse" }
// }

View File

@@ -0,0 +1,3 @@
From Tornado 6.0 Python 3.5+ is [required](https://www.tornadoweb.org/en/stable/index.html#installation)
https://www.tornadoweb.org/en/stable/guide/structure.html#handling-request-input

View File

@@ -0,0 +1,41 @@
| taint_test.py:6 | ok | get | name |
| taint_test.py:6 | ok | get | number |
| taint_test.py:7 | ok | get | foo |
| taint_test.py:11 | ok | get | self.get_argument(..) |
| taint_test.py:12 | ok | get | self.get_arguments(..) |
| taint_test.py:13 | ok | get | self.get_arguments(..)[0] |
| taint_test.py:15 | ok | get | self.get_body_argument(..) |
| taint_test.py:16 | ok | get | self.get_body_arguments(..) |
| taint_test.py:17 | ok | get | self.get_body_arguments(..)[0] |
| taint_test.py:19 | ok | get | self.get_query_argument(..) |
| taint_test.py:20 | ok | get | self.get_query_arguments(..) |
| taint_test.py:21 | ok | get | self.get_query_arguments(..)[0] |
| taint_test.py:23 | ok | get | self.path_args |
| taint_test.py:24 | ok | get | self.path_args[0] |
| taint_test.py:26 | ok | get | self.path_kwargs |
| taint_test.py:27 | ok | get | self.path_kwargs["name"] |
| taint_test.py:34 | ok | get | request |
| taint_test.py:36 | ok | get | request.uri |
| taint_test.py:37 | ok | get | request.path |
| taint_test.py:38 | ok | get | request.query |
| taint_test.py:39 | ok | get | request.full_url() |
| taint_test.py:41 | ok | get | request.remote_ip |
| taint_test.py:43 | ok | get | request.body |
| taint_test.py:45 | ok | get | request.arguments |
| taint_test.py:46 | ok | get | request.arguments["name"] |
| taint_test.py:47 | ok | get | request.arguments["name"][0] |
| taint_test.py:49 | ok | get | request.query_arguments |
| taint_test.py:50 | ok | get | request.query_arguments["name"] |
| taint_test.py:51 | ok | get | request.query_arguments["name"][0] |
| taint_test.py:53 | ok | get | request.body_arguments |
| taint_test.py:54 | ok | get | request.body_arguments["name"] |
| taint_test.py:55 | ok | get | request.body_arguments["name"][0] |
| taint_test.py:58 | ok | get | request.headers |
| taint_test.py:59 | ok | get | request.headers["header-name"] |
| taint_test.py:60 | fail | get | request.headers.get_list(..) |
| taint_test.py:61 | fail | get | request.headers.get_all() |
| taint_test.py:62 | fail | get | ListComp |
| taint_test.py:65 | ok | get | request.cookies |
| taint_test.py:66 | ok | get | request.cookies["cookie-name"] |
| taint_test.py:67 | fail | get | request.cookies["cookie-name"].key |
| taint_test.py:68 | fail | get | request.cookies["cookie-name"].value |

View File

@@ -0,0 +1,6 @@
import experimental.dataflow.tainttracking.TestTaintLib
import semmle.python.dataflow.new.RemoteFlowSources
class RemoteFlowTestTaintConfiguration extends TestTaintTrackingConfiguration {
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
}

View File

@@ -0,0 +1,69 @@
import tornado.web
class BasicHandler(tornado.web.RequestHandler):
def get(self): # $ requestHandler
self.write("BasicHandler " + self.get_argument("xss"))
def post(self): # $ requestHandler
self.write("BasicHandler (POST)")
class DeepInheritance(BasicHandler):
def get(self): # $ requestHandler
self.write("DeepInheritance" + self.get_argument("also_xss"))
class FormHandler(tornado.web.RequestHandler):
def post(self): # $ requestHandler
name = self.get_body_argument("name")
self.write(name)
class RedirectHandler(tornado.web.RequestHandler):
def get(self): # $ requestHandler
req = self.request
h = req.headers
url = h["url"]
self.redirect(url)
class BaseReverseInheritance(tornado.web.RequestHandler):
def get(self): # $ requestHandler
self.write("hello from BaseReverseInheritance")
class ReverseInheritance(BaseReverseInheritance):
pass
def make_app():
return tornado.web.Application(
[
(r"/basic", BasicHandler), # $ routeSetup="/basic"
(r"/deep", DeepInheritance), # $ routeSetup="/deep"
(r"/form", FormHandler), # $ routeSetup="/form"
(r"/redirect", RedirectHandler), # $ routeSetup="/redirect"
(r"/reverse-inheritance", ReverseInheritance), # $ routeSetup="/reverse-inheritance"
],
debug=True,
)
if __name__ == "__main__":
import tornado.ioloop
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
# http://localhost:8888/basic?xss=foo
# http://localhost:8888/deep?also_xss=foo
# curl -X POST http://localhost:8888/basic
# curl -X POST http://localhost:8888/deep
# curl -X POST -F "name=foo" http://localhost:8888/form
# curl -v -H 'url: http://example.com' http://localhost:8888/redirect
# http://localhost:8888/reverse-inheritance

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,35 @@
import tornado.web
class ResponseWriting(tornado.web.RequestHandler):
def get(self, type_): # $ requestHandler routedParameter=type_
if type_ == "str":
self.write("foo")
elif type_ == "bytes":
self.write(b"foo")
elif type_ == "dict":
# Content-type will be set to `application/json`
self.write({"foo": 42})
else:
raise Exception("Bad type {} {}".format(type_, type(type_)))
def make_app():
return tornado.web.Application(
[
(r"/ResponseWriting/(str|bytes|dict)", ResponseWriting), # $ routeSetup="/ResponseWriting/(str|bytes|dict)"
],
debug=True,
)
if __name__ == "__main__":
import tornado.ioloop
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
# http://localhost:8888/ResponseWriting/str
# http://localhost:8888/ResponseWriting/bytes
# http://localhost:8888/ResponseWriting/dict

View File

@@ -0,0 +1,110 @@
import tornado.web
import tornado.routing
class FooHandler(tornado.web.RequestHandler):
def get(self, x, y=None, not_used=None): # $ requestHandler routedParameter=x routedParameter=y
self.write("FooHandler {} {}".format(x, y))
class BarHandler(tornado.web.RequestHandler):
def get(self, x, y=None, not_used=None): # $ requestHandler routedParameter=x routedParameter=y SPURIOUS: routedParameter=not_used
self.write("BarHandler {} {}".format(x, y))
class BazHandler(tornado.web.RequestHandler):
def get(self, x, y=None, not_used=None): # $ requestHandler routedParameter=x routedParameter=y SPURIOUS: routedParameter=not_used
self.write("BazHandler {} {}".format(x, y))
class KwArgs(tornado.web.RequestHandler):
def get(self, *, x, y=None, not_used=None): # $ requestHandler routedParameter=x routedParameter=y
self.write("KwArgs {} {}".format(x, y))
class OnlyLocalhost(tornado.web.RequestHandler):
def get(self): # $ requestHandler
self.write("OnlyLocalhost")
class One(tornado.web.RequestHandler):
def get(self): # $ requestHandler
self.write("One")
class Two(tornado.web.RequestHandler):
def get(self): # $ requestHandler
self.write("Two")
class Three(tornado.web.RequestHandler):
def get(self): # $ requestHandler
self.write("Three")
class AddedLater(tornado.web.RequestHandler):
def get(self, x, y=None, not_used=None): # $ requestHandler routedParameter=x routedParameter=y
self.write("AddedLater {} {}".format(x, y))
class PossiblyNotRouted(tornado.web.RequestHandler):
# Even if our analysis can't find a route-setup for this class, we should still
# consider it to be a handle incoming HTTP requests
def get(self): # $ requestHandler
self.write("NotRouted")
def make_app():
# see https://www.tornadoweb.org/en/stable/routing.html for even more examples
app = tornado.web.Application(
[
(r"/foo/([0-9]+)/([0-9]+)?", FooHandler), # $ routeSetup="/foo/([0-9]+)/([0-9]+)?"
tornado.web.URLSpec(r"/bar/([0-9]+)/([0-9]+)?", BarHandler), # $ MISSING: routeSetup="/bar/([0-9]+)/([0-9]+)?"
# Very verbose way to write same as FooHandler
tornado.routing.Rule(tornado.routing.PathMatches(r"/baz/([0-9]+)/([0-9]+)?"), BazHandler), # $ MISSING: routeSetup="/baz/([0-9]+)/([0-9]+)?"
(r"/kw-args/(?P<x>[0-9]+)/(?P<y>[0-9]+)?", KwArgs), # $ routeSetup="/kw-args/(?P<x>[0-9]+)/(?P<y>[0-9]+)?"
# You can do nesting
(r"/(one|two|three)", [
(r"/one", One), # $ routeSetup="/one"
(r"/two", Two), # $ routeSetup="/two"
(r"/three", Three) # $ routeSetup="/three"
]),
# which is _one_ recommended way to ensure known host is used
(tornado.routing.HostMatches(r"(localhost|127\.0\.0\.1)"), [
("/only-localhost", OnlyLocalhost) # $ routeSetup="/only-localhost"
]),
],
debug=True,
)
app.add_handlers(r".*", [(r"/added-later/([0-9]+)/([0-9]+)?", AddedLater)]) # $ routeSetup="/added-later/([0-9]+)/([0-9]+)?"
return app
if __name__ == "__main__":
import tornado.ioloop
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
# http://localhost:8888/foo/42/
# http://localhost:8888/foo/42/1337
# http://localhost:8888/bar/42/
# http://localhost:8888/bar/42/1337
# http://localhost:8888/baz/42/
# http://localhost:8888/baz/42/1337
# http://localhost:8888/kw-args/42/
# http://localhost:8888/kw-args/42/1337
# http://localhost:8888/only-localhost
# http://localhost:8888/one
# http://localhost:8888/two
# http://localhost:8888/three
# http://localhost:8888/added-later

View File

@@ -0,0 +1,90 @@
import tornado.web
class TaintTest(tornado.web.RequestHandler):
def get(self, name = "World!", number="0", foo="foo"): # $ requestHandler routedParameter=name routedParameter=number
ensure_tainted(name, number)
ensure_not_tainted(foo)
ensure_tainted(
# see https://www.tornadoweb.org/en/stable/web.html#input
self.get_argument("name"),
self.get_arguments("name"),
self.get_arguments("name")[0],
self.get_body_argument("name"),
self.get_body_arguments("name"),
self.get_body_arguments("name")[0],
self.get_query_argument("name"),
self.get_query_arguments("name"),
self.get_query_arguments("name")[0],
self.path_args,
self.path_args[0],
self.path_kwargs,
self.path_kwargs["name"],
)
request = self.request
ensure_tainted(
# see https://www.tornadoweb.org/en/stable/httputil.html#tornado.httputil.HTTPServerRequest
request,
request.uri,
request.path,
request.query,
request.full_url(),
request.remote_ip,
request.body,
request.arguments,
request.arguments["name"],
request.arguments["name"][0],
request.query_arguments,
request.query_arguments["name"],
request.query_arguments["name"][0],
request.body_arguments,
request.body_arguments["name"],
request.body_arguments["name"][0],
# dict-like, see https://www.tornadoweb.org/en/stable/httputil.html#tornado.httputil.HTTPHeaders
request.headers,
request.headers["header-name"],
request.headers.get_list("header-name"),
request.headers.get_all(),
[(k, v) for (k, v) in request.headers.get_all()],
# Dict[str, http.cookies.Morsel]
request.cookies,
request.cookies["cookie-name"],
request.cookies["cookie-name"].key,
request.cookies["cookie-name"].value,
)
def make_app():
return tornado.web.Application(
[
(r"/test_taint/([^/]+)/([0-9]+)", TaintTest), # $ routeSetup="/test_taint/([^/]+)/([0-9]+)"
],
debug=True,
)
if __name__ == "__main__":
import tornado.ioloop
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
# http://localhost:8888/ResponseWriting/str
# http://localhost:8888/ResponseWriting/bytes
# http://localhost:8888/ResponseWriting/dict

View File

@@ -111,6 +111,7 @@ class CodeExecutionTest extends InlineExpectationsTest {
override string getARelevantTag() { result = "getCode" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(CodeExecution ce, DataFlow::Node code |
exists(location.getFile().getRelativePath()) and
code = ce.getCode() and
@@ -128,6 +129,7 @@ class SqlExecutionTest extends InlineExpectationsTest {
override string getARelevantTag() { result = "getSql" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(SqlExecution e, DataFlow::Node sql |
exists(location.getFile().getRelativePath()) and
sql = e.getSql() and
@@ -142,9 +144,10 @@ class SqlExecutionTest extends InlineExpectationsTest {
class HttpServerRouteSetupTest extends InlineExpectationsTest {
HttpServerRouteSetupTest() { this = "HttpServerRouteSetupTest" }
override string getARelevantTag() { result in ["routeSetup", "routeHandler", "routedParameter"] }
override string getARelevantTag() { result in ["routeSetup"] }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(HTTP::Server::RouteSetup setup |
location = setup.getLocation() and
element = setup.toString() and
@@ -156,21 +159,31 @@ class HttpServerRouteSetupTest extends InlineExpectationsTest {
) and
tag = "routeSetup"
)
or
exists(HTTP::Server::RouteSetup setup, Function func |
func = setup.getARouteHandler() and
location = func.getLocation() and
element = func.toString() and
value = "" and
tag = "routeHandler"
)
or
exists(HTTP::Server::RouteSetup setup, Parameter param |
param = setup.getARoutedParameter() and
location = param.getLocation() and
element = param.toString() and
value = param.asName().getId() and
tag = "routedParameter"
}
}
class HttpServerRequestHandlerTest extends InlineExpectationsTest {
HttpServerRequestHandlerTest() { this = "HttpServerRequestHandlerTest" }
override string getARelevantTag() { result in ["requestHandler", "routedParameter"] }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
(
exists(HTTP::Server::RequestHandler handler |
location = handler.getLocation() and
element = handler.toString() and
value = "" and
tag = "requestHandler"
)
or
exists(HTTP::Server::RequestHandler handler, Parameter param |
param = handler.getARoutedParameter() and
location = param.getLocation() and
element = param.toString() and
value = param.asName().getId() and
tag = "routedParameter"
)
)
}
}
@@ -191,6 +204,8 @@ class HttpServerHttpResponseTest extends InlineExpectationsTest {
// flask tests more readable since adding full annotations for HttpResponses in the
// the tests for routing setup is both annoying and not very useful.
location.getFile() = file and
exists(file.getRelativePath()) and
// we need to do this step since we expect subclasses could override getARelevantTag
tag = getARelevantTag() and
(
exists(HTTP::Server::HttpResponse response |
@@ -230,8 +245,8 @@ class FileSystemAccessTest extends InlineExpectationsTest {
override string getARelevantTag() { result = "getAPathArgument" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(FileSystemAccess a, DataFlow::Node path |
exists(location.getFile().getRelativePath()) and
path = a.getAPathArgument() and
location = a.getLocation() and
element = path.toString() and
@@ -247,8 +262,8 @@ class PathNormalizationTest extends InlineExpectationsTest {
override string getARelevantTag() { result = "pathNormalization" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(Path::PathNormalization n |
exists(location.getFile().getRelativePath()) and
location = n.getLocation() and
element = n.toString() and
value = "" and
@@ -263,8 +278,8 @@ class SafeAccessCheckTest extends InlineExpectationsTest {
override string getARelevantTag() { result in ["checks", "branch"] }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(Path::SafeAccessCheck c, DataFlow::Node checks, boolean branch |
exists(location.getFile().getRelativePath()) and
c.checks(checks.asCfgNode(), branch) and
location = c.getLocation() and
(