Python: Move framework tests out of experimental

Since they are not experimental anymore 😄
This commit is contained in:
Rasmus Wriedt Larsen
2021-03-19 15:51:54 +01:00
parent 79feb3b689
commit d9079e34e3
123 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,41 @@
# DSA is a public-key algorithm for signing messages.
# Following example at https://pycryptodome.readthedocs.io/en/latest/src/signature/dsa.html
from Crypto.PublicKey import DSA
from Crypto.Signature import DSS
from Crypto.Hash import SHA256
private_key = DSA.generate(2048) # $ PublicKeyGeneration keySize=2048
public_key = private_key.publickey()
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
message = b"message"
signer = DSS.new(private_key, mode='fips-186-3')
hasher = SHA256.new(message)
signature = signer.sign(hasher)
print("signature={}".format(signature))
print()
verifier = DSS.new(public_key, mode='fips-186-3')
hasher = SHA256.new(message)
verifier.verify(hasher, signature)
print("Signature verified (as expected)")
try:
hasher = SHA256.new(b"other message")
verifier.verify(hasher, signature)
raise Exception("Signature verified (unexpected)")
except ValueError:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,38 @@
from Crypto.PublicKey import ECC
from Crypto.Signature import DSS
from Crypto.Hash import SHA256
private_key = ECC.generate(curve="P-256") # $ PublicKeyGeneration keySize=256
public_key = private_key.public_key()
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
message = b"message"
signer = DSS.new(private_key, mode='fips-186-3')
hasher = SHA256.new(message)
signature = signer.sign(hasher)
print("signature={}".format(signature))
print()
verifier = DSS.new(public_key, mode='fips-186-3')
hasher = SHA256.new(message)
verifier.verify(hasher, signature)
print("Signature verified (as expected)")
try:
hasher = SHA256.new(b"other message")
verifier.verify(hasher, signature)
raise Exception("Signature verified (unexpected)")
except ValueError:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,73 @@
# RSA is a public-key algorithm for encrypting and signing messages.
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Signature import pss
from Crypto.Hash import SHA256
private_key = RSA.generate(2048) # $ PublicKeyGeneration keySize=2048
# These 2 methods do the same
public_key = private_key.publickey()
public_key = private_key.public_key()
# ------------------------------------------------------------------------------
# encrypt/decrypt
# ------------------------------------------------------------------------------
print("encrypt/decrypt")
secret_message = b"secret message"
# Following example at https://pycryptodome.readthedocs.io/en/latest/src/examples.html#encrypt-data-with-rsa
encrypt_cipher = PKCS1_OAEP.new(public_key)
encrypted = encrypt_cipher.encrypt(secret_message)
print("encrypted={}".format(encrypted))
print()
decrypt_cipher = PKCS1_OAEP.new(private_key)
decrypted = decrypt_cipher.decrypt(
encrypted,
)
print("decrypted={}".format(decrypted))
assert decrypted == secret_message
print("\n---\n")
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
message = b"message"
signer = pss.new(private_key)
hasher = SHA256.new(message)
signature = signer.sign(hasher)
print("signature={}".format(signature))
print()
verifier = pss.new(public_key)
hasher = SHA256.new(message)
verifier.verify(hasher, signature)
print("Signature verified (as expected)")
try:
verifier = pss.new(public_key)
hasher = SHA256.new(b"other message")
verifier.verify(hasher, signature)
raise Exception("Signature verified (unexpected)")
except ValueError:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,41 @@
# DSA is a public-key algorithm for signing messages.
# Following example at https://pycryptodome.readthedocs.io/en/latest/src/signature/dsa.html
from Cryptodome.PublicKey import DSA
from Cryptodome.Signature import DSS
from Cryptodome.Hash import SHA256
private_key = DSA.generate(2048) # $ PublicKeyGeneration keySize=2048
public_key = private_key.publickey()
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
message = b"message"
signer = DSS.new(private_key, mode='fips-186-3')
hasher = SHA256.new(message)
signature = signer.sign(hasher)
print("signature={}".format(signature))
print()
verifier = DSS.new(public_key, mode='fips-186-3')
hasher = SHA256.new(message)
verifier.verify(hasher, signature)
print("Signature verified (as expected)")
try:
hasher = SHA256.new(b"other message")
verifier.verify(hasher, signature)
raise Exception("Signature verified (unexpected)")
except ValueError:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,38 @@
from Cryptodome.PublicKey import ECC
from Cryptodome.Signature import DSS
from Cryptodome.Hash import SHA256
private_key = ECC.generate(curve="P-256") # $ PublicKeyGeneration keySize=256
public_key = private_key.public_key()
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
message = b"message"
signer = DSS.new(private_key, mode='fips-186-3')
hasher = SHA256.new(message)
signature = signer.sign(hasher)
print("signature={}".format(signature))
print()
verifier = DSS.new(public_key, mode='fips-186-3')
hasher = SHA256.new(message)
verifier.verify(hasher, signature)
print("Signature verified (as expected)")
try:
hasher = SHA256.new(b"other message")
verifier.verify(hasher, signature)
raise Exception("Signature verified (unexpected)")
except ValueError:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,73 @@
# RSA is a public-key algorithm for encrypting and signing messages.
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_OAEP
from Cryptodome.Signature import pss
from Cryptodome.Hash import SHA256
private_key = RSA.generate(2048) # $ PublicKeyGeneration keySize=2048
# These 2 methods do the same
public_key = private_key.publickey()
public_key = private_key.public_key()
# ------------------------------------------------------------------------------
# encrypt/decrypt
# ------------------------------------------------------------------------------
print("encrypt/decrypt")
secret_message = b"secret message"
# Following example at https://pycryptodome.readthedocs.io/en/latest/src/examples.html#encrypt-data-with-rsa
encrypt_cipher = PKCS1_OAEP.new(public_key)
encrypted = encrypt_cipher.encrypt(secret_message)
print("encrypted={}".format(encrypted))
print()
decrypt_cipher = PKCS1_OAEP.new(private_key)
decrypted = decrypt_cipher.decrypt(
encrypted,
)
print("decrypted={}".format(decrypted))
assert decrypted == secret_message
print("\n---\n")
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
message = b"message"
signer = pss.new(private_key)
hasher = SHA256.new(message)
signature = signer.sign(hasher)
print("signature={}".format(signature))
print()
verifier = pss.new(public_key)
hasher = SHA256.new(message)
verifier.verify(hasher, signature)
print("Signature verified (as expected)")
try:
verifier = pss.new(public_key)
hasher = SHA256.new(b"other message")
verifier.verify(hasher, signature)
raise Exception("Signature verified (unexpected)")
except ValueError:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,37 @@
# DSA is a public-key algorithm for signing messages.
# see https://cryptography.io/en/latest/hazmat/primitives/asymmetric/dsa.html
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
HASH_ALGORITHM = hashes.SHA256()
private_key = dsa.generate_private_key(key_size=2048) # $ PublicKeyGeneration keySize=2048
public_key = private_key.public_key()
message = b"message"
# Following example at https://cryptography.io/en/latest/hazmat/primitives/asymmetric/dsa.html#signing
signature = private_key.sign(
message,
algorithm=HASH_ALGORITHM,
)
print("signature={}".format(signature))
print()
public_key.verify(
signature, message, algorithm=HASH_ALGORITHM
)
print("Signature verified (as expected)")
try:
public_key.verify(
signature, b"other message", algorithm=HASH_ALGORITHM
)
raise Exception("Signature verified (unexpected)")
except InvalidSignature:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,43 @@
# see https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa.html
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
private_key = ec.generate_private_key(curve=ec.SECP384R1()) # $ PublicKeyGeneration keySize=384
public_key = private_key.public_key()
HASH_ALGORITHM = hashes.SHA256()
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
SIGNATURE_ALGORITHM = ec.ECDSA(HASH_ALGORITHM)
message = b"message"
signature = private_key.sign(
message,
signature_algorithm=SIGNATURE_ALGORITHM,
)
print("signature={}".format(signature))
print()
public_key.verify(
signature, message, signature_algorithm=SIGNATURE_ALGORITHM
)
print("Signature verified (as expected)")
try:
public_key.verify(
signature, b"other message", signature_algorithm=SIGNATURE_ALGORITHM
)
raise Exception("Signature verified (unexpected)")
except InvalidSignature:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,80 @@
# RSA is a public-key algorithm for encrypting and signing messages.
# see https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa.html
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) # $ PublicKeyGeneration keySize=2048
public_key = private_key.public_key()
HASH_ALGORITHM = hashes.SHA256()
# ------------------------------------------------------------------------------
# encrypt/decrypt
# ------------------------------------------------------------------------------
print("encrypt/decrypt")
ENCRYPT_PADDING = padding.OAEP(
mgf=padding.MGF1(algorithm=HASH_ALGORITHM),
algorithm=HASH_ALGORITHM,
label=None,
)
secret_message = b"secret message"
# Following example at https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa.html#encryption
encrypted = public_key.encrypt(secret_message, padding=ENCRYPT_PADDING)
print("encrypted={}".format(encrypted))
print()
decrypted = private_key.decrypt(
encrypted,
padding=ENCRYPT_PADDING
)
print("decrypted={}".format(decrypted))
assert decrypted == secret_message
print("\n---\n")
# ------------------------------------------------------------------------------
# sign/verify
# ------------------------------------------------------------------------------
print("sign/verify")
SIGN_PADDING = padding.PSS(
mgf=padding.MGF1(HASH_ALGORITHM),
salt_length=padding.PSS.MAX_LENGTH
)
message = b"message"
signature = private_key.sign(
message,
padding=SIGN_PADDING,
algorithm=HASH_ALGORITHM,
)
print("signature={}".format(signature))
print()
public_key.verify(
signature, message, padding=SIGN_PADDING, algorithm=HASH_ALGORITHM
)
print("Signature verified (as expected)")
try:
public_key.verify(
signature, b"other message", padding=SIGN_PADDING, algorithm=HASH_ALGORITHM
)
raise Exception("Signature verified (unexpected)")
except InvalidSignature:
print("Signature mismatch (as expected)")

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,3 @@
import dill
dill.loads(payload) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=dill decodeMayExecuteInput

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
class DedicatedResponseTest extends HttpServerHttpResponseTest {
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
}
class OtherResponseTest extends HttpServerHttpResponseTest {
OtherResponseTest() { not this instanceof DedicatedResponseTest }
override string getARelevantTag() { result = "HttpResponse" }
}

View File

@@ -0,0 +1,79 @@
"""test of views for Django 1.x"""
from django.conf.urls import patterns, url
from django.http.response import HttpResponse
from django.views.generic import View
def url_match_xss(request, foo, bar, no_taint=None): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $HttpResponse
def get_params_xss(request): # $requestHandler
return HttpResponse(request.GET.get("untrusted")) # $HttpResponse
def post_params_xss(request): # $requestHandler
return HttpResponse(request.POST.get("untrusted")) # $HttpResponse
def http_resp_write(request): # $requestHandler
rsp = HttpResponse() # $HttpResponse
rsp.write(request.GET.get("untrusted")) # $HttpResponse
return rsp
class Foo(object):
# Note: since Foo is used as the super type in a class view, it will be able to handle requests.
def post(self, request, untrusted): # $ MISSING: requestHandler routedParameter=untrusted
return HttpResponse('Foo post: {}'.format(untrusted)) # $HttpResponse
class ClassView(View, Foo):
def get(self, request, untrusted): # $ requestHandler routedParameter=untrusted
return HttpResponse('ClassView get: {}'.format(untrusted)) # $HttpResponse
def show_articles(request, page_number=1): # $requestHandler routedParameter=page_number
page_number = int(page_number)
return HttpResponse('articles page: {}'.format(page_number)) # $HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $HttpResponse
urlpatterns = [
url(r"^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)", url_match_xss), # $routeSetup="^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)"
url(r"^get_params", get_params_xss), # $routeSetup="^get_params"
url(r"^post_params", post_params_xss), # $routeSetup="^post_params"
url(r"^http_resp_write", http_resp_write), # $routeSetup="^http_resp_write"
url(r"^class_view/(?P<untrusted>.+)", ClassView.as_view()), # $routeSetup="^class_view/(?P<untrusted>.+)"
# one pattern to support `articles/page-<n>` and ensuring that articles/ goes to page-1
url(r"articles/^(?:page-(?P<page_number>\d+)/)?", show_articles), # $routeSetup="articles/^(?:page-(?P<page_number>\d+)/)?"
# passing as positional argument is not the recommended way of doing things, but it is certainly
# possible
url(r"^([^/]+)/(?:foo|bar)/([^/]+)", xxs_positional_arg, name='xxs_positional_arg'), # $routeSetup="^([^/]+)/(?:foo|bar)/([^/]+)"
]
################################################################################
# Using patterns() for routing
def show_user(request, username): # $requestHandler routedParameter=username
return HttpResponse('show_user {}'.format(username)) # $HttpResponse
urlpatterns = patterns(url(r"^users/(?P<username>[^/]+)", show_user)) # $routeSetup="^users/(?P<username>[^/]+)"
################################################################################
# Show we understand the keyword arguments to django.conf.urls.url
def kw_args(request): # $requestHandler
return HttpResponse('kw_args') # $HttpResponse
urlpatterns = [
url(view=kw_args, regex=r"^kw_args") # $routeSetup="^kw_args"
]

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
class DedicatedResponseTest extends HttpServerHttpResponseTest {
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
}
class OtherResponseTest extends HttpServerHttpResponseTest {
OtherResponseTest() { not this instanceof DedicatedResponseTest }
override string getARelevantTag() { result = "HttpResponse" }
}

View File

@@ -0,0 +1,17 @@
Tests for Django in version 2.x and 3.x.
This folder contains a runable django application generated with `django-admin startproject testproj` and `django-admin startapp testapp`.
To run the development server, install django (in venv), and run `python manage.py runserver`
To understand how things work, see
- https://docs.djangoproject.com/en/3.1/intro/tutorial01/#creating-a-project
- https://docs.djangoproject.com/en/3.1/intro/tutorial02/#activating-models
---
Note that from [Django 2.0 only Python 3 is supported](https://docs.djangoproject.com/en/stable/releases/2.0/#python-compatibility) (enforced by `options` file).
As I see it, from a QL modeling perspective, the important part of [Django 3.0](https://docs.djangoproject.com/en/stable/releases/3.0/) was the added support for ASGI (Asynchronous Server Gateway Interface), and [Django 3.1](https://docs.djangoproject.com/en/stable/releases/3.1/) added support for async views, async middleware.
We currently don't have any tests specific to Django 3.0, since it's very compatible with Django 2.0 in general, but we could split the tests in the future.

View File

@@ -0,0 +1,89 @@
| response_test.py:61 | ok | get_redirect_url | foo |
| taint_test.py:8 | ok | test_taint | bar |
| taint_test.py:8 | ok | test_taint | foo |
| taint_test.py:9 | ok | test_taint | baz |
| taint_test.py:15 | ok | test_taint | request |
| taint_test.py:17 | ok | test_taint | request.body |
| taint_test.py:18 | ok | test_taint | request.path |
| taint_test.py:19 | ok | test_taint | request.path_info |
| taint_test.py:23 | ok | test_taint | request.method |
| taint_test.py:25 | ok | test_taint | request.encoding |
| taint_test.py:26 | ok | test_taint | request.content_type |
| taint_test.py:29 | ok | test_taint | request.content_params |
| taint_test.py:30 | ok | test_taint | request.content_params["key"] |
| taint_test.py:31 | ok | test_taint | request.content_params.get(..) |
| taint_test.py:35 | ok | test_taint | request.GET |
| taint_test.py:36 | ok | test_taint | request.GET["key"] |
| taint_test.py:37 | ok | test_taint | request.GET.get(..) |
| taint_test.py:38 | fail | test_taint | request.GET.getlist(..) |
| taint_test.py:39 | fail | test_taint | request.GET.getlist(..)[0] |
| taint_test.py:40 | ok | test_taint | request.GET.pop(..) |
| taint_test.py:41 | ok | test_taint | request.GET.pop(..)[0] |
| taint_test.py:42 | ok | test_taint | request.GET.popitem()[0] |
| taint_test.py:43 | ok | test_taint | request.GET.popitem()[1] |
| taint_test.py:44 | ok | test_taint | request.GET.popitem()[1][0] |
| taint_test.py:45 | fail | test_taint | request.GET.dict() |
| taint_test.py:46 | fail | test_taint | request.GET.dict()["key"] |
| taint_test.py:47 | fail | test_taint | request.GET.urlencode() |
| taint_test.py:50 | ok | test_taint | request.POST |
| taint_test.py:53 | ok | test_taint | request.COOKIES |
| taint_test.py:54 | ok | test_taint | request.COOKIES["key"] |
| taint_test.py:55 | ok | test_taint | request.COOKIES.get(..) |
| taint_test.py:58 | ok | test_taint | request.FILES |
| taint_test.py:59 | ok | test_taint | request.FILES["key"] |
| taint_test.py:60 | fail | test_taint | request.FILES["key"].content_type |
| taint_test.py:61 | fail | test_taint | request.FILES["key"].content_type_extra |
| taint_test.py:62 | fail | test_taint | request.FILES["key"].content_type_extra["key"] |
| taint_test.py:63 | fail | test_taint | request.FILES["key"].charset |
| taint_test.py:64 | fail | test_taint | request.FILES["key"].name |
| taint_test.py:65 | fail | test_taint | request.FILES["key"].file |
| taint_test.py:66 | fail | test_taint | request.FILES["key"].file.read() |
| taint_test.py:68 | ok | test_taint | request.FILES.get(..) |
| taint_test.py:69 | fail | test_taint | request.FILES.get(..).name |
| taint_test.py:70 | fail | test_taint | request.FILES.getlist(..) |
| taint_test.py:71 | fail | test_taint | request.FILES.getlist(..)[0] |
| taint_test.py:72 | fail | test_taint | request.FILES.getlist(..)[0].name |
| taint_test.py:73 | fail | test_taint | request.FILES.dict() |
| taint_test.py:74 | fail | test_taint | request.FILES.dict()["key"] |
| taint_test.py:75 | fail | test_taint | request.FILES.dict()["key"].name |
| taint_test.py:78 | ok | test_taint | request.META |
| taint_test.py:79 | ok | test_taint | request.META["HTTP_USER_AGENT"] |
| taint_test.py:80 | ok | test_taint | request.META.get(..) |
| taint_test.py:83 | ok | test_taint | request.headers |
| taint_test.py:84 | ok | test_taint | request.headers["user-agent"] |
| taint_test.py:85 | ok | test_taint | request.headers["USER_AGENT"] |
| taint_test.py:88 | ok | test_taint | request.resolver_match |
| taint_test.py:89 | fail | test_taint | request.resolver_match.args |
| taint_test.py:90 | fail | test_taint | request.resolver_match.args[0] |
| taint_test.py:91 | fail | test_taint | request.resolver_match.kwargs |
| taint_test.py:92 | fail | test_taint | request.resolver_match.kwargs["key"] |
| taint_test.py:94 | fail | test_taint | request.get_full_path() |
| taint_test.py:95 | fail | test_taint | request.get_full_path_info() |
| taint_test.py:99 | fail | test_taint | request.read() |
| taint_test.py:100 | fail | test_taint | request.readline() |
| taint_test.py:101 | fail | test_taint | request.readlines() |
| taint_test.py:102 | fail | test_taint | request.readlines()[0] |
| taint_test.py:103 | fail | test_taint | ListComp |
| taint_test.py:109 | ok | test_taint | args |
| taint_test.py:110 | ok | test_taint | args[0] |
| taint_test.py:111 | ok | test_taint | kwargs |
| taint_test.py:112 | ok | test_taint | kwargs["key"] |
| taint_test.py:116 | ok | test_taint | request.current_app |
| taint_test.py:121 | ok | test_taint | request.get_host() |
| taint_test.py:122 | ok | test_taint | request.get_port() |
| taint_test.py:129 | fail | test_taint | request.build_absolute_uri() |
| taint_test.py:130 | fail | test_taint | request.build_absolute_uri(..) |
| taint_test.py:131 | fail | test_taint | request.build_absolute_uri(..) |
| taint_test.py:134 | ok | test_taint | request.build_absolute_uri(..) |
| taint_test.py:135 | ok | test_taint | request.build_absolute_uri(..) |
| taint_test.py:143 | ok | test_taint | request.get_signed_cookie(..) |
| taint_test.py:144 | ok | test_taint | request.get_signed_cookie(..) |
| taint_test.py:145 | ok | test_taint | request.get_signed_cookie(..) |
| taint_test.py:149 | fail | test_taint | request.get_signed_cookie(..) |
| taint_test.py:150 | fail | test_taint | request.get_signed_cookie(..) |
| taint_test.py:157 | ok | some_method | self.request |
| taint_test.py:158 | ok | some_method | self.request.GET["key"] |
| taint_test.py:160 | ok | some_method | self.args |
| taint_test.py:161 | ok | some_method | self.args[0] |
| taint_test.py:163 | ok | some_method | self.kwargs |
| taint_test.py:164 | ok | some_method | self.kwargs["key"] |

View File

@@ -0,0 +1,6 @@
import experimental.dataflow.tainttracking.TestTaintLib
import semmle.python.dataflow.new.RemoteFlowSources
class RemoteFlowTestTaintConfiguration extends TestTaintTrackingConfiguration {
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
}

View File

@@ -0,0 +1,10 @@
# to force extractor to see files. since we use `--max-import-depth=1`, we use this
# "fake" import that doesn't actually work, but tricks the python extractor to look at
# all the files
from testproj import *
from testapp import *
import os.path as pth
pth.join("foo", "bar")

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'testproj.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,96 @@
from django.http.response import HttpResponse, HttpResponseRedirect, HttpResponsePermanentRedirect, JsonResponse, HttpResponseNotFound
from django.views.generic import RedirectView
import django.shortcuts
# Not an XSS sink, since the Content-Type is not "text/html"
# FP reported in https://github.com/github/codeql-python-team/issues/38
def safe__json_response(request):
# implicitly sets Content-Type to "application/json"
return JsonResponse({"foo": request.GET.get("foo")}) # $HttpResponse mimetype=application/json responseBody=Dict
# Not an XSS sink, since the Content-Type is not "text/html"
def safe__manual_json_response(request):
json_data = '{"json": "{}"}'.format(request.GET.get("foo"))
return HttpResponse(json_data, content_type="application/json") # $HttpResponse mimetype=application/json responseBody=json_data
# Not an XSS sink, since the Content-Type is not "text/html"
def safe__manual_content_type(request):
return HttpResponse('<img src="0" onerror="alert(1)">', content_type="text/plain") # $HttpResponse mimetype=text/plain responseBody='<img src="0" onerror="alert(1)">'
# XSS FP reported in https://github.com/github/codeql/issues/3466
# Note: This should be an open-redirect sink, but not an XSS sink.
def or__redirect(request):
next = request.GET.get("next")
return HttpResponseRedirect(next) # $HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation=next
def information_exposure_through_redirect(request, as_kw=False):
# This is a contrived example, but possible
private = "private"
next = request.GET.get("next")
if as_kw:
return HttpResponseRedirect(next, content=private) # $HttpResponse mimetype=text/html responseBody=private HttpRedirectResponse redirectLocation=next
else:
return HttpResponseRedirect(next, private) # $ HttpResponse mimetype=text/html responseBody=private HttpRedirectResponse redirectLocation=next
def perm_redirect(request):
private = "private"
next = request.GET.get("next")
return HttpResponsePermanentRedirect(next, private) # $ HttpResponse mimetype=text/html responseBody=private HttpRedirectResponse redirectLocation=next
def redirect_through_normal_response(request):
private = "private"
next = request.GET.get("next")
resp = HttpResponse() # $ HttpResponse mimetype=text/html
resp.status_code = 302
resp['Location'] = next # $ MISSING: redirectLocation=next
resp.content = private # $ MISSING: responseBody=private
return resp
def redirect_shortcut(request):
next = request.GET.get("next")
return django.shortcuts.redirect(next) # $ HttpResponse HttpRedirectResponse redirectLocation=next
class CustomRedirectView(RedirectView):
def get_redirect_url(self, foo): # $ requestHandler routedParameter=foo
ensure_tainted(foo)
next = "https://example.com/{}".format(foo)
return next # $ HttpResponse HttpRedirectResponse redirectLocation=next
# Ensure that simple subclasses are still vuln to XSS
def xss__not_found(request):
return HttpResponseNotFound(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=Attribute()
# Ensure we still have an XSS sink when manually setting the content_type to HTML
def xss__manual_response_type(request):
return HttpResponse(request.GET.get("name"), content_type="text/html; charset=utf-8") # $HttpResponse mimetype=text/html responseBody=Attribute()
def xss__write(request):
response = HttpResponse() # $HttpResponse mimetype=text/html
response.write(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=Attribute()
# This is safe but probably a bug if the argument to `write` is not a result of `json.dumps` or similar.
def safe__write_json(request):
response = JsonResponse() # $HttpResponse mimetype=application/json
response.write(request.GET.get("name")) # $HttpResponse mimetype=application/json responseBody=Attribute()
# Ensure manual subclasses are vulnerable
class CustomResponse(HttpResponse):
def __init__(self, banner, content, *args, **kwargs):
super().__init__(content, *args, content_type="text/html", **kwargs)
def xss__custom_response(request):
return CustomResponse("ACME Responses", request.GET("name")) # $HttpResponse MISSING: mimetype=text/html responseBody=Attribute() SPURIOUS: responseBody="ACME Responses"
class CustomJsonResponse(JsonResponse):
def __init__(self, banner, content, *args, **kwargs):
super().__init__(content, *args, content_type="text/html", **kwargs)
def safe__custom_json_response(request):
return CustomJsonResponse("ACME Responses", {"foo": request.GET.get("foo")}) # $HttpResponse mimetype=application/json MISSING: responseBody=Dict SPURIOUS: responseBody="ACME Responses"

View File

@@ -0,0 +1,152 @@
"""testing views for Django 2.x and 3.x"""
from django.urls import path, re_path
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse, HttpResponseNotFound
from django.views import View
import django.views.generic.base
from django.views.decorators.http import require_GET
def url_match_xss(request, foo, bar, no_taint=None): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('url_match_xss: {} {}'.format(foo, bar)) # $HttpResponse
def get_params_xss(request): # $requestHandler
return HttpResponse(request.GET.get("untrusted")) # $HttpResponse
def post_params_xss(request): # $requestHandler
return HttpResponse(request.POST.get("untrusted")) # $HttpResponse
def http_resp_write(request): # $requestHandler
rsp = HttpResponse() # $HttpResponse
rsp.write(request.GET.get("untrusted")) # $HttpResponse
return rsp
class Foo(object):
# Note: since Foo is used as the super type in a class view, it will be able to handle requests.
def post(self, request, untrusted): # $ MISSING: requestHandler routedParameter=untrusted
return HttpResponse('Foo post: {}'.format(untrusted)) # $HttpResponse
class ClassView(View, Foo):
def get(self, request, untrusted): # $ requestHandler routedParameter=untrusted
return HttpResponse('ClassView get: {}'.format(untrusted)) # $HttpResponse
# direct import with full path to `View` class
class ClassView2(django.views.generic.base.View):
def get(self, request): # $ requestHandler
pass
def show_articles(request, page_number=1): # $requestHandler routedParameter=page_number
page_number = int(page_number)
return HttpResponse('articles page: {}'.format(page_number)) # $HttpResponse
def xxs_positional_arg(request, arg0, arg1, no_taint=None): # $requestHandler routedParameter=arg0 routedParameter=arg1
return HttpResponse('xxs_positional_arg: {} {}'.format(arg0, arg1)) # $HttpResponse
urlpatterns = [
re_path(r"^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)", url_match_xss), # $routeSetup="^url_match/(?P<foo>[^/]+)/(?P<bar>[^/]+)"
re_path(r"^get_params", get_params_xss), # $routeSetup="^get_params"
re_path(r"^post_params", post_params_xss), # $routeSetup="^post_params"
re_path(r"^http_resp_write", http_resp_write), # $routeSetup="^http_resp_write"
re_path(r"^class_view/(?P<untrusted>.+)", ClassView.as_view()), # $routeSetup="^class_view/(?P<untrusted>.+)"
# one pattern to support `articles/page-<n>` and ensuring that articles/ goes to page-1
re_path(r"articles/^(?:page-(?P<page_number>\d+)/)?", show_articles), # $routeSetup="articles/^(?:page-(?P<page_number>\d+)/)?"
# passing as positional argument is not the recommended way of doing things, but it is certainly
# possible
re_path(r"^([^/]+)/(?:foo|bar)/([^/]+)", xxs_positional_arg, name='xxs_positional_arg'), # $routeSetup="^([^/]+)/(?:foo|bar)/([^/]+)"
]
# Show we understand the keyword arguments to django.urls.re_path
def re_path_kwargs(request): # $requestHandler
return HttpResponse('re_path_kwargs') # $HttpResponse
urlpatterns = [
re_path(view=re_path_kwargs, route=r"^specifying-as-kwargs-is-not-a-problem") # $routeSetup="^specifying-as-kwargs-is-not-a-problem"
]
################################################################################
# Using path
################################################################################
# saying page_number is an externally controlled *string* is a bit strange, when we have an int converter :O
def page_number(request, page_number=1): # $requestHandler routedParameter=page_number
return HttpResponse('page_number: {}'.format(page_number)) # $HttpResponse
def foo_bar_baz(request, foo, bar, baz): # $requestHandler routedParameter=foo routedParameter=bar routedParameter=baz
return HttpResponse('foo_bar_baz: {} {} {}'.format(foo, bar, baz)) # $HttpResponse
def path_kwargs(request, foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
return HttpResponse('path_kwargs: {} {} {}'.format(foo, bar)) # $HttpResponse
def not_valid_identifier(request): # $requestHandler
return HttpResponse('<foo!>') # $HttpResponse
urlpatterns = [
path("articles/", page_number), # $routeSetup="articles/"
path("articles/page-<int:page_number>", page_number), # $routeSetup="articles/page-<int:page_number>"
path("<int:foo>/<str:bar>/<baz>", foo_bar_baz, name='foo-bar-baz'), # $routeSetup="<int:foo>/<str:bar>/<baz>"
path(view=path_kwargs, route="<foo>/<bar>"), # $routeSetup="<foo>/<bar>"
# We should not report there is a request parameter called `not_valid!`
path("not_valid/<not_valid!>", not_valid_identifier), # $routeSetup="not_valid/<not_valid!>"
]
################################################################################
# Deprecated django.conf.urls.url
################################################################################
# This version 1.x way of defining urls is deprecated in Django 3.1, but still works
from django.conf.urls import url
def deprecated(request): # $requestHandler
return HttpResponse('deprecated') # $HttpResponse
urlpatterns = [
url(r"^deprecated/", deprecated), # $routeSetup="^deprecated/"
]
################################################################################
# Special stuff
################################################################################
class PossiblyNotRouted(View):
# Even if our analysis can't find a route-setup for this class, we should still
# consider it to be a handle incoming HTTP requests
def get(self, request, possibly_not_routed=42): # $ requestHandler routedParameter=possibly_not_routed
return HttpResponse('PossiblyNotRouted get: {}'.format(possibly_not_routed)) # $HttpResponse
@require_GET
def with_decorator(request, foo): # $ requestHandler routedParameter=foo
pass
urlpatterns = [
path("with_decorator/<foo>", with_decorator), # $ routeSetup="with_decorator/<foo>"
]
class UnknownViewSubclass(UnknownViewSuperclass):
# Although we don't know for certain that this class is a django view class, the fact that it's
# used with `as_view()` in the routing setup should be enough that we treat it as such.
def get(self, request): # $ requestHandler
pass
urlpatterns = [
path("UnknownViewSubclass/", UnknownViewSubclass.as_view()), # $ routeSetup="UnknownViewSubclass/"
]

View File

@@ -0,0 +1,172 @@
"""testing views for Django 2.x and 3.x"""
from django.urls import path
from django.http import HttpRequest
from django.views import View
def test_taint(request: HttpRequest, foo, bar, baz=None): # $requestHandler routedParameter=foo routedParameter=bar
ensure_tainted(foo, bar)
ensure_not_tainted(baz)
# Manually inspected all fields of the HttpRequest object
# https://docs.djangoproject.com/en/3.0/ref/request-response/#httprequest-objects
ensure_tainted(
request,
request.body,
request.path,
request.path_info,
# With CSRF middleware disabled, it's possible to use custom methods,
# for example by `curl -X FOO <url>`
request.method,
request.encoding,
request.content_type,
# Dict[str, str]
request.content_params,
request.content_params["key"],
request.content_params.get("key"),
# django.http.QueryDict
# see https://docs.djangoproject.com/en/3.0/ref/request-response/#querydict-objects
request.GET,
request.GET["key"],
request.GET.get("key"),
request.GET.getlist("key"),
request.GET.getlist("key")[0],
request.GET.pop("key"),
request.GET.pop("key")[0],
request.GET.popitem()[0], # key
request.GET.popitem()[1], # values
request.GET.popitem()[1][0], # values[0]
request.GET.dict(),
request.GET.dict()["key"],
request.GET.urlencode(),
# django.http.QueryDict (same as above, did not duplicate tests)
request.POST,
# Dict[str, str]
request.COOKIES,
request.COOKIES["key"],
request.COOKIES.get("key"),
# MultiValueDict[str, UploadedFile]
request.FILES,
request.FILES["key"],
request.FILES["key"].content_type,
request.FILES["key"].content_type_extra,
request.FILES["key"].content_type_extra["key"],
request.FILES["key"].charset,
request.FILES["key"].name,
request.FILES["key"].file,
request.FILES["key"].file.read(),
request.FILES.get("key"),
request.FILES.get("key").name,
request.FILES.getlist("key"),
request.FILES.getlist("key")[0],
request.FILES.getlist("key")[0].name,
request.FILES.dict(),
request.FILES.dict()["key"],
request.FILES.dict()["key"].name,
# Dict[str, Any]
request.META,
request.META["HTTP_USER_AGENT"],
request.META.get("HTTP_USER_AGENT"),
# HttpHeaders (case insensitive dict-like)
request.headers,
request.headers["user-agent"],
request.headers["USER_AGENT"],
# django.urls.ResolverMatch
request.resolver_match,
request.resolver_match.args,
request.resolver_match.args[0],
request.resolver_match.kwargs,
request.resolver_match.kwargs["key"],
request.get_full_path(),
request.get_full_path_info(),
# build_absolute_uri handled below
# get_signed_cookie handled below
request.read(),
request.readline(),
request.readlines(),
request.readlines()[0],
[line for line in request],
)
# django.urls.ResolverMatch also supports iterable unpacking
_view, args, kwargs = request.resolver_match
ensure_tainted(
args,
args[0],
kwargs,
kwargs["key"],
)
ensure_not_tainted(
request.current_app,
# Django has `ALLOWED_HOSTS` to ensure the HOST value cannot be tampered with.
# It is possible to remove this protection, but it seems reasonable to assume
# people don"t do this by default.
request.get_host(),
request.get_port(),
)
####################################
# build_absolute_uri
####################################
ensure_tainted(
request.build_absolute_uri(),
request.build_absolute_uri(request.GET["key"]),
request.build_absolute_uri(location=request.GET["key"]),
)
ensure_not_tainted(
request.build_absolute_uri("/hardcoded/"),
request.build_absolute_uri("https://example.com"),
)
####################################
# get_signed_cookie
####################################
# We don't consider user to be able to tamper with cookies that are signed
ensure_not_tainted(
request.get_signed_cookie("key"),
request.get_signed_cookie("key", salt="salt"),
request.get_signed_cookie("key", max_age=60),
)
# However, providing tainted default value might result in taint
ensure_tainted(
request.get_signed_cookie("key", request.COOKIES["key"]),
request.get_signed_cookie("key", default=request.COOKIES["key"]),
)
class ClassView(View):
def some_method(self):
ensure_tainted(
self.request,
self.request.GET["key"],
self.args,
self.args[0],
self.kwargs,
self.kwargs["key"],
)
# fake setup, you can't actually run this
urlpatterns = [
path("test-taint/<foo>/<bar>", test_taint), # $ routeSetup="test-taint/<foo>/<bar>"
path("ClassView/", ClassView.as_view()), # $ routeSetup="ClassView/"
]

View File

@@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

@@ -0,0 +1,5 @@
from django.apps import AppConfig
class TestappConfig(AppConfig):
name = 'testapp'

View File

@@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View File

@@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View File

@@ -0,0 +1,21 @@
from django.urls import path, re_path
# This version 1.x way of defining urls is deprecated in Django 3.1, but still works
from django.conf.urls import url
from . import views
urlpatterns = [
path("foo/", views.foo), # $routeSetup="foo/"
# TODO: Doesn't include standard `$` to mark end of string, due to problems with
# inline expectation tests (which thinks the `$` would mark the beginning of a new
# line)
re_path(r"^ba[rz]/", views.bar_baz), # $routeSetup="^ba[rz]/"
url(r"^deprecated/", views.deprecated), # $routeSetup="^deprecated/"
path("basic-view-handler/", views.MyBasicViewHandler.as_view()), # $routeSetup="basic-view-handler/"
path("custom-inheritance-view-handler/", views.MyViewHandlerWithCustomInheritance.as_view()), # $routeSetup="custom-inheritance-view-handler/"
path("CustomRedirectView/<foo>", views.CustomRedirectView.as_view()), # $routeSetup="CustomRedirectView/<foo>"
path("CustomRedirectView2/<foo>", views.CustomRedirectView2.as_view()), # $routeSetup="CustomRedirectView2/<foo>"
]

View File

@@ -0,0 +1,47 @@
from django.http import HttpRequest, HttpResponse
from django.views.generic import View, RedirectView
from django.views.decorators.csrf import csrf_exempt
def foo(request: HttpRequest): # $requestHandler
return HttpResponse("foo") # $HttpResponse
def bar_baz(request: HttpRequest): # $requestHandler
return HttpResponse("bar_baz") # $HttpResponse
def deprecated(request: HttpRequest): # $requestHandler
return HttpResponse("deprecated") # $HttpResponse
class MyBasicViewHandler(View):
def get(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyViewHandler: GET") # $ HttpResponse
def post(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyViewHandler: POST") # $ HttpResponse
class MyCustomViewBaseClass(View):
def post(self, request: HttpRequest): # $ requestHandler
return HttpResponse("MyCustomViewBaseClass: POST") # $ HttpResponse
class MyViewHandlerWithCustomInheritance(MyCustomViewBaseClass):
def get(self, request: HttpRequest): # $ requestHandler
print(self.request.GET)
return HttpResponse("MyViewHandlerWithCustomInheritance: GET") # $ HttpResponse
# RedirectView
# See docs at https://docs.djangoproject.com/en/3.1/ref/class-based-views/base/#redirectview
class CustomRedirectView(RedirectView):
def get_redirect_url(self, foo): # $ requestHandler routedParameter=foo
next = "https://example.com/{}".format(foo)
return next # $ HttpResponse HttpRedirectResponse redirectLocation=next
class CustomRedirectView2(RedirectView):
url = "https://example.com/%(foo)s"

View File

@@ -0,0 +1,16 @@
"""
ASGI config for testproj project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.1/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'testproj.settings')
application = get_asgi_application()

View File

@@ -0,0 +1,121 @@
"""
Django settings for testproj project.
Generated by 'django-admin startproject' using Django 3.1.2.
For more information on this file, see
https://docs.djangoproject.com/en/3.1/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.1/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.1/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '!vwm^sd$9#=ebollrn--dd3_x8-b=aj!c@lp8x)ha8r()^51^f'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'testapp.apps.TestappConfig',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'testproj.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'testproj.wsgi.application'
# Database
# https://docs.djangoproject.com/en/3.1/ref/settings/#databases
# DATABASES = {
# 'default': {
# 'ENGINE': 'django.db.backends.sqlite3',
# 'NAME': BASE_DIR / 'db.sqlite3',
# }
# }
# Password validation
# https://docs.djangoproject.com/en/3.1/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/3.1/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.1/howto/static-files/
STATIC_URL = '/static/'

View File

@@ -0,0 +1,22 @@
"""testproj URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/3.1/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path("admin/", admin.site.urls), # $routeSetup="admin/"
path("app/", include("testapp.urls")), # $routeSetup="app/"
]

View File

@@ -0,0 +1,16 @@
"""
WSGI config for testproj project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.1/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'testproj.settings')
application = get_wsgi_application()

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,29 @@
from django.db import connection, models
from django.db.models.expressions import RawSQL
def test_plain():
cursor = connection.cursor()
cursor.execute("some sql") # $getSql="some sql"
def test_context():
with connection.cursor() as cursor:
cursor.execute("some sql") # $getSql="some sql"
cursor.execute(sql="some sql") # $getSql="some sql"
class User(models.Model):
pass
def test_model():
User.objects.raw("some sql") # $getSql="some sql"
User.objects.annotate(RawSQL("some sql")) # $getSql="some sql"
User.objects.annotate(RawSQL("foo"), RawSQL("bar")) # $getSql="foo" getSql="bar"
User.objects.annotate(val=RawSQL("some sql")) # $getSql="some sql"
User.objects.extra("some sql") # $getSql="some sql"
User.objects.extra(select="select", where="where", tables="tables", order_by="order_by") # $getSql="select" getSql="where" getSql="tables" getSql="order_by"
raw = RawSQL("so raw")
User.objects.annotate(val=raw) # $getSql="so raw"

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,10 @@
| fabric_v1_execute.py:7 | fail | unsafe | cmd |
| fabric_v1_execute.py:7 | fail | unsafe | cmd2 |
| fabric_v1_execute.py:8 | ok | unsafe | safe_arg |
| fabric_v1_execute.py:8 | ok | unsafe | safe_optional |
| fabric_v1_execute.py:14 | fail | unsafe | cmd |
| fabric_v1_execute.py:14 | fail | unsafe | cmd2 |
| fabric_v1_execute.py:15 | ok | unsafe | safe_arg |
| fabric_v1_execute.py:15 | ok | unsafe | safe_optional |
| fabric_v1_execute.py:21 | ok | some_http_handler | cmd |
| fabric_v1_execute.py:21 | ok | some_http_handler | cmd2 |

View File

@@ -0,0 +1 @@
import experimental.dataflow.tainttracking.TestTaintLib

View File

@@ -0,0 +1,26 @@
"""Test that shows fabric.api.execute propagates taint"""
from fabric.api import run, execute
def unsafe(cmd, safe_arg, cmd2=None, safe_optional=5):
ensure_tainted(cmd, cmd2)
ensure_not_tainted(safe_arg, safe_optional)
class Foo(object):
def unsafe(self, cmd, safe_arg, cmd2=None, safe_optional=5):
ensure_tainted(cmd, cmd2)
ensure_not_tainted(safe_arg, safe_optional)
def some_http_handler():
cmd = TAINTED_STRING
cmd2 = TAINTED_STRING
ensure_tainted(cmd, cmd2)
execute(unsafe, cmd=cmd, safe_arg='safe_arg', cmd2=cmd2)
foo = Foo()
execute(foo.unsafe, cmd, 'safe_arg', cmd2)

View File

@@ -0,0 +1,14 @@
"""tests for the 'fabric' package (v1.x)
See http://docs.fabfile.org/en/1.14/tutorial.html
"""
from fabric.api import run, local, sudo
local("cmd1; cmd2") # $getCommand="cmd1; cmd2"
run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
local(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"

View File

@@ -0,0 +1,64 @@
"""tests for the 'fabric' package (v2.x)
Loosely inspired by http://docs.fabfile.org/en/2.5/getting-started.html
"""
from fabric import connection, Connection, group, SerialGroup, ThreadingGroup, tasks, task
################################################################################
# Connection
################################################################################
c = Connection("web1")
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.local("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.local(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
# fully qualified usage
c2 = connection.Connection("web2")
c2.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
################################################################################
# SerialGroup
################################################################################
results = SerialGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
pool = SerialGroup("web1", "web2", "web3")
pool.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
# fully qualified usage
group.SerialGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
################################################################################
# ThreadingGroup
################################################################################
results = ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
pool = ThreadingGroup("web1", "web2", "web3")
pool.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
# fully qualified usage
group.ThreadingGroup("web1", "web2", "mac1").run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
################################################################################
# task decorator
# using the 'fab' command-line tool
################################################################################
@task
def foo(c):
# 'c' is a fabric.connection.Connection
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
# fully qualified usage
@tasks.task
def bar(c):
# 'c' is a fabric.connection.Connection
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
class DedicatedResponseTest extends HttpServerHttpResponseTest {
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
}
class OtherResponseTest extends HttpServerHttpResponseTest {
OtherResponseTest() { not this instanceof DedicatedResponseTest }
override string getARelevantTag() { result = "HttpResponse" }
}

View File

@@ -0,0 +1,98 @@
| taint_test.py:6 | ok | test_taint | name |
| taint_test.py:6 | ok | test_taint | number |
| taint_test.py:7 | ok | test_taint | foo |
| taint_test.py:14 | ok | test_taint | request.environ |
| taint_test.py:15 | ok | test_taint | request.environ.get(..) |
| taint_test.py:17 | ok | test_taint | request.path |
| taint_test.py:18 | ok | test_taint | request.full_path |
| taint_test.py:19 | ok | test_taint | request.base_url |
| taint_test.py:20 | ok | test_taint | request.url |
| taint_test.py:23 | fail | test_taint | request.accept_charsets.best |
| taint_test.py:24 | fail | test_taint | request.accept_charsets.best_match(..) |
| taint_test.py:25 | ok | test_taint | request.accept_charsets[0] |
| taint_test.py:26 | ok | test_taint | request.accept_encodings |
| taint_test.py:27 | ok | test_taint | request.accept_languages |
| taint_test.py:28 | ok | test_taint | request.accept_mimetypes |
| taint_test.py:31 | ok | test_taint | request.access_control_request_headers |
| taint_test.py:33 | ok | test_taint | request.access_control_request_method |
| taint_test.py:35 | ok | test_taint | request.access_route |
| taint_test.py:36 | ok | test_taint | request.access_route[0] |
| taint_test.py:39 | ok | test_taint | request.args |
| taint_test.py:40 | ok | test_taint | request.args['key'] |
| taint_test.py:41 | ok | test_taint | request.args.getlist(..) |
| taint_test.py:44 | ok | test_taint | request.authorization |
| taint_test.py:45 | ok | test_taint | request.authorization['username'] |
| taint_test.py:46 | fail | test_taint | request.authorization.username |
| taint_test.py:49 | ok | test_taint | request.cache_control |
| taint_test.py:51 | fail | test_taint | request.cache_control.max_age |
| taint_test.py:52 | fail | test_taint | request.cache_control.max_stale |
| taint_test.py:53 | fail | test_taint | request.cache_control.min_fresh |
| taint_test.py:55 | ok | test_taint | request.content_encoding |
| taint_test.py:57 | ok | test_taint | request.content_md5 |
| taint_test.py:59 | ok | test_taint | request.content_type |
| taint_test.py:62 | ok | test_taint | request.cookies |
| taint_test.py:63 | ok | test_taint | request.cookies['key'] |
| taint_test.py:65 | ok | test_taint | request.data |
| taint_test.py:68 | ok | test_taint | request.files |
| taint_test.py:69 | ok | test_taint | request.files['key'] |
| taint_test.py:70 | fail | test_taint | request.files['key'].filename |
| taint_test.py:71 | fail | test_taint | request.files['key'].stream |
| taint_test.py:72 | ok | test_taint | request.files.getlist(..) |
| taint_test.py:73 | fail | test_taint | request.files.getlist(..)[0].filename |
| taint_test.py:74 | fail | test_taint | request.files.getlist(..)[0].stream |
| taint_test.py:77 | ok | test_taint | request.form |
| taint_test.py:78 | ok | test_taint | request.form['key'] |
| taint_test.py:79 | ok | test_taint | request.form.getlist(..) |
| taint_test.py:81 | ok | test_taint | request.get_data() |
| taint_test.py:83 | ok | test_taint | request.get_json() |
| taint_test.py:84 | ok | test_taint | request.get_json()['foo'] |
| taint_test.py:85 | ok | test_taint | request.get_json()['foo']['bar'] |
| taint_test.py:89 | ok | test_taint | request.headers |
| taint_test.py:90 | ok | test_taint | request.headers['key'] |
| taint_test.py:91 | fail | test_taint | request.headers.get_all(..) |
| taint_test.py:92 | fail | test_taint | request.headers.getlist(..) |
| taint_test.py:93 | ok | test_taint | list(..) |
| taint_test.py:94 | fail | test_taint | request.headers.to_wsgi_list() |
| taint_test.py:96 | ok | test_taint | request.json |
| taint_test.py:97 | ok | test_taint | request.json['foo'] |
| taint_test.py:98 | ok | test_taint | request.json['foo']['bar'] |
| taint_test.py:100 | ok | test_taint | request.method |
| taint_test.py:102 | ok | test_taint | request.mimetype |
| taint_test.py:104 | ok | test_taint | request.mimetype_params |
| taint_test.py:106 | ok | test_taint | request.origin |
| taint_test.py:109 | ok | test_taint | request.pragma |
| taint_test.py:111 | ok | test_taint | request.query_string |
| taint_test.py:113 | ok | test_taint | request.referrer |
| taint_test.py:115 | ok | test_taint | request.remote_addr |
| taint_test.py:117 | ok | test_taint | request.remote_user |
| taint_test.py:120 | ok | test_taint | request.stream |
| taint_test.py:121 | ok | test_taint | request.input_stream |
| taint_test.py:123 | ok | test_taint | request.url |
| taint_test.py:125 | ok | test_taint | request.user_agent |
| taint_test.py:128 | ok | test_taint | request.values |
| taint_test.py:129 | ok | test_taint | request.values['key'] |
| taint_test.py:130 | ok | test_taint | request.values.getlist(..) |
| taint_test.py:133 | ok | test_taint | request.view_args |
| taint_test.py:134 | ok | test_taint | request.view_args['key'] |
| taint_test.py:138 | ok | test_taint | request.script_root |
| taint_test.py:139 | ok | test_taint | request.url_root |
| taint_test.py:143 | ok | test_taint | request.charset |
| taint_test.py:144 | ok | test_taint | request.url_charset |
| taint_test.py:148 | ok | test_taint | request.date |
| taint_test.py:151 | ok | test_taint | request.endpoint |
| taint_test.py:156 | ok | test_taint | request.host |
| taint_test.py:157 | ok | test_taint | request.host_url |
| taint_test.py:159 | ok | test_taint | request.scheme |
| taint_test.py:161 | ok | test_taint | request.script_root |
| taint_test.py:169 | ok | test_taint | request.args |
| taint_test.py:170 | ok | test_taint | a |
| taint_test.py:171 | ok | test_taint | b |
| taint_test.py:173 | ok | test_taint | request.args['key'] |
| taint_test.py:174 | ok | test_taint | a['key'] |
| taint_test.py:175 | ok | test_taint | b['key'] |
| taint_test.py:177 | ok | test_taint | request.args.getlist(..) |
| taint_test.py:178 | ok | test_taint | a.getlist(..) |
| taint_test.py:179 | ok | test_taint | b.getlist(..) |
| taint_test.py:180 | ok | test_taint | gl(..) |
| taint_test.py:187 | ok | test_taint | req.path |
| taint_test.py:188 | ok | test_taint | gd() |

View File

@@ -0,0 +1,6 @@
import experimental.dataflow.tainttracking.TestTaintLib
import semmle.python.dataflow.new.RemoteFlowSources
class RemoteFlowTestTaintConfiguration extends TestTaintTrackingConfiguration {
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
}

View File

@@ -0,0 +1,7 @@
import flask
bp3 = flask.Blueprint("bp3", __name__)
@bp3.route("/bp3/example") # $ routeSetup="/bp3/example"
def bp3_example(): # $ requestHandler
return "bp 3 example" # $ HttpResponse

View File

@@ -0,0 +1,67 @@
import flask
from flask import Flask, request, make_response
app = Flask(__name__)
@app.route("/") # $routeSetup="/"
def hello_world(): # $requestHandler
return "Hello World!" # $HttpResponse
from flask.views import MethodView
class MyView(MethodView):
def get(self, user_id): # $ requestHandler
if user_id is None:
# return a list of users
pass
else:
# expose a single user
pass
the_view = MyView.as_view('my_view')
app.add_url_rule('/the/', defaults={'user_id': None}, # $routeSetup="/the/"
view_func=the_view, methods=['GET',])
@app.route("/dangerous") # $routeSetup="/dangerous"
def dangerous(): # $requestHandler
return request.args.get('payload') # $HttpResponse
@app.route("/dangerous-with-cfg-split") # $routeSetup="/dangerous-with-cfg-split"
def dangerous2(): # $requestHandler
x = request.form['param0']
if request.method == "POST":
return request.form['param1'] # $HttpResponse
return None # $ SPURIOUS: HttpResponse
@app.route("/unsafe") # $routeSetup="/unsafe"
def unsafe(): # $requestHandler
first_name = request.args.get('name', '')
return make_response("Your name is " + first_name) # $HttpResponse
@app.route("/safe") # $routeSetup="/safe"
def safe(): # $requestHandler
first_name = request.args.get('name', '')
return make_response("Your name is " + escape(first_name)) # $HttpResponse
@app.route("/hello/<name>") # $routeSetup="/hello/<name>"
def hello(name): # $requestHandler routedParameter=name
return make_response("Your name is " + name) # $HttpResponse
@app.route("/foo/<path:subpath>") # $routeSetup="/foo/<path:subpath>"
def foo(subpath): # $requestHandler routedParameter=subpath
return make_response("The subpath is " + subpath) # $HttpResponse
@app.route("/multiple/") # $routeSetup="/multiple/"
@app.route("/multiple/foo/<foo>") # $routeSetup="/multiple/foo/<foo>"
@app.route("/multiple/bar/<bar>") # $routeSetup="/multiple/bar/<bar>"
def multiple(foo=None, bar=None): # $requestHandler routedParameter=foo routedParameter=bar
return make_response("foo={!r} bar={!r}".format(foo, bar)) # $HttpResponse
@app.route("/complex/<string(length=2):lang_code>") # $routeSetup="/complex/<string(length=2):lang_code>"
def complex(lang_code): # $requestHandler routedParameter=lang_code
return make_response("lang_code {}".format(lang_code)) # $HttpResponse
if __name__ == "__main__":
app.run(debug=True)

View File

@@ -0,0 +1,191 @@
import json
from flask import Flask, make_response, jsonify, Response, request, redirect
app = Flask(__name__)
@app.route("/html1") # $routeSetup="/html1"
def html1(): # $requestHandler
return "<h1>hello</h1>" # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/html2") # $routeSetup="/html2"
def html2(): # $requestHandler
# note that response saved in a variable intentionally -- we wan the annotations to
# show that we recognize the response creation, and not the return (hopefully). (and
# do the same in the following of the file)
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html3") # $routeSetup="/html3"
def html3(): # $requestHandler
resp = app.make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
# TODO: Create test-cases for the many ways that `make_response` can be used
# https://flask.palletsprojects.com/en/1.1.x/api/#flask.Flask.make_response
@app.route("/html4") # $routeSetup="/html4"
def html4(): # $requestHandler
resp = Response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html5") # $routeSetup="/html5"
def html5(): # $requestHandler
# note: flask.Flask.response_class is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = Flask.response_class("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html6") # $routeSetup="/html6"
def html6(): # $requestHandler
# note: app.response_class (flask.Flask.response_class) is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = app.response_class("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/html7") # $routeSetup="/html7"
def html7(): # $requestHandler
resp = make_response() # $HttpResponse mimetype=text/html
resp.set_data("<h1>hello</h1>") # $ MISSING: responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/jsonify") # $routeSetup="/jsonify"
def jsonify_route(): # $requestHandler
data = {"foo": "bar"}
resp = jsonify(data) # $ MISSING: HttpResponse mimetype=application/json responseBody=data
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
################################################################################
# Tricky return handling
################################################################################
@app.route("/tricky-return1") # $routeSetup="/tricky-return1"
def tricky_return1(): # $requestHandler
if "raw" in request.args:
resp = "<h1>hellu</h1>"
else:
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
return resp # $HttpResponse mimetype=text/html responseBody=resp
def helper():
if "raw" in request.args:
return "<h1>hellu</h1>"
else:
return make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
@app.route("/tricky-return2") # $routeSetup="/tricky-return2"
def tricky_return2(): # $requestHandler
resp = helper()
return resp # $HttpResponse mimetype=text/html responseBody=resp
################################################################################
# Setting content-type manually
################################################################################
@app.route("/content-type/response-modification1") # $routeSetup="/content-type/response-modification1"
def response_modification1(): # $requestHandler
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp.content_type = "text/plain" # $ MISSING: HttpResponse mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/response-modification2") # $routeSetup="/content-type/response-modification2"
def response_modification2(): # $requestHandler
resp = make_response("<h1>hello</h1>") # $HttpResponse mimetype=text/html responseBody="<h1>hello</h1>"
resp.headers["content-type"] = "text/plain" # $ MISSING: HttpResponse mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
# Exploration of mimetype/content_type/headers arguments to `app.response_class` -- things can get tricky
# see https://werkzeug.palletsprojects.com/en/1.0.x/wrappers/#werkzeug.wrappers.Response
@app.route("/content-type/Response1") # $routeSetup="/content-type/Response1"
def Response1(): # $requestHandler
resp = Response("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response2") # $routeSetup="/content-type/Response2"
def Response2(): # $requestHandler
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response3") # $routeSetup="/content-type/Response3"
def Response3(): # $requestHandler
# content_type argument takes priority (and result is text/plain)
resp = Response("<h1>hello</h1>", content_type="text/plain; charset=utf-8", mimetype="text/html") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response4") # $routeSetup="/content-type/Response4"
def Response4(): # $requestHandler
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/plain"}) # $HttpResponse responseBody="<h1>hello</h1>" SPURIOUS: mimetype=text/html MISSING: mimetype=text/plain
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response5") # $routeSetup="/content-type/Response5"
def Response5(): # $requestHandler
# content_type argument takes priority (and result is text/plain)
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/html"}, content_type="text/plain; charset=utf-8") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Response6") # $routeSetup="/content-type/Response6"
def Response6(): # $requestHandler
# mimetype argument takes priority over header (and result is text/plain)
# note: capitalization of Content-Type does not matter
resp = Response("<h1>hello</h1>", headers={"Content-TYPE": "text/html"}, mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/Flask-response-class") # $routeSetup="/content-type/Flask-response-class"
def Flask_response_class(): # $requestHandler
# note: flask.Flask.response_class is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = Flask.response_class("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
@app.route("/content-type/app-response-class") # $routeSetup="/content-type/app-response-class"
def app_response_class(): # $requestHandler
# note: app.response_class (flask.Flask.response_class) is set to `flask.Response` by default.
# it can be overridden, but we don't try to handle that right now.
resp = app.response_class("<h1>hello</h1>", mimetype="text/plain") # $HttpResponse mimetype=text/plain responseBody="<h1>hello</h1>"
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
# TODO: add tests for setting status code
# TODO: add test that manually creates a redirect by setting status code and suitable header.
################################################################################
# Redirect
################################################################################
@app.route("/redirect-simple") # $routeSetup="/redirect-simple"
def redirect_simple(): # $requestHandler
next = request.args['next']
resp = redirect(next) # $ HttpResponse mimetype=text/html HttpRedirectResponse redirectLocation=next
return resp # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=resp
################################################################################
if __name__ == "__main__":
app.run(debug=True)

View File

@@ -0,0 +1,124 @@
import flask
from flask import Flask, make_response
app = Flask(__name__)
SOME_ROUTE = "/some/route"
@app.route(SOME_ROUTE) # $routeSetup="/some/route"
def some_route(): # $requestHandler
return make_response("some_route") # $HttpResponse
def index(): # $requestHandler
return make_response("index") # $HttpResponse
app.add_url_rule('/index', 'index', index) # $routeSetup="/index"
# We don't support this yet, and I think that's OK
def later_set(): # $ MISSING: requestHandler
return make_response("later_set") # $HttpResponse
app.add_url_rule('/later-set', 'later_set', view_func=None) # $routeSetup="/later-set"
app.view_functions['later_set'] = later_set
# We don't want to execute this at runtime (since program will crash). Just using
# `False` makes our analysis skip it, so here's a workaround :D
if __file__ == "False":
@app.route(UNKNOWN_ROUTE) # $routeSetup
def unkown_route(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
return make_response("unkown_route") # $HttpResponse
# View
#
# see https://flask.palletsprojects.com/en/1.1.x/views/#basic-principle
from flask.views import View
class ShowUser(View):
def dispatch_request(self, user_id): # $ requestHandler routedParameter=user_id
return "user_id: {}".format(user_id) # $ HttpResponse
app.add_url_rule("/basic/user/<int:user_id>", view_func=ShowUser.as_view('show_user')) # $routeSetup="/basic/user/<int:user_id>"
class WithoutKnownRoute1(View):
# For handler without known route, treat all parameters as routed parameters
# (accepting that there might be a few FPs)
def dispatch_request(self, foo, not_routed=42): # $ requestHandler routedParameter=foo SPURIOUS: routedParameter=not_routed
pass
# MethodView
#
# see https://flask.palletsprojects.com/en/1.1.x/views/#method-views-for-apis
from flask.views import MethodView
class UserAPI(MethodView):
def get(self, user_id): # $ requestHandler routedParameter=user_id
if user_id is None:
# return a list of users
pass
else:
# expose a single user
pass
def post(self): # $ requestHandler
# create a new user
pass
def delete(self, user_id): # $ requestHandler routedParameter=user_id
# delete a single user
pass
def put(self, user_id): # $ requestHandler routedParameter=user_id
# update a single user
pass
user_view = UserAPI.as_view("user_api")
app.add_url_rule("/users/", defaults={"user_id": None}, view_func=user_view, methods=["GET",]) # $routeSetup="/users/"
app.add_url_rule("/users/", view_func=user_view, methods=["POST",]) # $routeSetup="/users/"
app.add_url_rule("/users/<int:user_id>", view_func=user_view, methods=["GET", "PUT", "DELETE"]) # $routeSetup="/users/<int:user_id>"
class WithoutKnownRoute2(MethodView):
# For handler without known route, treat all parameters as routed parameters
# (accepting that there might be a few FPs)
def get(self, foo, not_routed=42): # $ requestHandler routedParameter=foo SPURIOUS: routedParameter=not_routed
pass
# Blueprints
#
# see https://flask.palletsprojects.com/en/1.1.x/blueprints/
bp1 = flask.Blueprint("bp1", __name__)
@bp1.route("/bp1/example/<foo>") # $ routeSetup="/bp1/example/<foo>"
def bp1_example(foo): # $ requestHandler routedParameter=foo
return "bp 1 example foo={}".format(foo) # $ HttpResponse
app.register_blueprint(bp1) # by default, URLs of blueprints are not prefixed
bp2 = flask.Blueprint("bp2", __name__)
@bp2.route("/example") # $ routeSetup="/example"
def bp2_example(): # $ requestHandler
return "bp 2 example" # $ HttpResponse
app.register_blueprint(bp2, url_prefix="/bp2") # but it is possible to add a URL prefix
from external_blueprint import bp3
app.register_blueprint(bp3)
if __name__ == "__main__":
print(app.url_map)
app.run(debug=True)

View File

@@ -0,0 +1,273 @@
from flask import Flask, request
app = Flask(__name__)
@app.route("/test_taint/<name>/<int:number>") # $routeSetup="/test_taint/<name>/<int:number>"
def test_taint(name = "World!", number="0", foo="foo"): # $requestHandler routedParameter=name routedParameter=number
ensure_tainted(name, number)
ensure_not_tainted(foo)
# Manually inspected all fields of the Request object
# https://flask.palletsprojects.com/en/1.1.x/api/#flask.Request
ensure_tainted(
request.environ,
request.environ.get('HTTP_AUTHORIZATION'),
request.path,
request.full_path,
request.base_url,
request.url,
# These request.accept_* properties are instances of subclasses of werkzeug.datastructures.Accept
request.accept_charsets.best,
request.accept_charsets.best_match(["utf-8", "utf-16"]),
request.accept_charsets[0],
request.accept_encodings,
request.accept_languages,
request.accept_mimetypes,
# werkzeug.datastructures.HeaderSet (subclass of collections_abc.MutableSet)
request.access_control_request_headers,
request.access_control_request_method,
request.access_route,
request.access_route[0],
# By default werkzeug.datastructures.ImmutableMultiDict -- although can be changed :\
request.args,
request.args['key'],
request.args.getlist('key'),
# werkzeug.datastructures.Authorization (a dict, with some properties)
request.authorization,
request.authorization['username'],
request.authorization.username,
# werkzeug.datastructures.RequestCacheControl
request.cache_control,
# These should be `int`s, but can be strings... see debug method below
request.cache_control.max_age,
request.cache_control.max_stale,
request.cache_control.min_fresh,
request.content_encoding,
request.content_md5,
request.content_type,
# werkzeug.datastructures.ImmutableTypeConversionDict (which is basically just a dict)
request.cookies,
request.cookies['key'],
request.data,
# a werkzeug.datastructures.MultiDict, mapping [str, werkzeug.datastructures.FileStorage]
request.files,
request.files['key'],
request.files['key'].filename,
request.files['key'].stream,
request.files.getlist('key'),
request.files.getlist('key')[0].filename,
request.files.getlist('key')[0].stream,
# By default werkzeug.datastructures.ImmutableMultiDict -- although can be changed :\
request.form,
request.form['key'],
request.form.getlist('key'),
request.get_data(),
request.get_json(),
request.get_json()['foo'],
request.get_json()['foo']['bar'],
# werkzeug.datastructures.EnvironHeaders,
# which has same interface as werkzeug.datastructures.Headers
request.headers,
request.headers['key'],
request.headers.get_all('key'),
request.headers.getlist('key'),
list(request.headers), # (k, v) list
request.headers.to_wsgi_list(), # (k, v) list
request.json,
request.json['foo'],
request.json['foo']['bar'],
request.method,
request.mimetype,
request.mimetype_params,
request.origin,
# werkzeug.datastructures.HeaderSet (subclass of collections_abc.MutableSet)
request.pragma,
request.query_string,
request.referrer,
request.remote_addr,
request.remote_user,
# file-like object
request.stream,
request.input_stream,
request.url,
request.user_agent,
# werkzeug.datastructures.CombinedMultiDict, which is basically just a werkzeug.datastructures.MultiDict
request.values,
request.values['key'],
request.values.getlist('key'),
# dict
request.view_args,
request.view_args['key'],
)
ensure_not_tainted(
request.script_root,
request.url_root,
# The expected charset for parsing request data / urls. Can not be changed by client.
# https://github.com/pallets/werkzeug/blob/4dc8d6ab840d4b78cbd5789cef91b01e3bde01d5/src/werkzeug/wrappers/base_request.py#L71-L72
request.charset,
request.url_charset,
# request.date is a parsed `datetime`
# https://github.com/pallets/werkzeug/blob/4dc8d6ab840d4b78cbd5789cef91b01e3bde01d5/src/werkzeug/wrappers/common_descriptors.py#L76-L83
request.date,
# Assuming that endpoints are not created by user-input seems fair
request.endpoint,
# In some rare circumstances a client could spoof the host, but by default they
# should not be able to. See
# https://werkzeug.palletsprojects.com/en/1.0.x/wrappers/#werkzeug.wrappers.BaseRequest.trusted_hosts
request.host,
request.host_url,
request.scheme,
request.script_root,
)
# Testing some more tricky data-flow still works
a = request.args
b = a
gl = b.getlist
ensure_tainted(
request.args,
a,
b,
request.args['key'],
a['key'],
b['key'],
request.args.getlist('key'),
a.getlist('key'),
b.getlist('key'),
gl('key'),
)
# aliasing tests
req = request
gd = request.get_data
ensure_tainted(
req.path,
gd(),
)
@app.route("/debug/<foo>/<bar>", methods=['GET']) # $routeSetup="/debug/<foo>/<bar>"
def debug(foo, bar): # $requestHandler routedParameter=foo routedParameter=bar
print("request.view_args", request.view_args)
print("request.headers {!r}".format(request.headers))
print("request.headers['accept'] {!r}".format(request.headers['accept']))
print("request.pragma {!r}".format(request.pragma))
return 'ok' # $HttpResponse
@app.route("/stream", methods=['POST']) # $routeSetup="/stream"
def stream(): # $requestHandler
print(request.path)
s = request.stream
print(s)
# just works :)
print(s.read())
return 'ok' # $HttpResponse
@app.route("/input_stream", methods=['POST']) # $routeSetup="/input_stream"
def input_stream(): # $requestHandler
print(request.path)
s = request.input_stream
print(s)
# hangs until client stops connection, since max number of bytes to read must
# be handled manually
print(s.read())
return 'ok' # $HttpResponse
@app.route("/form", methods=['POST']) # $routeSetup="/form"
def form(): # $requestHandler
print(request.path)
print("request.form", request.form)
return 'ok' # $HttpResponse
@app.route("/cache_control", methods=['POST']) # $routeSetup="/cache_control"
def cache_control(): # $requestHandler
print(request.path)
print("request.cache_control.max_age", request.cache_control.max_age, type(request.cache_control.max_age))
print("request.cache_control.max_stale", request.cache_control.max_stale, type(request.cache_control.max_stale))
print("request.cache_control.min_fresh", request.cache_control.min_fresh, type(request.cache_control.min_fresh))
return 'ok' # $HttpResponse
@app.route("/file_upload", methods=['POST']) # $routeSetup="/file_upload"
def file_upload(): # $requestHandler
print(request.path)
for k,v in request.files.items():
print(k, v, v.name, v.filename, v.stream)
return 'ok' # $HttpResponse
@app.route("/args", methods=['GET']) # $routeSetup="/args"
def args(): # $requestHandler
print(request.path)
print("request.args", request.args)
return 'ok' # $HttpResponse
# curl --header "My-Header: some-value" http://localhost:5000/debug/fooval/barval
# curl --header "Pragma: foo, bar" --header "Pragma: stuff, foo" http://localhost:5000/debug/fooval/barval
# curl -X POST --data 'wat' http://localhost:5000/stream
# curl -X POST --data 'wat' http://localhost:5000/input_stream
# curl --form foo=foo --form foo=123 http://localhost:5000/form
# curl --header "Cache-Control: max-age=foo, max-stale=bar, min-fresh=baz" http://localhost:5000/cache_control
# curl --header "Cache-Control: max-age=1, max-stale=2, min-fresh=3" http://localhost:5000/cache_control
# curl -F myfile=@<some-file> localhost:5000/file_upload
# curl http://localhost:5000/args?foo=42&bar=bar
if __name__ == "__main__":
app.run(debug=True)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,39 @@
"""tests for the 'invoke' package
see https://www.pyinvoke.org/
"""
import invoke
invoke.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
invoke.run(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
def with_sudo():
invoke.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
invoke.sudo(command="cmd1; cmd2") # $getCommand="cmd1; cmd2"
def manual_context():
c = invoke.Context()
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
c.sudo("cmd1; cmd2") # $getCommand="cmd1; cmd2"
# invoke.Context is just an alias for invoke.context.Context
c2 = invoke.context.Context()
c2.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
manual_context()
def foo_helper(c):
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
# for use with the 'invoke' command-line tool
@invoke.task
def foo(c):
# 'c' is a invoke.context.Context
c.run("cmd1; cmd2") # $getCommand="cmd1; cmd2"
foo_helper(c)

View File

@@ -0,0 +1,35 @@
edges
| test.py:21:11:21:18 | ControlFlowNode for source() | test.py:22:10:22:24 | ControlFlowNode for Attribute() |
| test.py:29:11:29:18 | ControlFlowNode for source() | test.py:33:10:33:12 | ControlFlowNode for val |
| test.py:40:11:40:25 | ControlFlowNode for Attribute() | test.py:41:10:41:12 | ControlFlowNode for val |
| test.py:45:11:45:18 | ControlFlowNode for source() | test.py:40:11:40:25 | ControlFlowNode for Attribute() |
| test.py:53:11:53:25 | ControlFlowNode for Attribute() | test.py:54:10:54:12 | ControlFlowNode for val |
| test.py:70:11:70:18 | ControlFlowNode for source() | test.py:53:11:53:25 | ControlFlowNode for Attribute() |
| test.py:78:11:78:14 | ControlFlowNode for bm() | test.py:79:10:79:12 | ControlFlowNode for val |
| test.py:83:11:83:18 | ControlFlowNode for source() | test.py:78:11:78:14 | ControlFlowNode for bm() |
| test.py:90:11:90:14 | ControlFlowNode for bm() | test.py:91:10:91:12 | ControlFlowNode for val |
| test.py:107:11:107:18 | ControlFlowNode for source() | test.py:90:11:90:14 | ControlFlowNode for bm() |
nodes
| test.py:21:11:21:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:22:10:22:24 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:29:11:29:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:33:10:33:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:40:11:40:25 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:41:10:41:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:45:11:45:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:53:11:53:25 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:54:10:54:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:70:11:70:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:78:11:78:14 | ControlFlowNode for bm() | semmle.label | ControlFlowNode for bm() |
| test.py:79:10:79:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:83:11:83:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:90:11:90:14 | ControlFlowNode for bm() | semmle.label | ControlFlowNode for bm() |
| test.py:91:10:91:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:107:11:107:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
#select
| test.py:22:10:22:24 | ControlFlowNode for Attribute() | test.py:21:11:21:18 | ControlFlowNode for source() | test.py:22:10:22:24 | ControlFlowNode for Attribute() | test flow (naive): test_simple |
| test.py:33:10:33:12 | ControlFlowNode for val | test.py:29:11:29:18 | ControlFlowNode for source() | test.py:33:10:33:12 | ControlFlowNode for val | test flow (naive): test_alias |
| test.py:41:10:41:12 | ControlFlowNode for val | test.py:45:11:45:18 | ControlFlowNode for source() | test.py:41:10:41:12 | ControlFlowNode for val | test flow (naive): test_accross_functions |
| test.py:54:10:54:12 | ControlFlowNode for val | test.py:70:11:70:18 | ControlFlowNode for source() | test.py:54:10:54:12 | ControlFlowNode for val | test flow (naive): test_deeply_nested |
| test.py:79:10:79:12 | ControlFlowNode for val | test.py:83:11:83:18 | ControlFlowNode for source() | test.py:79:10:79:12 | ControlFlowNode for val | test flow (naive): test_pass_bound_method |
| test.py:91:10:91:12 | ControlFlowNode for val | test.py:107:11:107:18 | ControlFlowNode for source() | test.py:91:10:91:12 | ControlFlowNode for val | test flow (naive): test_deeply_nested_bound_method |

View File

@@ -0,0 +1,24 @@
/**
* @kind path-problem
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
import DataFlow::PathGraph
import SharedCode
class MyClassGetValueAdditionalTaintStep extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// obj -> obj.get_value()
exists(DataFlow::Node bound_method |
bound_method = myClassGetValue(nodeFrom) and
nodeTo.asCfgNode().(CallNode).getFunction() = bound_method.asCfgNode()
)
}
}
from SharedConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
"test flow (naive): " + source.getNode().asCfgNode().getScope().getName()

View File

@@ -0,0 +1,75 @@
edges
| test.py:21:11:21:18 | ControlFlowNode for source() | test.py:22:10:22:22 | ControlFlowNode for Attribute |
| test.py:22:10:22:22 | ControlFlowNode for Attribute | test.py:22:10:22:24 | ControlFlowNode for Attribute() |
| test.py:29:11:29:18 | ControlFlowNode for source() | test.py:31:20:31:32 | ControlFlowNode for Attribute |
| test.py:31:20:31:32 | ControlFlowNode for Attribute | test.py:33:10:33:12 | ControlFlowNode for val |
| test.py:39:15:39:17 | ControlFlowNode for arg | test.py:40:11:40:23 | ControlFlowNode for Attribute |
| test.py:40:11:40:23 | ControlFlowNode for Attribute | test.py:41:10:41:12 | ControlFlowNode for val |
| test.py:45:11:45:18 | ControlFlowNode for source() | test.py:46:15:46:17 | ControlFlowNode for src |
| test.py:46:15:46:17 | ControlFlowNode for src | test.py:39:15:39:17 | ControlFlowNode for arg |
| test.py:52:24:52:26 | ControlFlowNode for arg | test.py:53:11:53:23 | ControlFlowNode for Attribute |
| test.py:53:11:53:23 | ControlFlowNode for Attribute | test.py:54:10:54:12 | ControlFlowNode for val |
| test.py:57:33:57:35 | ControlFlowNode for arg | test.py:58:24:58:26 | ControlFlowNode for arg |
| test.py:58:24:58:26 | ControlFlowNode for arg | test.py:52:24:52:26 | ControlFlowNode for arg |
| test.py:61:33:61:35 | ControlFlowNode for arg | test.py:62:33:62:35 | ControlFlowNode for arg |
| test.py:62:33:62:35 | ControlFlowNode for arg | test.py:57:33:57:35 | ControlFlowNode for arg |
| test.py:65:33:65:35 | ControlFlowNode for arg | test.py:66:33:66:35 | ControlFlowNode for arg |
| test.py:66:33:66:35 | ControlFlowNode for arg | test.py:61:33:61:35 | ControlFlowNode for arg |
| test.py:70:11:70:18 | ControlFlowNode for source() | test.py:71:33:71:35 | ControlFlowNode for src |
| test.py:71:33:71:35 | ControlFlowNode for src | test.py:65:33:65:35 | ControlFlowNode for arg |
| test.py:77:23:77:24 | ControlFlowNode for bm | test.py:79:10:79:12 | ControlFlowNode for val |
| test.py:83:11:83:18 | ControlFlowNode for source() | test.py:84:23:84:35 | ControlFlowNode for Attribute |
| test.py:84:23:84:35 | ControlFlowNode for Attribute | test.py:77:23:77:24 | ControlFlowNode for bm |
| test.py:89:37:89:38 | ControlFlowNode for bm | test.py:91:10:91:12 | ControlFlowNode for val |
| test.py:94:46:94:47 | ControlFlowNode for bm | test.py:95:37:95:38 | ControlFlowNode for bm |
| test.py:95:37:95:38 | ControlFlowNode for bm | test.py:89:37:89:38 | ControlFlowNode for bm |
| test.py:98:46:98:47 | ControlFlowNode for bm | test.py:99:46:99:47 | ControlFlowNode for bm |
| test.py:99:46:99:47 | ControlFlowNode for bm | test.py:94:46:94:47 | ControlFlowNode for bm |
| test.py:102:46:102:47 | ControlFlowNode for bm | test.py:103:46:103:47 | ControlFlowNode for bm |
| test.py:103:46:103:47 | ControlFlowNode for bm | test.py:98:46:98:47 | ControlFlowNode for bm |
| test.py:107:11:107:18 | ControlFlowNode for source() | test.py:108:46:108:58 | ControlFlowNode for Attribute |
| test.py:108:46:108:58 | ControlFlowNode for Attribute | test.py:102:46:102:47 | ControlFlowNode for bm |
nodes
| test.py:21:11:21:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:22:10:22:22 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| test.py:22:10:22:24 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:29:11:29:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:31:20:31:32 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| test.py:33:10:33:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:39:15:39:17 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:40:11:40:23 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| test.py:41:10:41:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:45:11:45:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:46:15:46:17 | ControlFlowNode for src | semmle.label | ControlFlowNode for src |
| test.py:52:24:52:26 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:53:11:53:23 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| test.py:54:10:54:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:57:33:57:35 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:58:24:58:26 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:61:33:61:35 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:62:33:62:35 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:65:33:65:35 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:66:33:66:35 | ControlFlowNode for arg | semmle.label | ControlFlowNode for arg |
| test.py:70:11:70:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:71:33:71:35 | ControlFlowNode for src | semmle.label | ControlFlowNode for src |
| test.py:77:23:77:24 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:79:10:79:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:83:11:83:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:84:23:84:35 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| test.py:89:37:89:38 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:91:10:91:12 | ControlFlowNode for val | semmle.label | ControlFlowNode for val |
| test.py:94:46:94:47 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:95:37:95:38 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:98:46:98:47 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:99:46:99:47 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:102:46:102:47 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:103:46:103:47 | ControlFlowNode for bm | semmle.label | ControlFlowNode for bm |
| test.py:107:11:107:18 | ControlFlowNode for source() | semmle.label | ControlFlowNode for source() |
| test.py:108:46:108:58 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
#select
| test.py:22:10:22:24 | ControlFlowNode for Attribute() | test.py:21:11:21:18 | ControlFlowNode for source() | test.py:22:10:22:24 | ControlFlowNode for Attribute() | test flow (proper): test_simple |
| test.py:33:10:33:12 | ControlFlowNode for val | test.py:29:11:29:18 | ControlFlowNode for source() | test.py:33:10:33:12 | ControlFlowNode for val | test flow (proper): test_alias |
| test.py:41:10:41:12 | ControlFlowNode for val | test.py:45:11:45:18 | ControlFlowNode for source() | test.py:41:10:41:12 | ControlFlowNode for val | test flow (proper): test_accross_functions |
| test.py:54:10:54:12 | ControlFlowNode for val | test.py:70:11:70:18 | ControlFlowNode for source() | test.py:54:10:54:12 | ControlFlowNode for val | test flow (proper): test_deeply_nested |
| test.py:79:10:79:12 | ControlFlowNode for val | test.py:83:11:83:18 | ControlFlowNode for source() | test.py:79:10:79:12 | ControlFlowNode for val | test flow (proper): test_pass_bound_method |
| test.py:91:10:91:12 | ControlFlowNode for val | test.py:107:11:107:18 | ControlFlowNode for source() | test.py:91:10:91:12 | ControlFlowNode for val | test flow (proper): test_deeply_nested_bound_method |

View File

@@ -0,0 +1,26 @@
/**
* @kind path-problem
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
import DataFlow::PathGraph
import SharedCode
class MyClassGetValueAdditionalTaintStep extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// obj -> obj.get_value
nodeTo.asCfgNode().(AttrNode).getObject("get_value") = nodeFrom.asCfgNode() and
nodeTo = myClassGetValue(_)
or
// get_value -> get_value()
nodeFrom = myClassGetValue(_) and
nodeTo.asCfgNode().(CallNode).getFunction() = nodeFrom.asCfgNode()
}
}
from SharedConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
"test flow (proper): " + source.getNode().asCfgNode().getScope().getName()

View File

@@ -0,0 +1,29 @@
This test illustrates that you need to be very careful when adding additional taint-steps or dataflow steps using `TypeTracker`.
The basic setup is that we're modeling the behavior of a (fictitious) external library class `MyClass`, and (fictitious) source of such an instance (the `source` function).
```py3
class MyClass:
def __init__(self, value):
self.value = value
def get_value(self):
return self.value
```
We want to extend our analysis to `obj.get_value()` is also tainted if `obj` is a tainted instance of `MyClass`.
The actual type-tracking is done in `SharedCode.qll`, but it's the _way_ we use it that matters.
In `NaiveModel.ql` we add an additional taint step from an instance of `MyClass` to calls of the bound method `get_value` (that we have tracked). It provides us with the correct results, but the path explanations are not very useful, since we are now able to cross functions in _one step_.
In `ProperModel.ql` we split the additional taint step in two:
1. from tracked `obj` that is instance of `MyClass`, to `obj.get_value` **but only** exactly where the attribute is accessed (by an `AttrNode`). This is important, since if we allowed `<any tracked qualifier>.get_value` we would again be able to cross functions in one step.
2. from tracked `get_value` bound method to calls of it, **but only** exactly where the call is (by a `CallNode`). for same reason as above.
**Try running the queries in VS Code to see the difference**
### Possible improvements
Using `AttrNode` directly in the code here means there is no easy way to add `getattr` support too all such predicates. Not really sure how to handle this in a generalized way though :|

View File

@@ -0,0 +1,36 @@
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
// Helpers modeling MyClass
/** A data-flow Node representing an instance of MyClass. */
abstract class MyClass extends DataFlow::Node { }
private DataFlow::Node myClassGetValue(MyClass qualifier, DataFlow::TypeTracker t) {
t.startInAttr("get_value") and
result = qualifier
or
exists(DataFlow::TypeTracker t2 | result = myClassGetValue(qualifier, t2).track(t2, t))
}
DataFlow::Node myClassGetValue(MyClass qualifier) {
result = myClassGetValue(qualifier, DataFlow::TypeTracker::end())
}
// Config
class SourceCall extends DataFlow::Node, MyClass {
SourceCall() { this.asCfgNode().(CallNode).getFunction().(NameNode).getId() = "source" }
}
class SharedConfig extends TaintTracking::Configuration {
SharedConfig() { this = "SharedConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof SourceCall }
override predicate isSink(DataFlow::Node sink) {
exists(CallNode call |
call.getFunction().(NameNode).getId() = "sink" and
call.getArg(0) = sink.asCfgNode()
)
}
}

View File

@@ -0,0 +1,108 @@
class MyClass:
def __init__(self, value):
self.value = value
def get_value(self):
return self.value
def source():
return MyClass("tainted")
def sink(obj):
print("sink", obj)
################################################################################
def test_simple():
src = source()
sink(src.get_value())
################################################################################
def test_alias():
src = source()
foo = src
bound_method = foo.get_value
val = bound_method()
sink(val)
################################################################################
def sink_func(arg):
val = arg.get_value()
sink(val)
def test_accross_functions():
src = source()
sink_func(src)
################################################################################
def deeply_nested_sink(arg):
val = arg.get_value()
sink(val)
def deeply_nested_passthrough_1(arg):
deeply_nested_sink(arg)
def deeply_nested_passthrough_2(arg):
deeply_nested_passthrough_1(arg)
def deeply_nested_passthrough_3(arg):
deeply_nested_passthrough_2(arg)
def test_deeply_nested():
src = source()
deeply_nested_passthrough_3(src)
################################################################################
def recv_bound_method(bm):
val = bm()
sink(val)
def test_pass_bound_method():
src = source()
recv_bound_method(src.get_value)
################################################################################
def deeply_nested_bound_method_sink(bm):
val = bm()
sink(val)
def deeply_nested_bound_method_passthrough_1(bm):
deeply_nested_bound_method_sink(bm)
def deeply_nested_bound_method_passthrough_2(bm):
deeply_nested_bound_method_passthrough_1(bm)
def deeply_nested_bound_method_passthrough_3(bm):
deeply_nested_bound_method_passthrough_2(bm)
def test_deeply_nested_bound_method():
src = source()
deeply_nested_bound_method_passthrough_3(src.get_value)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,37 @@
# taken from https://dev.mysql.com/doc/connector-python/en/connector-python-example-cursor-transaction.html
from __future__ import print_function
from datetime import date, datetime, timedelta
import mysql.connector
cnx = mysql.connector.connect(user='scott', database='employees')
cursor = cnx.cursor()
tomorrow = datetime.now().date() + timedelta(days=1)
add_employee = ("INSERT INTO employees "
"(first_name, last_name, hire_date, gender, birth_date) "
"VALUES (%s, %s, %s, %s, %s)")
add_salary = ("INSERT INTO salaries "
"(emp_no, salary, from_date, to_date) "
"VALUES (%(emp_no)s, %(salary)s, %(from_date)s, %(to_date)s)")
data_employee = ('Geert', 'Vanderkelen', tomorrow, 'M', date(1977, 6, 14))
# Insert new employee
cursor.execute(add_employee, data_employee) # $getSql=add_employee
emp_no = cursor.lastrowid
# Insert salary information
data_salary = {
'emp_no': emp_no,
'salary': 50000,
'from_date': tomorrow,
'to_date': date(9999, 1, 1),
}
cursor.execute(add_salary, data_salary) # $getSql=add_salary
# Make sure data is committed to the database
cnx.commit()
cursor.close()
cnx.close()

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,7 @@
# taken from https://mysqlclient.readthedocs.io/user_guide.html#some-examples
import MySQLdb
db=MySQLdb.connect(passwd="moonpie",db="thangs")
c=db.cursor()
max_price=5
c.execute("some sql", (max_price,)) # $getSql="some sql"

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,5 @@
import pymysql
connection = pymysql.connect(host="localhost", user="user", password="passwd")
cursor = connection.cursor()
cursor.execute("some sql", (42,)) # $ getSql="some sql"

View File

@@ -0,0 +1,2 @@
# exec statement is Python 2 specific
exec "print(42)" # $getCode="print(42)"

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,36 @@
########################################
import os
os.popen2("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen3("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen4("cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen2(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen3(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
os.popen4(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
# os.popen does not support keyword arguments, so this is a TypeError
os.popen(cmd="cmd1; cmd2")
########################################
import platform
platform.popen("cmd1; cmd2") # $getCommand="cmd1; cmd2"
platform.popen(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
########################################
# popen2 was deprecated in Python 2.6, but still available in Python 2.7
import popen2
popen2.popen2("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen3("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen4("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen3("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen4("cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen2(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen3(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.popen4(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen3(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"
popen2.Popen4(cmd="cmd1; cmd2") # $getCommand="cmd1; cmd2"

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=2

View File

@@ -0,0 +1,4 @@
import builtins
# exec being part of builtins is Python 3 only
builtins.exec("print(42)") # $getCode="print(42)"

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,6 @@
import base64
# TODO: These tests should be merged with python/ql/test/experimental/dataflow/tainttracking/defaultAdditionalTaintStep-py3/test_string.py
base64.a85decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Ascii85
base64.b85decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base85
base64.decodebytes(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base64

View File

@@ -0,0 +1,6 @@
import base64
# TODO: These tests should be merged with python/ql/test/experimental/dataflow/tainttracking/defaultAdditionalTaintStep-py3/test_string.py
base64.a85encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Ascii85
base64.b85encode(bs)# $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base85
base64.encodebytes(bs)# $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base64

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,39 @@
# without this, `eval("print(42)")` becomes invalid syntax in Python 2, since print is a
# statement
from __future__ import print_function
import sys
if sys.version_info[0] == 3:
import builtins
if sys.version_info[0] == 2:
import __builtin__ as builtins
exec("print(42)") # $getCode="print(42)"
eval("print(42)") # $getCode="print(42)"
builtins.eval("print(42)") # $getCode="print(42)"
cmd = compile("print(42)", "<filename>", "exec")
exec(cmd) # $getCode=cmd
cmd = builtins.compile("print(42)", "<filename>", "exec")
exec(cmd) # $getCode=cmd
# ------------------------------------------------------------------------------
# taint related
def test_additional_taint():
src = TAINTED_STRING
cmd1 = compile(src, "<filename>", "exec")
cmd2 = compile(source=src, filename="<filename>", mode="exec")
cmd3 = builtins.compile(src, "<filename>", "exec")
ensure_tainted(
src,
cmd1,
cmd2,
cmd3,
)

View File

@@ -0,0 +1,11 @@
# without this, `eval("print(42)")` becomes invalid syntax in Python 2, since print is a
# statement
from __future__ import print_function
def eval(*args, **kwargs):
raise Exception("no eval")
# This function call might be marked as a code execution, but it actually isn't.
eval("print(42)")

View File

@@ -0,0 +1,13 @@
# without this, `eval("print(42)")` becomes invalid syntax in Python 2, since print is a
# statement
from __future__ import print_function
def foo(*args, **kwargs):
raise Exception("no eval")
eval = foo
# This function call might be marked as a code execution, but it actually isn't.
eval("print(42)")

View File

@@ -0,0 +1,19 @@
# without this, `eval("print(42)")` becomes invalid syntax in Python 2, since print is a
# statement
from __future__ import print_function
import sys
if sys.version_info[0] == 3:
import builtins
if sys.version_info[0] == 2:
import __builtin__ as builtins
def foo(*args, **kwargs):
raise Exception("no eval")
builtins.eval = foo
# This function call might be marked as a code execution, but it actually isn't.
eval("print(42)") # $ SPURIOUS: getCode="print(42)"

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

Some files were not shown because too many files have changed in this diff Show More