Merge branch 'main' into jmespath

This commit is contained in:
Rasmus Wriedt Larsen
2021-06-21 15:26:45 +02:00
828 changed files with 21750 additions and 2337 deletions

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,30 @@
import aioch
SQL = "SOME SQL"
async def aioch_test():
client = aioch.Client("localhost")
await client.execute(SQL) # $ getSql=SQL
await client.execute(query=SQL) # $ getSql=SQL
await client.execute_with_progress(SQL) # $ getSql=SQL
await client.execute_with_progress(query=SQL) # $ getSql=SQL
await client.execute_iter(SQL) # $ getSql=SQL
await client.execute_iter(query=SQL) # $ getSql=SQL
# Using custom client (this has been seen done for the blocking version in
# `clickhouse_driver` PyPI package)
class MyClient(aioch.Client):
pass
async def test_custom_client():
client = MyClient("localhost")
await client.execute(SQL) # $ getSql=SQL

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
class DedicatedResponseTest extends HttpServerHttpResponseTest {
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
}
class OtherResponseTest extends HttpServerHttpResponseTest {
OtherResponseTest() { not this instanceof DedicatedResponseTest }
override string getARelevantTag() { result = "HttpResponse" }
}

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -0,0 +1,48 @@
"""
This file is a test of an extra data-flow step that we want to have for
aiohttp.web.Application
We don't really have an established way to test extra data-flow steps in external
libraries right now, so for now I've just used our normal taint-flow testing ¯\_(ツ)_/¯
see https://docs.aiohttp.org/en/stable/web_advanced.html#application-s-config
"""
from aiohttp import web
# to make code runable
TAINTED_STRING = "TAINTED_STRING"
def ensure_tainted(*args, **kwargs):
pass
ensure_tainted(
TAINTED_STRING # $ tainted
)
async def example(request: web.Request): # $ requestHandler
return web.Response(text=f'example {request.app["foo"]=}') # $ HttpResponse
async def also_works(request: web.Request): # $ requestHandler
return web.Response(text=f'also_works {request.config_dict["foo"]=}') # $ HttpResponse
async def taint_test(request: web.Request): # $ requestHandler
ensure_tainted(
request.app["ts"], # $ MISSING: tainted
request.config_dict["ts"], # $ MISSING: tainted
)
return web.Response(text="ok") # $ HttpResponse
app = web.Application()
app.router.add_get("", example) # $ routeSetup=""
app.router.add_get("/also-works", also_works) # $ routeSetup="/also-works"
app.router.add_get("/taint-test", taint_test) # $ routeSetup="/taint-test"
app["foo"] = 42
app["ts"] = TAINTED_STRING
if __name__ == "__main__":
web.run_app(app)

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,72 @@
from aiohttp import web
routes = web.RouteTableDef()
@routes.get("/raw_text") # $ routeSetup="/raw_text"
async def raw_text(request): # $ requestHandler
return web.Response(text="foo") # $ HttpResponse mimetype=text/plain responseBody="foo"
@routes.get("/raw_body") # $ routeSetup="/raw_body"
async def raw_body(request): # $ requestHandler
return web.Response(body=b"foo") # $ HttpResponse mimetype=application/octet-stream responseBody=b"foo"
@routes.get("/html_text") # $ routeSetup="/html_text"
async def html_text(request): # $ requestHandler
return web.Response(text="foo", content_type="text/html") # $ HttpResponse mimetype=text/html responseBody="foo"
@routes.get("/html_body") # $ routeSetup="/html_body"
async def html_body(request): # $ requestHandler
return web.Response(body=b"foo", content_type="text/html") # $ HttpResponse mimetype=text/html responseBody=b"foo"
@routes.get("/html_body_set_later") # $ routeSetup="/html_body_set_later"
async def html_body_set_later(request): # $ requestHandler
resp = web.Response(body=b"foo") # $ HttpResponse mimetype=application/octet-stream responseBody=b"foo"
resp.content_type = "text/html" # $ MISSING: mimetype=text/html
return resp
# Each HTTP status code has an exception
# see https://docs.aiohttp.org/en/stable/web_quickstart.html#exceptions
@routes.get("/through_200_exception") # $ routeSetup="/through_200_exception"
async def through_200_exception(request): # $ requestHandler
raise web.HTTPOk(text="foo") # $ HttpResponse mimetype=text/plain responseBody="foo"
@routes.get("/through_200_exception_html") # $ routeSetup="/through_200_exception_html"
async def through_200_exception(request): # $ requestHandler
exception = web.HTTPOk(text="foo") # $ HttpResponse mimetype=text/plain responseBody="foo"
exception.content_type = "text/html" # $ MISSING: mimetype=text/html
raise exception
@routes.get("/through_404_exception") # $ routeSetup="/through_404_exception"
async def through_404_exception(request): # $ requestHandler
raise web.HTTPNotFound(text="foo") # $ HttpResponse mimetype=text/plain responseBody="foo"
@routes.get("/redirect_301") # $ routeSetup="/redirect_301"
async def redirect_301(request): # $ requestHandler
if not "kwarg" in request.url.query:
raise web.HTTPMovedPermanently("/login") # $ HttpResponse HttpRedirectResponse mimetype=application/octet-stream redirectLocation="/login"
else:
raise web.HTTPMovedPermanently(location="/logout") # $ HttpResponse HttpRedirectResponse mimetype=application/octet-stream redirectLocation="/logout"
@routes.get("/redirect_302") # $ routeSetup="/redirect_302"
async def redirect_302(request): # $ requestHandler
if not "kwarg" in request.url.query:
raise web.HTTPFound("/login") # $ HttpResponse HttpRedirectResponse mimetype=application/octet-stream redirectLocation="/login"
else:
raise web.HTTPFound(location="/logout") # $ HttpResponse HttpRedirectResponse mimetype=application/octet-stream redirectLocation="/logout"
if __name__ == "__main__":
app = web.Application()
app.add_routes(routes)
web.run_app(app)

View File

@@ -0,0 +1,199 @@
# Inspired by https://docs.aiohttp.org/en/stable/web_quickstart.html
# and https://docs.aiohttp.org/en/stable/web_quickstart.html#resources-and-routes
from aiohttp import web
app = web.Application()
## ================================= ##
## Ways to specify routes / handlers ##
## ================================= ##
## Using coroutines
if True:
# `app.add_routes` with list
async def foo(request): # $ requestHandler
return web.Response(text="foo") # $ HttpResponse
async def foo2(request): # $ requestHandler
return web.Response(text="foo2") # $ HttpResponse
async def foo3(request): # $ requestHandler
return web.Response(text="foo3") # $ HttpResponse
app.add_routes([
web.get("/foo", foo), # $ routeSetup="/foo"
web.route("*", "/foo2", foo2), # $ routeSetup="/foo2"
web.get(path="/foo3", handler=foo3), # $ routeSetup="/foo3"
])
# using decorator
routes = web.RouteTableDef()
@routes.get("/bar") # $ routeSetup="/bar"
async def bar(request): # $ requestHandler
return web.Response(text="bar") # $ HttpResponse
@routes.route("*", "/bar2") # $ routeSetup="/bar2"
async def bar2(request): # $ requestHandler
return web.Response(text="bar2") # $ HttpResponse
@routes.get(path="/bar3") # $ routeSetup="/bar3"
async def bar3(request): # $ requestHandler
return web.Response(text="bar3") # $ HttpResponse
app.add_routes(routes)
# `app.router.add_get` / `app.router.add_route`
async def baz(request): # $ requestHandler
return web.Response(text="baz") # $ HttpResponse
app.router.add_get("/baz", baz) # $ routeSetup="/baz"
async def baz2(request): # $ requestHandler
return web.Response(text="baz2") # $ HttpResponse
app.router.add_route("*", "/baz2", baz2) # $ routeSetup="/baz2"
async def baz3(request): # $ requestHandler
return web.Response(text="baz3") # $ HttpResponse
app.router.add_get(path="/baz3", handler=baz3) # $ routeSetup="/baz3"
## Using classes / views
if True:
# see https://docs.aiohttp.org/en/stable/web_quickstart.html#organizing-handlers-in-classes
class MyCustomHandlerClass:
async def foo_handler(self, request): # $ MISSING: requestHandler
return web.Response(text="MyCustomHandlerClass.foo") # $ HttpResponse
my_custom_handler = MyCustomHandlerClass()
app.router.add_get("/MyCustomHandlerClass/foo", my_custom_handler.foo_handler) # $ routeSetup="/MyCustomHandlerClass/foo"
# Using `web.View`
# ---------------
# `app.add_routes` with list
class MyWebView1(web.View):
async def get(self): # $ requestHandler
return web.Response(text="MyWebView1.get") # $ HttpResponse
app.add_routes([
web.view("/MyWebView1", MyWebView1) # $ routeSetup="/MyWebView1"
])
# using decorator
routes = web.RouteTableDef()
@routes.view("/MyWebView2") # $ routeSetup="/MyWebView2"
class MyWebView2(web.View):
async def get(self): # $ requestHandler
return web.Response(text="MyWebView2.get") # $ HttpResponse
app.add_routes(routes)
# `app.router.add_view`
class MyWebView3(web.View):
async def get(self): # $ requestHandler
return web.Response(text="MyWebView3.get") # $ HttpResponse
app.router.add_view("/MyWebView3", MyWebView3) # $ routeSetup="/MyWebView3"
# no route-setup
class MyWebViewNoRoute(web.View):
async def get(self): # $ requestHandler
return web.Response(text="MyWebViewNoRoute.get") # $ HttpResponse
if len(__name__) < 0: # avoid running, but fool analysis to not consider dead code
# no explicit-view subclass (but route-setup)
class MyWebViewNoSubclassButRoute(somelib.someclass):
async def get(self): # $ requestHandler
return web.Response(text="MyWebViewNoSubclassButRoute.get") # $ HttpResponse
app.router.add_view("/MyWebViewNoSubclassButRoute", MyWebViewNoSubclassButRoute) # $ routeSetup="/MyWebViewNoSubclassButRoute"
# Apparently there is no enforcement that `add_view` is only for views, and vice-versa
# for `add_get` only being for async functions.
if True:
async def no_rules(request): # $ requestHandler
return web.Response(text="no_rules") # $ HttpResponse
app.router.add_view("/no_rules", no_rules) # $ routeSetup="/no_rules"
class NoRulesView(web.View):
async def get(self): # $ requestHandler
return web.Response(text="NoRulesView.get") # $ HttpResponse
app.router.add_get("/NoRulesView", NoRulesView) # $ routeSetup="/NoRulesView"
## =================== ##
## "Routed parameters" ##
## =================== ##
if True:
# see https://docs.aiohttp.org/en/stable/web_quickstart.html#variable-resources
async def matching(request: web.Request): # $ requestHandler
name = request.match_info['name']
number = request.match_info['number']
return web.Response(text="matching name={} number={}".format(name, number)) # $ HttpResponse
app.router.add_get(r"/matching/{name}/{number:\d+}", matching) # $ routeSetup="/matching/{name}/{number:\d+}"
## ======= ##
## subapps ##
## ======= ##
if True:
subapp = web.Application()
async def subapp_handler(request): # $ requestHandler
return web.Response(text="subapp_handler") # $ HttpResponse
subapp.router.add_get("/subapp_handler", subapp_handler) # $ routeSetup="/subapp_handler"
app.add_subapp("/my_subapp", subapp)
# similar behavior is possible with `app.add_domain`, but since I don't think we'll have special handling
# for any kind of subapps, I have not created a test for this.
## ================================ ##
## Constructing UrlDispatcher first ##
## ================================ ##
if True:
async def manual_dispatcher_instance(request): # $ requestHandler
return web.Response(text="manual_dispatcher_instance") # $ HttpResponse
url_dispatcher = web.UrlDispatcher()
url_dispatcher.add_get("/manual_dispatcher_instance", manual_dispatcher_instance) # $ routeSetup="/manual_dispatcher_instance"
subapp2 = web.Application(router=url_dispatcher)
app.add_subapp("/manual_dispatcher_instance_app", subapp2)
## =========== ##
## Run the app ##
## =========== ##
if __name__ == "__main__":
print("For auto-reloading server you can use:")
print(f"aiohttp-devtools runserver {__file__}")
print("after doing `pip install aiohttp-devtools`")
print()
web.run_app(app)

View File

@@ -0,0 +1,152 @@
from aiohttp import web
async def test_taint(request: web.Request): # $ requestHandler
ensure_tainted(
# see https://docs.aiohttp.org/en/stable/web_reference.html#request-and-base-request
request, # $ tainted
# yarl.URL (see `yarl` framework tests)
request.url, # $ tainted
request.url.human_repr(), # $ tainted
request.rel_url, # $ tainted
request.rel_url.human_repr(), # $ tainted
request.forwarded, # $ tainted
request.host, # $ tainted
request.remote, # $ tainted
request.path, # $ tainted
request.path_qs, # $ tainted
request.raw_path, # $ tainted
# dict-like for captured parts of the URL
request.match_info, # $ tainted
request.match_info["key"], # $ tainted
request.match_info.get("key"), # $ tainted
# multidict.MultiDictProxy[str] (see `multidict` framework tests)
request.query, # $ tainted
request.query.getone("key"), # $ tainted
# multidict.CIMultiDictProxy[str] (see `multidict` framework tests)
request.headers, # $ tainted
request.headers.getone("key"), # $ tainted
# dict-like (readonly)
request.cookies, # $ tainted
request.cookies["key"], # $ tainted
request.cookies.get("key"), # $ tainted
request.cookies.keys(), # $ MISSING: tainted
request.cookies.values(), # $ tainted
request.cookies.items(), # $ tainted
list(request.cookies), # $ tainted
iter(request.cookies), # $ tainted
# aiohttp.StreamReader
# see https://docs.aiohttp.org/en/stable/streams.html#aiohttp.StreamReader
request.content, # $ tainted
await request.content.read(), # $ tainted
await request.content.readany(), # $ tainted
await request.content.readexactly(42), # $ tainted
await request.content.readline(), # $ tainted
await request.content.readchunk(), # $ tainted
(await request.content.readchunk())[0], # $ tainted
[line async for line in request.content], # $ MISSING: tainted
[data async for data in request.content.iter_chunked(1024)], # $ MISSING: tainted
[data async for data in request.content.iter_any()], # $ MISSING: tainted
[data async for data, _ in request.content.iter_chunks()], # $ MISSING: tainted
request.content.read_nowait(), # $ tainted
# aiohttp.StreamReader
request._payload, # $ tainted
await request._payload.readany(), # $ tainted
request.content_type, # $ tainted
request.charset, # $ tainted
request.http_range, # $ tainted
# Optional[datetime]
request.if_modified_since, # $ tainted
request.if_unmodified_since, # $ tainted
request.if_range, # $ tainted
request.clone(scheme="https"), # $ tainted
# asyncio.Transport
# https://docs.python.org/3/library/asyncio-protocol.html#asyncio-transport
# example given in https://docs.aiohttp.org/en/stable/web_reference.html#aiohttp.web.BaseRequest.transport
# uses `peername` to get IP address of client
request.transport, # $ tainted
request.transport.get_extra_info("key"), # $ MISSING: tainted
# Like request.transport.get_extra_info
request.get_extra_info("key"), # $ tainted
# Like request.transport.get_extra_info
request.protocol.transport.get_extra_info("key"), # $ MISSING: tainted
# bytes
await request.read(), # $ tainted
# str
await request.text(), # $ tainted
# obj
await request.json(), # $ tainted
# aiohttp.multipart.MultipartReader
await request.multipart(), # $ tainted
# multidict.MultiDictProxy[str] (see `multidict` framework tests)
await request.post(), # $ tainted
(await request.post()).getone("key"), # $ tainted
)
import yarl
assert isinstance(request.url, yarl.URL)
assert isinstance(request.rel_url, yarl.URL)
# things that are technically controlled by sender of request,
# but doesn't seem that likely for exploitation.
ensure_not_tainted(
request.method,
request.version,
request.scheme,
request.secure,
request.keep_alive,
request.content_length,
request.body_exists,
request.has_body,
request.can_read_body,
)
ensure_not_tainted(
request.loop,
request.app,
request.config_dict,
)
class TaintTestClass(web.View):
def get(self): # $ requestHandler
ensure_tainted(
self.request, # $ tainted
self.request.url # $ tainted
)
app = web.Application()
app.router.add_get(r"/test_taint/{name}/{number:\d+}", test_taint) # $ routeSetup="/test_taint/{name}/{number:\d+}"
app.router.add_view(r"/test_taint_class", TaintTestClass) # $ routeSetup="/test_taint_class"
if __name__ == "__main__":
web.run_app(app)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,42 @@
import clickhouse_driver
SQL = "SOME SQL"
# Normal operation
client = clickhouse_driver.client.Client("localhost")
client.execute(SQL) # $ getSql=SQL
client.execute(query=SQL) # $ getSql=SQL
client.execute_with_progress(SQL) # $ getSql=SQL
client.execute_with_progress(query=SQL) # $ getSql=SQL
client.execute_iter(SQL) # $ getSql=SQL
client.execute_iter(query=SQL) # $ getSql=SQL
# commonly used alias
client = clickhouse_driver.Client("localhost")
client.execute(SQL) # $ getSql=SQL
# Using PEP249 interface
conn = clickhouse_driver.connect('clickhouse://localhost')
cursor = conn.cursor()
cursor.execute(SQL) # $ getSql=SQL
# Using custom client
#
# examples from real world code
# https://github.com/Altinity/clickhouse-mysql-data-reader/blob/3b1b7088751b05e5bbf45890c5949b58208c2343/clickhouse_mysql/dbclient/chclient.py#L10
# https://github.com/Felixoid/clickhouse-plantuml/blob/d8b2ba7d164a836770ec21f5e4035dfb04c41d9c/clickhouse_plantuml/client.py#L9
class MyClient(clickhouse_driver.Client):
pass
MyClient("localhost").execute(SQL) # $ getSql=SQL

View File

@@ -0,0 +1,5 @@
| ec_keygen_origin.py:8:1:8:45 | ControlFlowNode for Attribute() | 384 | ec_keygen_origin.py:8:31:8:42 | ControlFlowNode for Attribute |
| ec_keygen_origin.py:9:1:9:43 | ControlFlowNode for Attribute() | 384 | ec_keygen_origin.py:9:31:9:42 | ControlFlowNode for Attribute |
| ec_keygen_origin.py:12:1:12:36 | ControlFlowNode for Attribute() | 384 | ec_keygen_origin.py:11:9:11:20 | ControlFlowNode for Attribute |
| ec_keygen_origin.py:15:1:15:39 | ControlFlowNode for Attribute() | 384 | ec_keygen_origin.py:11:9:11:20 | ControlFlowNode for Attribute |
| ec_keygen_origin.py:20:1:20:32 | ControlFlowNode for Attribute() | 384 | ec_keygen_origin.py:6:58:6:66 | ControlFlowNode for ImportMember |

View File

@@ -0,0 +1,8 @@
import semmle.python.dataflow.new.DataFlow
import semmle.python.Concepts
from Cryptography::PublicKey::KeyGeneration keyGen, int keySize, DataFlow::Node origin
where
keyGen.getLocation().getFile().getShortName() = "ec_keygen_origin.py" and
keySize = keyGen.getKeySizeWithOrigin(origin)
select keyGen, keySize, origin

View File

@@ -0,0 +1,20 @@
# Since key-size is not specified explicitly as an integer for the predefined
# classes in the `cryptography.hazmat.primitives.asymmetric.ec` module, we need
# special handling of the origin... this test is simply to show off how we handle this.
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import SECP384R1
ec.generate_private_key(curve=ec.SECP384R1()) # $ PublicKeyGeneration keySize=384
ec.generate_private_key(curve=ec.SECP384R1) # $ PublicKeyGeneration keySize=384
alias = ec.SECP384R1
ec.generate_private_key(curve=alias) # $ PublicKeyGeneration keySize=384
instance = alias()
ec.generate_private_key(curve=instance) # $ PublicKeyGeneration keySize=384
x = SECP384R1
y = x
ec.generate_private_key(curve=y) # $ PublicKeyGeneration keySize=384

View File

@@ -6,6 +6,7 @@ from cryptography.exceptions import InvalidSignature
private_key = ec.generate_private_key(curve=ec.SECP384R1()) # $ PublicKeyGeneration keySize=384
private_key = ec.generate_private_key(curve=ec.SECP384R1) # $ PublicKeyGeneration keySize=384
public_key = private_key.public_key()
HASH_ALGORITHM = hashes.SHA256()

View File

@@ -1,3 +1,3 @@
import dill
dill.loads(payload) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=dill decodeMayExecuteInput
dill.loads(payload) # $decodeInput=payload decodeOutput=dill.loads(..) decodeFormat=dill decodeMayExecuteInput

View File

@@ -74,20 +74,20 @@ class CustomRedirectView(RedirectView):
# Ensure that simple subclasses are still vuln to XSS
def xss__not_found(request):
return HttpResponseNotFound(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=Attribute()
return HttpResponseNotFound(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=request.GET.get(..)
# Ensure we still have an XSS sink when manually setting the content_type to HTML
def xss__manual_response_type(request):
return HttpResponse(request.GET.get("name"), content_type="text/html; charset=utf-8") # $HttpResponse mimetype=text/html responseBody=Attribute()
return HttpResponse(request.GET.get("name"), content_type="text/html; charset=utf-8") # $HttpResponse mimetype=text/html responseBody=request.GET.get(..)
def xss__write(request):
response = HttpResponse() # $HttpResponse mimetype=text/html
response.write(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=Attribute()
response.write(request.GET.get("name")) # $HttpResponse mimetype=text/html responseBody=request.GET.get(..)
# This is safe but probably a bug if the argument to `write` is not a result of `json.dumps` or similar.
def safe__write_json(request):
response = JsonResponse() # $HttpResponse mimetype=application/json
response.write(request.GET.get("name")) # $HttpResponse mimetype=application/json responseBody=Attribute()
response.write(request.GET.get("name")) # $HttpResponse mimetype=application/json responseBody=request.GET.get(..)
# Ensure manual subclasses are vulnerable
class CustomResponse(HttpResponse):
@@ -95,7 +95,7 @@ class CustomResponse(HttpResponse):
super().__init__(content, *args, content_type="text/html", **kwargs)
def xss__custom_response(request):
return CustomResponse("ACME Responses", request.GET("name")) # $HttpResponse MISSING: mimetype=text/html responseBody=Attribute() SPURIOUS: responseBody="ACME Responses"
return CustomResponse("ACME Responses", request.GET("name")) # $HttpResponse MISSING: mimetype=text/html responseBody=request.GET.get(..) SPURIOUS: responseBody="ACME Responses"
class CustomJsonResponse(JsonResponse):
def __init__(self, banner, content, *args, **kwargs):

View File

@@ -13,7 +13,7 @@ https://docs.djangoproject.com/en/3.1/ref/settings/
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent #$ getAPathArgument=Path()
BASE_DIR = Path(__file__).resolve().parent.parent #$ getAPathArgument=Path(..)
# Quick-start development settings - unsuitable for production

View File

@@ -5,9 +5,9 @@ def test_idna():
tb = TAINTED_BYTES
ensure_tainted(
idna.encode(ts), # $ tainted encodeInput=ts encodeOutput=Attribute() encodeFormat=IDNA
idna.encode(s=ts), # $ tainted encodeInput=ts encodeOutput=Attribute() encodeFormat=IDNA
idna.encode(ts), # $ tainted encodeInput=ts encodeOutput=idna.encode(..) encodeFormat=IDNA
idna.encode(s=ts), # $ tainted encodeInput=ts encodeOutput=idna.encode(..) encodeFormat=IDNA
idna.decode(tb), # $ tainted decodeInput=tb decodeOutput=Attribute() decodeFormat=IDNA
idna.decode(s=tb), # $ tainted decodeInput=tb decodeOutput=Attribute() decodeFormat=IDNA
idna.decode(tb), # $ tainted decodeInput=tb decodeOutput=idna.decode(..) decodeFormat=IDNA
idna.decode(s=tb), # $ tainted decodeInput=tb decodeOutput=idna.decode(..) decodeFormat=IDNA
)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,41 @@
import multidict
# TODO: This is an invalid MultiDictProxy construction... but for the purpose of
# taint-test, this should be good enough.
mdp = multidict.MultiDictProxy(TAINTED_STRING)
ensure_tainted(
# see https://multidict.readthedocs.io/en/stable/multidict.html#multidict.MultiDictProxy
mdp, # $ tainted
mdp["key"], # $ tainted
mdp.get("key"), # $ tainted
mdp.getone("key"), # $ tainted
mdp.getall("key"), # $ tainted
mdp.keys(), # $ MISSING: tainted
mdp.values(), # $ tainted
mdp.items(), # $ tainted
mdp.copy(), # $ tainted
list(mdp), # $ tainted
iter(mdp), # $ tainted
)
# TODO: This is an invalid CIMultiDictProxy construction... but for the purpose of
# taint-test, this should be good enough.
ci_mdp = multidict.CIMultiDictProxy(TAINTED_STRING)
ensure_tainted(
# see https://multidict.readthedocs.io/en/stable/multidict.html#multidict.CIMultiDictProxy
ci_mdp, # $ tainted
ci_mdp["key"], # $ tainted
ci_mdp.get("key"), # $ tainted
ci_mdp.getone("key"), # $ tainted
ci_mdp.getall("key"), # $ tainted
ci_mdp.keys(), # $ MISSING: tainted
ci_mdp.values(), # $ tainted
ci_mdp.items(), # $ tainted
ci_mdp.copy(), # $ tainted
list(ci_mdp), # $ tainted
iter(ci_mdp), # $ tainted
)

View File

@@ -5,14 +5,14 @@ def test():
ts = TAINTED_STRING
tainted_obj = {"foo": ts}
encoded = simplejson.dumps(tainted_obj) # $ encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
encoded = simplejson.dumps(tainted_obj) # $ encodeOutput=simplejson.dumps(..) encodeFormat=JSON encodeInput=tainted_obj
ensure_tainted(
encoded, # $ tainted
simplejson.dumps(tainted_obj), # $ tainted encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
simplejson.dumps(obj=tainted_obj), # $ tainted encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
simplejson.loads(encoded), # $ tainted decodeOutput=Attribute() decodeFormat=JSON decodeInput=encoded
simplejson.loads(s=encoded), # $ tainted decodeOutput=Attribute() decodeFormat=JSON decodeInput=encoded
simplejson.dumps(tainted_obj), # $ tainted encodeOutput=simplejson.dumps(..) encodeFormat=JSON encodeInput=tainted_obj
simplejson.dumps(obj=tainted_obj), # $ tainted encodeOutput=simplejson.dumps(..) encodeFormat=JSON encodeInput=tainted_obj
simplejson.loads(encoded), # $ tainted decodeOutput=simplejson.loads(..) decodeFormat=JSON decodeInput=encoded
simplejson.loads(s=encoded), # $ tainted decodeOutput=simplejson.loads(..) decodeFormat=JSON decodeInput=encoded
)
# load/dump with file-like
@@ -22,7 +22,7 @@ def test():
tainted_filelike.seek(0)
ensure_tainted(
tainted_filelike, # $ MISSING: tainted
simplejson.load(tainted_filelike), # $ decodeOutput=Attribute() decodeFormat=JSON decodeInput=tainted_filelike MISSING: tainted
simplejson.load(tainted_filelike), # $ decodeOutput=simplejson.load(..) decodeFormat=JSON decodeInput=tainted_filelike MISSING: tainted
)
# load/dump with file-like using keyword-args
@@ -32,7 +32,7 @@ def test():
tainted_filelike.seek(0)
ensure_tainted(
tainted_filelike, # $ MISSING: tainted
simplejson.load(fp=tainted_filelike), # $ decodeOutput=Attribute() decodeFormat=JSON decodeInput=tainted_filelike MISSING: tainted
simplejson.load(fp=tainted_filelike), # $ decodeOutput=simplejson.load(..) decodeFormat=JSON decodeInput=tainted_filelike MISSING: tainted
)
# To make things runable

View File

@@ -1,6 +1,6 @@
import base64
# TODO: These tests should be merged with python/ql/test/experimental/dataflow/tainttracking/defaultAdditionalTaintStep-py3/test_string.py
base64.a85decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Ascii85
base64.b85decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base85
base64.decodebytes(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base64
base64.a85decode(payload) # $ decodeInput=payload decodeOutput=base64.a85decode(..) decodeFormat=Ascii85
base64.b85decode(payload) # $ decodeInput=payload decodeOutput=base64.b85decode(..) decodeFormat=Base85
base64.decodebytes(payload) # $ decodeInput=payload decodeOutput=base64.decodebytes(..) decodeFormat=Base64

View File

@@ -1,6 +1,6 @@
import base64
# TODO: These tests should be merged with python/ql/test/experimental/dataflow/tainttracking/defaultAdditionalTaintStep-py3/test_string.py
base64.a85encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Ascii85
base64.b85encode(bs)# $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base85
base64.encodebytes(bs)# $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base64
base64.a85encode(bs) # $ encodeInput=bs encodeOutput=base64.a85encode(..) encodeFormat=Ascii85
base64.b85encode(bs)# $ encodeInput=bs encodeOutput=base64.b85encode(..) encodeFormat=Base85
base64.encodebytes(bs)# $ encodeInput=bs encodeOutput=base64.encodebytes(..) encodeFormat=Base64

View File

@@ -2,14 +2,14 @@ import pickle
import marshal
import base64
pickle.loads(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=pickle decodeMayExecuteInput
marshal.loads(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=marshal decodeMayExecuteInput
pickle.loads(payload) # $ decodeInput=payload decodeOutput=pickle.loads(..) decodeFormat=pickle decodeMayExecuteInput
marshal.loads(payload) # $ decodeInput=payload decodeOutput=marshal.loads(..) decodeFormat=marshal decodeMayExecuteInput
# TODO: These tests should be merged with python/ql/test/experimental/dataflow/tainttracking/defaultAdditionalTaintStep/test_string.py
base64.b64decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base64
base64.standard_b64decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base64
base64.urlsafe_b64decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base64
base64.b32decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base32
base64.b16decode(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base16
base64.b64decode(payload) # $ decodeInput=payload decodeOutput=base64.b64decode(..) decodeFormat=Base64
base64.standard_b64decode(payload) # $ decodeInput=payload decodeOutput=base64.standard_b64decode(..) decodeFormat=Base64
base64.urlsafe_b64decode(payload) # $ decodeInput=payload decodeOutput=base64.urlsafe_b64decode(..) decodeFormat=Base64
base64.b32decode(payload) # $ decodeInput=payload decodeOutput=base64.b32decode(..) decodeFormat=Base32
base64.b16decode(payload) # $ decodeInput=payload decodeOutput=base64.b16decode(..) decodeFormat=Base16
# deprecated since Python 3.1, but still works
base64.decodestring(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=Base64
base64.decodestring(payload) # $ decodeInput=payload decodeOutput=base64.decodestring(..) decodeFormat=Base64

View File

@@ -2,14 +2,14 @@ import pickle
import marshal
import base64
pickle.dumps(obj) # $ MISSING: f-:encodeInput=obj f-:encodeOutput=Attribute() f-:encodeFormat=pickle f-:encodeMayExecuteInput
marshal.dumps(obj) # $ MISSING: f-:encodeInput=obj f-:encodeOutput=Attribute() f-:encodeFormat=marshal f-:encodeMayExecuteInput
pickle.dumps(obj) # $ MISSING: encodeInput=obj encodeOutput=pickle.dumps(..) encodeFormat=pickle encodeMayExecuteInput
marshal.dumps(obj) # $ MISSING: encodeInput=obj encodeOutput=marshal.dumps(..) encodeFormat=marshal encodeMayExecuteInput
# TODO: These tests should be merged with python/ql/test/experimental/dataflow/tainttracking/defaultAdditionalTaintStep/test_string.py
base64.b64encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base64
base64.standard_b64encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base64
base64.urlsafe_b64encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base64
base64.b32encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base32
base64.b16encode(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base16
base64.b64encode(bs) # $ encodeInput=bs encodeOutput=base64.b64encode(..) encodeFormat=Base64
base64.standard_b64encode(bs) # $ encodeInput=bs encodeOutput=base64.standard_b64encode(..) encodeFormat=Base64
base64.urlsafe_b64encode(bs) # $ encodeInput=bs encodeOutput=base64.urlsafe_b64encode(..) encodeFormat=Base64
base64.b32encode(bs) # $ encodeInput=bs encodeOutput=base64.b32encode(..) encodeFormat=Base32
base64.b16encode(bs) # $ encodeInput=bs encodeOutput=base64.b16encode(..) encodeFormat=Base16
# deprecated since Python 3.1, but still works
base64.encodestring(bs) # $ encodeInput=bs encodeOutput=Attribute() encodeFormat=Base64
base64.encodestring(bs) # $ encodeInput=bs encodeOutput=base64.encodestring(..) encodeFormat=Base64

View File

@@ -0,0 +1,40 @@
from io import StringIO
import json
def test():
print("\n# test")
ts = TAINTED_STRING
encoded = json.dumps(ts) # $ encodeOutput=json.dumps(..) encodeFormat=JSON encodeInput=ts
ensure_tainted(
encoded, # $ tainted
json.dumps(ts), # $ tainted encodeOutput=json.dumps(..) encodeFormat=JSON encodeInput=ts
json.dumps(obj=ts), # $ tainted encodeOutput=json.dumps(..) encodeFormat=JSON encodeInput=ts
json.loads(encoded), # $ tainted decodeOutput=json.loads(..) decodeFormat=JSON decodeInput=encoded
json.loads(s=encoded), # $ tainted decodeOutput=json.loads(..) decodeFormat=JSON decodeInput=encoded
)
# load/dump with file-like
tainted_filelike = StringIO()
json.dump(ts, tainted_filelike) # $ encodeOutput=[post]tainted_filelike encodeFormat=JSON encodeInput=ts
tainted_filelike.seek(0)
ensure_tainted(
tainted_filelike, # $ tainted
json.load(tainted_filelike), # $ tainted decodeOutput=json.load(..) decodeFormat=JSON decodeInput=tainted_filelike
)
# load/dump with file-like using keyword-args
tainted_filelike = StringIO()
json.dump(obj=ts, fp=tainted_filelike) # $ encodeOutput=[post]tainted_filelike encodeFormat=JSON encodeInput=ts
tainted_filelike.seek(0)
ensure_tainted(
tainted_filelike, # $ tainted
json.load(fp=tainted_filelike), # $ tainted decodeOutput=json.load(..) decodeFormat=JSON decodeInput=tainted_filelike
)
# Make tests runable
test()

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
class DedicatedResponseTest extends HttpServerHttpResponseTest {
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
}
class OtherResponseTest extends HttpServerHttpResponseTest {
OtherResponseTest() { not this instanceof DedicatedResponseTest }
override string getARelevantTag() { result = "HttpResponse" }
}

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -0,0 +1,75 @@
from twisted.web.server import Site, Request, NOT_DONE_YET
from twisted.web.resource import Resource
from twisted.internet import reactor, endpoints, defer
root = Resource()
class Now(Resource):
def render(self, request: Request): # $ requestHandler
return b"now" # $ HttpResponse mimetype=text/html responseBody=b"now"
class AlsoNow(Resource):
def render(self, request: Request): # $ requestHandler
request.write(b"also now") # $ HttpResponse mimetype=text/html responseBody=b"also now"
return b"" # $ HttpResponse mimetype=text/html responseBody=b""
def process_later(request: Request):
print("process_later called")
request.write(b"later") # $ MISSING: responseBody=b"later"
request.finish()
class Later(Resource):
def render(self, request: Request): # $ requestHandler
# process the request in 1 second
print("setting up callback for process_later")
reactor.callLater(1, process_later, request)
return NOT_DONE_YET # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=NOT_DONE_YET
class PlainText(Resource):
def render(self, request: Request): # $ requestHandler
request.setHeader(b"content-type", "text/plain")
return b"this is plain text" # $ HttpResponse responseBody=b"this is plain text" SPURIOUS: mimetype=text/html MISSING: mimetype=text/plain
class Redirect(Resource):
def render_GET(self, request: Request): # $ requestHandler
request.redirect("/new-location") # $ HttpRedirectResponse redirectLocation="/new-location" HttpResponse mimetype=text/html
# By default, this `hello` output is not returned... not even when
# requested with curl.
return b"hello" # $ SPURIOUS: HttpResponse mimetype=text/html responseBody=b"hello"
class NonHttpBodyOutput(Resource):
"""Examples of providing values in response that is not in the body
"""
def render_GET(self, request: Request): # $ requestHandler
request.responseHeaders.addRawHeader("key", "value")
request.setHeader("key2", "value")
request.addCookie("key", "value")
request.cookies.append(b"key2=value")
return b"" # $ HttpResponse mimetype=text/html responseBody=b""
root.putChild(b"now", Now())
root.putChild(b"also-now", AlsoNow())
root.putChild(b"later", Later())
root.putChild(b"plain-text", PlainText())
root.putChild(b"redirect", Redirect())
root.putChild(b"non-body", NonHttpBodyOutput())
if __name__ == "__main__":
factory = Site(root)
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8880)
endpoint.listen(factory)
print("Will run on http://localhost:8880")
reactor.run()

View File

@@ -0,0 +1,47 @@
from twisted.web.server import Site, Request
from twisted.web.resource import Resource
from twisted.internet import reactor, endpoints
root = Resource()
class Foo(Resource):
def render(self, request: Request): # $ requestHandler
print(f"{request.content=}")
print(f"{request.cookies=}")
print(f"{request.received_cookies=}")
return b"I am Foo" # $ HttpResponse
root.putChild(b"foo", Foo())
class Child(Resource):
def __init__(self, name):
self.name = name.decode("utf-8")
def render_GET(self, request): # $ requestHandler
return f"Hi, I'm child '{self.name}'".encode("utf-8") # $ HttpResponse
class Parent(Resource):
def getChild(self, path, request): # $ requestHandler
print(path, type(path))
return Child(path)
def render_GET(self, request): # $ requestHandler
return b"Hi, I'm parent" # $ HttpResponse
root.putChild(b"parent", Parent())
if __name__ == "__main__":
factory = Site(root)
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8880)
endpoint.listen(factory)
print("Will run on http://localhost:8880")
reactor.run()

View File

@@ -0,0 +1,70 @@
from twisted.web.resource import Resource
from twisted.web.server import Request
class MyTaintTest(Resource):
def getChild(self, path, request): # $ requestHandler
ensure_tainted(path, request) # $ tainted
def render(self, request): # $ requestHandler
ensure_tainted(request) # $ tainted
def render_GET(self, request: Request): # $ requestHandler
# see https://twistedmatrix.com/documents/21.2.0/api/twisted.web.server.Request.html
ensure_tainted(
request, # $ tainted
request.uri, # $ tainted
request.path, # $ tainted
request.prepath, # $ tainted
request.postpath, # $ tainted
# file-like
request.content, # $ tainted
request.content.read(), # $ MISSING: tainted
# Dict[bytes, List[bytes]] (for query args)
request.args, # $ tainted
request.args[b"key"], # $ tainted
request.args[b"key"][0], # $ tainted
request.args.get(b"key"), # $ tainted
request.args.get(b"key")[0], # $ tainted
request.received_cookies, # $ tainted
request.received_cookies["key"], # $ tainted
request.received_cookies.get("key"), # $ tainted
request.getCookie(b"key"), # $ tainted
# twisted.web.http_headers.Headers
# see https://twistedmatrix.com/documents/21.2.0/api/twisted.web.http_headers.Headers.html
request.requestHeaders, # $ tainted
request.requestHeaders.getRawHeaders("key"), # $ MISSING: tainted
request.requestHeaders.getRawHeaders("key")[0], # $ MISSING: tainted
request.requestHeaders.getAllRawHeaders(), # $ MISSING: tainted
list(request.requestHeaders.getAllRawHeaders()), # $ MISSING: tainted
request.getHeader("key"), # $ tainted
request.getAllHeaders(), # $ tainted
request.getAllHeaders()["key"], # $ tainted
request.user, # $ tainted
request.getUser(), # $ tainted
request.password, # $ tainted
request.getPassword(), # $ tainted
request.host, # $ tainted
request.getHost(), # $ tainted
request.getRequestHostname(), # $ tainted
)
# technically user-controlled, but unlikely to lead to vulnerabilities.
ensure_not_tainted(
request.method,
)
# not tainted at all
ensure_not_tainted(
# outgoing things
request.cookies,
request.responseHeaders,
)

View File

@@ -5,19 +5,19 @@ def test():
ts = TAINTED_STRING
tainted_obj = {"foo": ts}
encoded = ujson.dumps(tainted_obj) # $ encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
encoded = ujson.dumps(tainted_obj) # $ encodeOutput=ujson.dumps(..) encodeFormat=JSON encodeInput=tainted_obj
ensure_tainted(
encoded, # $ tainted
ujson.dumps(tainted_obj), # $ tainted encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
ujson.dumps(obj=tainted_obj), # $ tainted encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
ujson.loads(encoded), # $ tainted decodeOutput=Attribute() decodeFormat=JSON decodeInput=encoded
ujson.loads(obj=encoded), # $ tainted decodeOutput=Attribute() decodeFormat=JSON decodeInput=encoded
ujson.dumps(tainted_obj), # $ tainted encodeOutput=ujson.dumps(..) encodeFormat=JSON encodeInput=tainted_obj
ujson.dumps(obj=tainted_obj), # $ tainted encodeOutput=ujson.dumps(..) encodeFormat=JSON encodeInput=tainted_obj
ujson.loads(encoded), # $ tainted decodeOutput=ujson.loads(..) decodeFormat=JSON decodeInput=encoded
ujson.loads(obj=encoded), # $ tainted decodeOutput=ujson.loads(..) decodeFormat=JSON decodeInput=encoded
ujson.encode(tainted_obj), # $ tainted encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
ujson.encode(obj=tainted_obj), # $ tainted encodeOutput=Attribute() encodeFormat=JSON encodeInput=tainted_obj
ujson.decode(encoded), # $ tainted decodeOutput=Attribute() decodeFormat=JSON decodeInput=encoded
ujson.decode(obj=encoded), # $ tainted decodeOutput=Attribute() decodeFormat=JSON decodeInput=encoded
ujson.encode(tainted_obj), # $ tainted encodeOutput=ujson.encode(..) encodeFormat=JSON encodeInput=tainted_obj
ujson.encode(obj=tainted_obj), # $ tainted encodeOutput=ujson.encode(..) encodeFormat=JSON encodeInput=tainted_obj
ujson.decode(encoded), # $ tainted decodeOutput=ujson.decode(..) decodeFormat=JSON decodeInput=encoded
ujson.decode(obj=encoded), # $ tainted decodeOutput=ujson.decode(..) decodeFormat=JSON decodeInput=encoded
)
# load/dump with file-like
@@ -27,7 +27,7 @@ def test():
tainted_filelike.seek(0)
ensure_tainted(
tainted_filelike, # $ MISSING: tainted
ujson.load(tainted_filelike), # $ decodeOutput=Attribute() decodeFormat=JSON decodeInput=tainted_filelike MISSING: tainted
ujson.load(tainted_filelike), # $ decodeOutput=ujson.load(..) decodeFormat=JSON decodeInput=tainted_filelike MISSING: tainted
)
# load/dump with file-like using keyword-args does not work in `ujson`

View File

@@ -1,37 +1,37 @@
import yaml
# Unsafe:
yaml.load(payload) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.Loader) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.unsafe_load(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.full_load(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.Loader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.unsafe_load(payload) # $ decodeInput=payload decodeOutput=yaml.unsafe_load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.full_load(payload) # $ decodeInput=payload decodeOutput=yaml.full_load(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
yaml.load(payload, yaml.SafeLoader) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.load(payload, Loader=yaml.SafeLoader) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.load(payload, yaml.BaseLoader) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.safe_load(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.load(payload, yaml.SafeLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, Loader=yaml.SafeLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.BaseLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.safe_load(payload) # $ decodeInput=payload decodeOutput=yaml.safe_load(..) decodeFormat=YAML
################################################################################
# load_all variants
################################################################################
# Unsafe:
yaml.load_all(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.unsafe_load_all(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.full_load_all(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.load_all(payload) # $ decodeInput=payload decodeOutput=yaml.load_all(..) decodeFormat=YAML decodeMayExecuteInput
yaml.unsafe_load_all(payload) # $ decodeInput=payload decodeOutput=yaml.unsafe_load_all(..) decodeFormat=YAML decodeMayExecuteInput
yaml.full_load_all(payload) # $ decodeInput=payload decodeOutput=yaml.full_load_all(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
yaml.safe_load_all(payload) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.safe_load_all(payload) # $ decodeInput=payload decodeOutput=yaml.safe_load_all(..) decodeFormat=YAML
################################################################################
# C-based loaders with `libyaml`
################################################################################
# Unsafe:
yaml.load(payload, yaml.CLoader) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.CFullLoader) # $ decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.CLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.CFullLoader) # $ decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
yaml.load(payload, yaml.CSafeLoader) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.load(payload, yaml.CBaseLoader) # $decodeInput=payload decodeOutput=Attribute() decodeFormat=YAML
yaml.load(payload, yaml.CSafeLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML
yaml.load(payload, yaml.CBaseLoader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,64 @@
import yarl
url = yarl.URL(TAINTED_STRING)
ensure_tainted(
url, # $ tainted
# see https://yarl.readthedocs.io/en/stable/api.html#yarl.URL
url.user, # $ tainted
url.raw_user, # $ tainted
url.password, # $ tainted
url.raw_password, # $ tainted
url.host, # $ tainted
url.raw_host, # $ tainted
url.port, # $ tainted
url.explicit_port, # $ tainted
url.authority, # $ tainted
url.raw_authority, # $ tainted
url.path, # $ tainted
url.raw_path, # $ tainted
url.path_qs, # $ tainted
url.raw_path_qs, # $ tainted
url.query_string, # $ tainted
url.raw_query_string, # $ tainted
url.fragment, # $ tainted
url.raw_fragment, # $ tainted
url.parts, # $ tainted
url.raw_parts, # $ tainted
url.name, # $ tainted
url.raw_name, # $ tainted
# multidict.MultiDictProxy[str]
url.query, # $ tainted
url.query.getone("key"), # $ tainted
url.with_scheme("foo"), # $ tainted
url.with_user("foo"), # $ tainted
url.with_password("foo"), # $ tainted
url.with_host("foo"), # $ tainted
url.with_port("foo"), # $ tainted
url.with_path("foo"), # $ tainted
url.with_query({"foo": 42}), # $ tainted
url.with_query(foo=42), # $ tainted
url.update_query({"foo": 42}), # $ tainted
url.update_query(foo=42), # $ tainted
url.with_fragment("foo"), # $ tainted
url.with_name("foo"), # $ tainted
url.join(yarl.URL("wat.html")), # $ tainted
url.human_repr(), # $ tainted
)