Merge branch 'main' into htmlReg

This commit is contained in:
Erik Krogh Kristensen
2021-11-04 12:58:42 +01:00
450 changed files with 15060 additions and 5288 deletions

View File

@@ -3,13 +3,25 @@ import pkg # $ use=moduleImport("pkg")
async def foo():
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
coro # $ use=moduleImport("pkg").getMember("async_func").getReturn()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def bar():
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_with():
async with pkg.async_func() as result: # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_for():
async for _ in pkg.async_func(): # $ use=moduleImport("pkg").getMember("async_func").getReturn() awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async for _ in coro: # $ use=moduleImport("pkg").getMember("async_func").getReturn() MISSING: awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
def check_annotations():
# Just to make sure how annotations should look like :)

View File

@@ -0,0 +1,26 @@
import python
import semmle.python.dataflow.new.DataFlow
import TestUtilities.InlineExpectationsTest
import semmle.python.ApiGraphs
class AwaitedTest extends InlineExpectationsTest {
AwaitedTest() { this = "AwaitedTest" }
override string getARelevantTag() { result = "awaited" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(API::Node awaited, DataFlow::Node use, API::Node pred |
awaited = pred.getAwaited() and
use = awaited.getAUse() and
location = use.getLocation() and
// Module variable nodes have no suitable location, so it's best to simply exclude them entirely
// from the inline tests.
not use instanceof DataFlow::ModuleVariableNode and
exists(location.getFile().getRelativePath())
|
tag = "awaited" and
value = pred.getPath() and
element = use.toString()
)
}
}

View File

@@ -7,9 +7,16 @@ class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
override predicate isSource(DataFlow::Node source) {
// Standard sources
source.(DataFlow::CfgNode).getNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
// User defined sources
exists(CallNode call |
call.getFunction().(NameNode).getId() = "taint" and
source.(DataFlow::CfgNode).getNode() = call.getAnArg()
)
}
override predicate isSink(DataFlow::Node sink) {

View File

@@ -0,0 +1,57 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
async def tainted_coro():
return TAINTED_STRING
async def test_await():
coro = tainted_coro()
taint(coro)
s = await coro
ensure_tainted(coro, s) # $ tainted
class AsyncContext:
async def __aenter__(self):
return TAINTED_STRING
async def __aexit__(self, exc_type, exc, tb):
pass
async def test_async_with():
ctx = AsyncContext()
taint(ctx)
async with ctx as tainted:
ensure_tainted(tainted) # $ MISSING: tainted
class AsyncIter:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
async def test_async_for():
iter = AsyncIter()
taint(iter)
async for tainted in iter:
ensure_tainted(tainted) # $ MISSING: tainted
# Make tests runable
import asyncio
asyncio.run(test_await())
asyncio.run(test_async_with())
asyncio.run(test_async_for())

View File

@@ -5,6 +5,11 @@ TAINTED_DICT = {"name": TAINTED_STRING, "some key": "foo"}
NOT_TAINTED = "NOT_TAINTED"
# Use this to force expressions to be tainted
def taint(*args):
pass
def ensure_tainted(*args):
print("- ensure_tainted")
for i, arg in enumerate(args):

View File

@@ -128,6 +128,24 @@ class CodeExecutionTest extends InlineExpectationsTest {
}
}
class SqlConstructionTest extends InlineExpectationsTest {
SqlConstructionTest() { this = "SqlConstructionTest" }
override string getARelevantTag() { result = "constructedSql" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(SqlConstruction e, DataFlow::Node sql |
exists(location.getFile().getRelativePath()) and
sql = e.getSql() and
location = e.getLocation() and
element = sql.toString() and
value = prettyNodeForInlineTest(sql) and
tag = "constructedSql"
)
}
}
class SqlExecutionTest extends InlineExpectationsTest {
SqlExecutionTest() { this = "SqlExecutionTest" }

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,105 @@
import asyncio
import asyncpg
async def test_connection():
conn = await asyncpg.connect()
try:
# The file-like object is passed in as a keyword-only argument.
# See https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg.connection.Connection.copy_from_query
await conn.copy_from_query("sql", output="filepath") # $ getSql="sql" getAPathArgument="filepath"
await conn.copy_from_query("sql", "arg1", "arg2", output="filepath") # $ getSql="sql" getAPathArgument="filepath"
await conn.copy_from_table("table", output="filepath") # $ getAPathArgument="filepath"
await conn.copy_to_table("table", source="filepath") # $ getAPathArgument="filepath"
await conn.execute("sql") # $ getSql="sql"
await conn.executemany("sql") # $ getSql="sql"
await conn.fetch("sql") # $ getSql="sql"
await conn.fetchrow("sql") # $ getSql="sql"
await conn.fetchval("sql") # $ getSql="sql"
finally:
await conn.close()
async def test_prepared_statement():
conn = await asyncpg.connect()
try:
pstmt = await conn.prepare("psql") # $ constructedSql="psql"
pstmt.executemany() # $ getSql="psql"
pstmt.fetch() # $ getSql="psql"
pstmt.fetchrow() # $ getSql="psql"
pstmt.fetchval() # $ getSql="psql"
finally:
await conn.close()
# The sql statement is executed when the `CursorFactory` (obtained by e.g. `conn.cursor()`) is awaited.
# See https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg.cursor.CursorFactory
async def test_cursor():
conn = await asyncpg.connect()
try:
async with conn.transaction():
cursor = await conn.cursor("sql") # $ getSql="sql" constructedSql="sql"
await cursor.fetch()
pstmt = await conn.prepare("psql") # $ constructedSql="psql"
pcursor = await pstmt.cursor() # $ getSql="psql"
await pcursor.fetch()
async for record in conn.cursor("sql"): # $ getSql="sql" constructedSql="sql"
pass
async for record in pstmt.cursor(): # $ getSql="psql"
pass
cursor_factory = conn.cursor("sql") # $ constructedSql="sql"
cursor = await cursor_factory # $ getSql="sql"
pcursor_factory = pstmt.cursor()
pcursor = await pcursor_factory # $ getSql="psql"
finally:
await conn.close()
async def test_connection_pool():
pool = await asyncpg.create_pool()
try:
await pool.copy_from_query("sql", output="filepath") # $ getSql="sql" getAPathArgument="filepath"
await pool.copy_from_query("sql", "arg1", "arg2", output="filepath") # $ getSql="sql" getAPathArgument="filepath"
await pool.copy_from_table("table", output="filepath") # $ getAPathArgument="filepath"
await pool.copy_to_table("table", source="filepath") # $ getAPathArgument="filepath"
await pool.execute("sql") # $ getSql="sql"
await pool.executemany("sql") # $ getSql="sql"
await pool.fetch("sql") # $ getSql="sql"
await pool.fetchrow("sql") # $ getSql="sql"
await pool.fetchval("sql") # $ getSql="sql"
async with pool.acquire() as conn:
await conn.execute("sql") # $ getSql="sql"
conn = await pool.acquire()
try:
await conn.fetch("sql") # $ getSql="sql"
finally:
await pool.release(conn)
finally:
await pool.close()
async with asyncpg.create_pool() as pool:
await pool.execute("sql") # $ getSql="sql"
async with pool.acquire() as conn:
await conn.execute("sql") # $ getSql="sql"
conn = await pool.acquire()
try:
await conn.fetch("sql") # $ getSql="sql"
finally:
await pool.release(conn)

View File

@@ -0,0 +1,12 @@
import python
import experimental.meta.ConceptsTest
class DedicatedResponseTest extends HttpServerHttpResponseTest {
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
}
class OtherResponseTest extends HttpServerHttpResponseTest {
OtherResponseTest() { not this instanceof DedicatedResponseTest }
override string getARelevantTag() { result = "HttpResponse" }
}

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -0,0 +1,66 @@
# Taking inspiration from https://realpython.com/fastapi-python-web-apis/
# run with
# uvicorn basic:app --reload
# Then visit http://127.0.0.1:8000/docs and http://127.0.0.1:8000/redoc
from fastapi import FastAPI
app = FastAPI()
@app.get("/") # $ routeSetup="/"
async def root(): # $ requestHandler
return {"message": "Hello World"} # $ HttpResponse
@app.get("/non-async") # $ routeSetup="/non-async"
def non_async(): # $ requestHandler
return {"message": "non-async"} # $ HttpResponse
@app.get(path="/kw-arg") # $ routeSetup="/kw-arg"
def kw_arg(): # $ requestHandler
return {"message": "kw arg"} # $ HttpResponse
@app.get("/foo/{foo_id}") # $ routeSetup="/foo/{foo_id}"
async def get_foo(foo_id: int): # $ requestHandler routedParameter=foo_id
# FastAPI does data validation (with `pydantic` PyPI package) under the hood based
# on the type annotation we did for `foo_id`, so it will auto-reject anything that's
# not an int.
return {"foo_id": foo_id} # $ HttpResponse
# this will work as query param, so `/bar?bar_id=123`
@app.get("/bar") # $ routeSetup="/bar"
async def get_bar(bar_id: int = 42): # $ requestHandler routedParameter=bar_id
return {"bar_id": bar_id} # $ HttpResponse
# The big deal is that FastAPI works so well together with pydantic, so you can do stuff like this
from typing import Optional
from pydantic import BaseModel
class Item(BaseModel):
name: str
price: float
is_offer: Optional[bool] = None
@app.post("/items/") # $ routeSetup="/items/"
async def create_item(item: Item): # $ requestHandler routedParameter=item
# Note: calling `item` a routed parameter is slightly untrue, since it doesn't come
# from the URL itself, but from the body of the POST request
return item # $ HttpResponse
# this also works fine
@app.post("/2items") # $ routeSetup="/2items"
async def create_item2(item1: Item, item2: Item): # $ requestHandler routedParameter=item1 routedParameter=item2
return (item1, item2) # $ HttpResponse
@app.api_route("/baz/{baz_id}", methods=["GET"]) # $ routeSetup="/baz/{baz_id}"
async def get_baz(baz_id: int): # $ requestHandler routedParameter=baz_id
return {"baz_id2": baz_id} # $ HttpResponse
# Docs:
# see https://fastapi.tiangolo.com/tutorial/path-params/
# Things we should look at supporting:
# - https://fastapi.tiangolo.com/tutorial/dependencies/
# - https://fastapi.tiangolo.com/tutorial/background-tasks/
# - https://fastapi.tiangolo.com/tutorial/middleware/
# - https://fastapi.tiangolo.com/tutorial/encoder/

View File

@@ -0,0 +1,145 @@
# see https://fastapi.tiangolo.com/advanced/response-cookies/
from fastapi import FastAPI, Response
import fastapi.responses
import asyncio
app = FastAPI()
@app.get("/response_parameter") # $ routeSetup="/response_parameter"
async def response_parameter(response: Response): # $ requestHandler
response.set_cookie("key", "value") # $ CookieWrite CookieName="key" CookieValue="value"
response.set_cookie(key="key", value="value") # $ CookieWrite CookieName="key" CookieValue="value"
response.headers.append("Set-Cookie", "key2=value2") # $ CookieWrite CookieRawHeader="key2=value2"
response.headers.append(key="Set-Cookie", value="key2=value2") # $ CookieWrite CookieRawHeader="key2=value2"
response.headers["X-MyHeader"] = "header-value"
response.status_code = 418
return {"message": "response as parameter"} # $ HttpResponse mimetype=application/json responseBody=Dict
@app.get("/resp_parameter") # $ routeSetup="/resp_parameter"
async def resp_parameter(resp: Response): # $ requestHandler
resp.status_code = 418
return {"message": "resp as parameter"} # $ HttpResponse mimetype=application/json responseBody=Dict
@app.get("/response_parameter_no_type") # $ routeSetup="/response_parameter_no_type"
async def response_parameter_no_type(response): # $ requestHandler routedParameter=response
# NOTE: This does in fact not work, since FastAPI relies on the type annotations,
# and not on the name of the parameter
response.status_code = 418
return {"message": "response as parameter"} # $ HttpResponse mimetype=application/json responseBody=Dict
class MyXmlResponse(fastapi.responses.Response):
media_type = "application/xml"
@app.get("/response_parameter_custom_type", response_class=MyXmlResponse) # $ routeSetup="/response_parameter_custom_type"
async def response_parameter_custom_type(response: MyXmlResponse): # $ requestHandler
# NOTE: This is a contrived example of using a wrong annotation for the response
# parameter. It will be passed a `fastapi.responses.Response` value when handling an
# incoming request, so NOT a `MyXmlResponse` value. Cookies/Headers are still
# propagated to the final response though.
print(type(response))
assert type(response) == fastapi.responses.Response
response.set_cookie("key", "value") # $ CookieWrite CookieName="key" CookieValue="value"
response.headers["Custom-Response-Type"] = "yes, but only after function has run"
xml_data = "<foo>FOO</foo>"
return xml_data # $ HttpResponse responseBody=xml_data mimetype=application/xml
# Direct response construction
# see https://fastapi.tiangolo.com/advanced/response-directly/
# see https://fastapi.tiangolo.com/advanced/custom-response/
@app.get("/direct_response") # $ routeSetup="/direct_response"
async def direct_response(): # $ requestHandler
xml_data = "<foo>FOO</foo>"
resp = fastapi.responses.Response(xml_data, 200, None, "application/xml") # $ HttpResponse mimetype=application/xml responseBody=xml_data
resp = fastapi.responses.Response(content=xml_data, media_type="application/xml") # $ HttpResponse mimetype=application/xml responseBody=xml_data
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
@app.get("/direct_response2", response_class=fastapi.responses.Response) # $ routeSetup="/direct_response2"
async def direct_response2(): # $ requestHandler
xml_data = "<foo>FOO</foo>"
return xml_data # $ HttpResponse responseBody=xml_data
@app.get("/my_xml_response") # $ routeSetup="/my_xml_response"
async def my_xml_response(): # $ requestHandler
xml_data = "<foo>FOO</foo>"
resp = MyXmlResponse(content=xml_data) # $ HttpResponse mimetype=application/xml responseBody=xml_data
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
@app.get("/my_xml_response2", response_class=MyXmlResponse) # $ routeSetup="/my_xml_response2"
async def my_xml_response2(): # $ requestHandler
xml_data = "<foo>FOO</foo>"
return xml_data # $ HttpResponse responseBody=xml_data mimetype=application/xml
@app.get("/html_response") # $ routeSetup="/html_response"
async def html_response(): # $ requestHandler
hello_world = "<h1>Hello World!</h1>"
resp = fastapi.responses.HTMLResponse(hello_world) # $ HttpResponse mimetype=text/html responseBody=hello_world
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
@app.get("/html_response2", response_class=fastapi.responses.HTMLResponse) # $ routeSetup="/html_response2"
async def html_response2(): # $ requestHandler
hello_world = "<h1>Hello World!</h1>"
return hello_world # $ HttpResponse responseBody=hello_world mimetype=text/html
@app.get("/redirect") # $ routeSetup="/redirect"
async def redirect(): # $ requestHandler
next = "https://www.example.com"
resp = fastapi.responses.RedirectResponse(next) # $ HttpResponse HttpRedirectResponse redirectLocation=next
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
@app.get("/redirect2", response_class=fastapi.responses.RedirectResponse) # $ routeSetup="/redirect2"
async def redirect2(): # $ requestHandler
next = "https://www.example.com"
return next # $ HttpResponse HttpRedirectResponse redirectLocation=next
@app.get("/streaming_response") # $ routeSetup="/streaming_response"
async def streaming_response(): # $ requestHandler
# You can test this with curl:
# curl --no-buffer http://127.0.0.1:8000/streaming_response
async def content():
yield b"Hello "
await asyncio.sleep(0.5)
yield b"World"
await asyncio.sleep(0.5)
yield b"!"
resp = fastapi.responses.StreamingResponse(content()) # $ HttpResponse responseBody=content()
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
# setting `response_class` to `StreamingResponse` does not seem to work
# so no such example here
@app.get("/file_response") # $ routeSetup="/file_response"
async def file_response(): # $ requestHandler
# has internal dependency on PyPI package `aiofiles`
# will guess MIME type from file extension
# We don't really have any good QL modeling of passing a file-path, whose content
# will be returned as part of the response... so will leave this as a TODO for now.
resp = fastapi.responses.FileResponse(__file__) # $ HttpResponse
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
@app.get("/file_response2", response_class=fastapi.responses.FileResponse) # $ routeSetup="/file_response2"
async def file_response2(): # $ requestHandler
return __file__ # $ HttpResponse

View File

@@ -0,0 +1,33 @@
# like blueprints in Flask
# see https://fastapi.tiangolo.com/tutorial/bigger-applications/
from fastapi import APIRouter, FastAPI
inner_router = APIRouter()
@inner_router.get("/foo") # $ routeSetup="/foo"
async def root(): # $ requestHandler
return {"msg": "inner_router /foo"} # $ HttpResponse
outer_router = APIRouter()
outer_router.include_router(inner_router, prefix="/inner")
items_router = APIRouter(
prefix="/items",
tags=["items"],
)
@items_router.get("/") # $ routeSetup="/"
async def items(): # $ requestHandler
return {"msg": "items_router /"} # $ HttpResponse
app = FastAPI()
app.include_router(outer_router, prefix="/outer")
app.include_router(items_router)
# see basic.py for instructions for how to run this code.

View File

@@ -0,0 +1,189 @@
# --- to make things runable ---
ensure_tainted = ensure_not_tainted = print
# --- real code ---
from fastapi import FastAPI
from typing import Optional, List
from pydantic import BaseModel
app = FastAPI()
class Foo(BaseModel):
foo: str
class MyComplexModel(BaseModel):
field: str
main_foo: Foo
other_foos: List[Foo]
nested_foos: List[List[Foo]]
@app.post("/test_taint/{name}/{number}") # $ routeSetup="/test_taint/{name}/{number}"
async def test_taint(name : str, number : int, also_input: MyComplexModel): # $ requestHandler routedParameter=name routedParameter=number routedParameter=also_input
ensure_tainted(
name, # $ tainted
number, # $ tainted
also_input, # $ tainted
also_input.field, # $ tainted
also_input.main_foo, # $ tainted
also_input.main_foo.foo, # $ tainted
also_input.other_foos, # $ tainted
also_input.other_foos[0], # $ tainted
also_input.other_foos[0].foo, # $ tainted
[f.foo for f in also_input.other_foos], # $ MISSING: tainted
also_input.nested_foos, # $ tainted
also_input.nested_foos[0], # $ tainted
also_input.nested_foos[0][0], # $ tainted
also_input.nested_foos[0][0].foo, # $ tainted
)
other_foos = also_input.other_foos
ensure_tainted(
other_foos, # $ tainted
other_foos[0], # $ tainted
other_foos[0].foo, # $ tainted
[f.foo for f in other_foos], # $ MISSING: tainted
)
return "ok" # $ HttpResponse
# --- body ---
# see https://fastapi.tiangolo.com/tutorial/body-multiple-params/
from fastapi import Body
# request is made such as `/will-be-query-param?name=foo`
@app.post("/will-be-query-param") # $ routeSetup="/will-be-query-param"
async def will_be_query_param(name: str): # $ requestHandler routedParameter=name
ensure_tainted(name) # $ tainted
return "ok" # $ HttpResponse
# with the `= Body(...)` "annotation" FastAPI will know to transmit `name` as part of
# the HTTP post body
@app.post("/will-not-be-query-param") # $ routeSetup="/will-not-be-query-param"
async def will_not_be_query_param(name: str = Body("foo", media_type="text/plain")): # $ requestHandler routedParameter=name
ensure_tainted(name) # $ tainted
return "ok" # $ HttpResponse
# --- form data ---
# see https://fastapi.tiangolo.com/tutorial/request-forms/
from fastapi import Form
@app.post("/form-example") # $ routeSetup="/form-example"
async def form_example(username: str = Form(None)): # $ requestHandler routedParameter=username
ensure_tainted(username) # $ tainted
return "ok" # $ HttpResponse
# --- HTTP headers ---
# see https://fastapi.tiangolo.com/tutorial/header-params/
from fastapi import Header
@app.get("/header-example") # $ routeSetup="/header-example"
async def header_example(user_agent: Optional[str] = Header(None)): # $ requestHandler routedParameter=user_agent
ensure_tainted(user_agent) # $ tainted
return "ok" # $ HttpResponse
# --- file upload ---
# see https://fastapi.tiangolo.com/tutorial/request-files/
# see https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile
from fastapi import File, UploadFile
@app.post("/file-upload") # $ routeSetup="/file-upload"
async def file_upload(f1: bytes = File(None), f2: UploadFile = File(None)): # $ requestHandler routedParameter=f1 routedParameter=f2
ensure_tainted(
f1, # $ tainted
f2, # $ tainted
f2.filename, # $ MISSING: tainted
f2.content_type, # $ MISSING: tainted
f2.file, # $ MISSING: tainted
f2.file.read(), # $ MISSING: tainted
f2.file.readline(), # $ MISSING: tainted
f2.file.readlines(), # $ MISSING: tainted
await f2.read(), # $ MISSING: tainted
)
return "ok" # $ HttpResponse
# --- WebSocket ---
import starlette.websockets
from fastapi import WebSocket
assert WebSocket == starlette.websockets.WebSocket
@app.websocket("/ws") # $ routeSetup="/ws"
async def websocket_test(websocket: WebSocket): # $ requestHandler routedParameter=websocket
await websocket.accept()
ensure_tainted(
websocket, # $ tainted
websocket.url, # $ tainted
websocket.url.netloc, # $ tainted
websocket.url.path, # $ tainted
websocket.url.query, # $ tainted
websocket.url.fragment, # $ tainted
websocket.url.username, # $ tainted
websocket.url.password, # $ tainted
websocket.url.hostname, # $ tainted
websocket.url.port, # $ tainted
websocket.url.components, # $ tainted
websocket.url.components.netloc, # $ tainted
websocket.url.components.path, # $ tainted
websocket.url.components.query, # $ tainted
websocket.url.components.fragment, # $ tainted
websocket.url.components.username, # $ tainted
websocket.url.components.password, # $ tainted
websocket.url.components.hostname, # $ tainted
websocket.url.components.port, # $ tainted
websocket.headers, # $ tainted
websocket.headers["key"], # $ tainted
websocket.query_params, # $ tainted
websocket.query_params["key"], # $ tainted
websocket.cookies, # $ tainted
websocket.cookies["key"], # $ tainted
await websocket.receive(), # $ tainted
await websocket.receive_bytes(), # $ tainted
await websocket.receive_text(), # $ tainted
await websocket.receive_json(), # $ tainted
)
# scheme seems very unlikely to give interesting results, but very likely to give FPs.
ensure_not_tainted(
websocket.url.scheme,
websocket.url.components.scheme,
)
async for data in websocket.iter_bytes():
ensure_tainted(data) # $ tainted
async for data in websocket.iter_text():
ensure_tainted(data) # $ tainted
async for data in websocket.iter_json():
ensure_tainted(data) # $ tainted

View File

@@ -0,0 +1,7 @@
from flask import send_from_directory, send_file
send_from_directory("dir", "file") # $ getAPathArgument="dir" getAPathArgument="file"
send_from_directory(directory="dir", filename="file") # $ getAPathArgument="dir" getAPathArgument="file"
send_file("file") # $ getAPathArgument="file"
send_file(filename_or_fp="file") # $ getAPathArgument="file"

View File

@@ -105,8 +105,8 @@ def bp1_example(foo): # $ requestHandler routedParameter=foo
app.register_blueprint(bp1) # by default, URLs of blueprints are not prefixed
bp2 = flask.Blueprint("bp2", __name__)
import flask.blueprints
bp2 = flask.blueprints.Blueprint("bp2", __name__)
@bp2.route("/example") # $ routeSetup="/example"
def bp2_example(): # $ requestHandler

View File

@@ -12,7 +12,7 @@ db = SQLAlchemy(app)
# - https://github.com/pallets/flask-sqlalchemy/blob/931ec00d1e27f51508e05706eef41cc4419a0b32/src/flask_sqlalchemy/__init__.py#L765
# - https://github.com/pallets/flask-sqlalchemy/blob/931ec00d1e27f51508e05706eef41cc4419a0b32/src/flask_sqlalchemy/__init__.py#L99-L109
assert str(type(db.text("Foo"))) == "<class 'sqlalchemy.sql.elements.TextClause'>"
assert str(type(db.text("Foo"))) == "<class 'sqlalchemy.sql.elements.TextClause'>" # $ constructedSql="Foo"
# also has engine/session instantiated
@@ -44,8 +44,8 @@ assert result.fetchall() == [("Foo",)]
# text
t = db.text("foo")
t = db.text("foo") # $ constructedSql="foo"
assert isinstance(t, sqlalchemy.sql.expression.TextClause)
t = db.text(text="foo")
t = db.text(text="foo") # $ constructedSql="foo"
assert isinstance(t, sqlalchemy.sql.expression.TextClause)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,33 @@
import ruamel.yaml
# Unsafe:
ruamel.yaml.load(payload) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
ruamel.yaml.load(stream=payload) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
ruamel.yaml.load(payload, ruamel.yaml.Loader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
ruamel.yaml.load(payload, ruamel.yaml.SafeLoader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML
ruamel.yaml.load(payload, Loader=ruamel.yaml.SafeLoader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML
ruamel.yaml.load(payload, ruamel.yaml.BaseLoader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML
ruamel.yaml.safe_load(payload) # $ decodeInput=payload decodeOutput=ruamel.yaml.safe_load(..) decodeFormat=YAML
################################################################################
# load_all variants
################################################################################
# Unsafe:
ruamel.yaml.load_all(payload) # $ decodeInput=payload decodeOutput=ruamel.yaml.load_all(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
ruamel.yaml.safe_load_all(payload) # $ decodeInput=payload decodeOutput=ruamel.yaml.safe_load_all(..) decodeFormat=YAML
################################################################################
# C-based loaders with `libyaml`
################################################################################
# Unsafe:
ruamel.yaml.load(payload, ruamel.yaml.CLoader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
# Safe:
ruamel.yaml.load(payload, ruamel.yaml.CSafeLoader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML
ruamel.yaml.load(payload, ruamel.yaml.CBaseLoader) # $ decodeInput=payload decodeOutput=ruamel.yaml.load(..) decodeFormat=YAML

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
# this file doesn't have a .py extension so the extractor doesn't pick it up, so it
# doesn't have to be annotated
# This file is just a Proof of Concept for how code execution can be triggered.
import os
import ruamel.yaml
class Exploit(object):
def __reduce__(self):
return (os.system, ('ls',))
data = Exploit()
serialized_data = ruamel.yaml.dump(data)
# All these will execute `ls`
print("!!! ruamel.yaml.load")
ruamel.yaml.load(serialized_data)
print("!!! ruamel.yaml.load kwarg")
ruamel.yaml.load(stream=serialized_data)
print("!!! ruamel.yaml.load with Loader=ruamel.yaml.Loader")
ruamel.yaml.load(serialized_data, ruamel.yaml.Loader)
print("!!! ruamel.yaml.load with Loader=ruamel.yaml.UnsafeLoader")
ruamel.yaml.load(serialized_data, ruamel.yaml.UnsafeLoader)
print("!!! ruamel.yaml.load with Loader=ruamel.yaml.CLoader")
ruamel.yaml.load(serialized_data, ruamel.yaml.CLoader)
# you need to iterate through the result for it to execute... but it still works
print("!!! ruamel.yaml.load_all")
for _ in ruamel.yaml.load_all(serialized_data):
pass
# check that the safe version is actually safe
print("\n" + "-"*80)
print("safe versions")
print("-" * 80)
print("!!! ruamel.yaml.safe_load")
try:
ruamel.yaml.safe_load(serialized_data)
raise Exception("should not happen")
except ruamel.yaml.constructor.ConstructorError:
pass
print("!!! ruamel.yaml.load with Loader=ruamel.yaml.SafeLoader")
try:
ruamel.yaml.load(serialized_data, ruamel.yaml.SafeLoader)
raise Exception("should not happen")
except ruamel.yaml.constructor.ConstructorError:
pass
print("!!! ruamel.yaml.load with Loader=ruamel.yaml.CSafeLoader")
try:
ruamel.yaml.load(serialized_data, ruamel.yaml.CSafeLoader)
raise Exception("should not happen")
except ruamel.yaml.constructor.ConstructorError:
pass

View File

@@ -46,7 +46,7 @@ with engine.begin() as connection:
connection.execute("some sql") # $ getSql="some sql"
# Injection requiring the text() taint-step
t = text("some sql")
t = text("some sql") # $ constructedSql="some sql"
session.query(User).filter(t)
session.query(User).group_by(User.id).having(t)
session.query(User).group_by(t).first()

View File

@@ -6,7 +6,7 @@ import sqlalchemy.orm
# either v1.4 or v2.0, such that we cover both.
raw_sql = "select 'FOO'"
text_sql = sqlalchemy.text(raw_sql)
text_sql = sqlalchemy.text(raw_sql) # $ constructedSql=raw_sql
Base = sqlalchemy.orm.declarative_base()
@@ -169,7 +169,7 @@ assert session.query(For14).all()[0].id == 14
# and now we can do the actual querying
text_foo = sqlalchemy.text("'FOO'")
text_foo = sqlalchemy.text("'FOO'") # $ constructedSql="'FOO'"
# filter_by is only vulnerable to injection if sqlalchemy.text is used, which is evident
# from the logs produced if this file is run
@@ -298,7 +298,7 @@ with engine.connect() as conn:
assert scalar_result == "FOO"
# This is a contrived example
select = sqlalchemy.select(sqlalchemy.text("'BAR'"))
select = sqlalchemy.select(sqlalchemy.text("'BAR'")) # $ constructedSql="'BAR'"
result = conn.execute(select) # $ getSql=select
assert result.fetchall() == [("BAR",)]

View File

@@ -8,14 +8,14 @@ def test_taint():
ensure_tainted(ts) # $ tainted
t1 = sqlalchemy.text(ts)
t2 = sqlalchemy.text(text=ts)
t3 = sqlalchemy.sql.text(ts)
t4 = sqlalchemy.sql.text(text=ts)
t5 = sqlalchemy.sql.expression.text(ts)
t6 = sqlalchemy.sql.expression.text(text=ts)
t7 = sqlalchemy.sql.expression.TextClause(ts)
t8 = sqlalchemy.sql.expression.TextClause(text=ts)
t1 = sqlalchemy.text(ts) # $ constructedSql=ts
t2 = sqlalchemy.text(text=ts) # $ constructedSql=ts
t3 = sqlalchemy.sql.text(ts) # $ constructedSql=ts
t4 = sqlalchemy.sql.text(text=ts) # $ constructedSql=ts
t5 = sqlalchemy.sql.expression.text(ts) # $ constructedSql=ts
t6 = sqlalchemy.sql.expression.text(text=ts) # $ constructedSql=ts
t7 = sqlalchemy.sql.expression.TextClause(ts) # $ constructedSql=ts
t8 = sqlalchemy.sql.expression.TextClause(text=ts) # $ constructedSql=ts
# Since we flag user-input to a TextClause with its' own query, we don't want to
# have a taint-step for it as that would lead to us also giving an alert for normal

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,38 @@
import toml
from io import StringIO
encoded = 'title = "example"\n'
decoded = {"title" : "example"}
# LOADING
assert decoded == toml.loads(encoded) # $ decodeInput=encoded decodeFormat=TOML decodeOutput=toml.loads(..)
assert decoded == toml.loads(s=encoded) # $ decodeInput=encoded decodeFormat=TOML decodeOutput=toml.loads(..)
# this is not the official way to do things, but it works
assert decoded == toml.decoder.loads(encoded) # $ decodeInput=encoded decodeFormat=TOML decodeOutput=toml.decoder.loads(..)
f_encoded = StringIO(encoded)
assert decoded == toml.load(f_encoded) # $ decodeInput=f_encoded decodeFormat=TOML decodeOutput=toml.load(..)
f_encoded = StringIO(encoded)
assert decoded == toml.load(f=f_encoded) # $ decodeInput=f_encoded decodeFormat=TOML decodeOutput=toml.load(..)
f_encoded = StringIO(encoded)
assert decoded == toml.decoder.load(f_encoded) # $ decodeInput=f_encoded decodeFormat=TOML decodeOutput=toml.decoder.load(..)
# DUMPING
assert encoded == toml.dumps(decoded) # $ encodeInput=decoded encodeFormat=TOML encodeOutput=toml.dumps(..)
assert encoded == toml.dumps(o=decoded) # $ encodeInput=decoded encodeFormat=TOML encodeOutput=toml.dumps(..)
assert encoded == toml.encoder.dumps(decoded) # $ encodeInput=decoded encodeFormat=TOML encodeOutput=toml.encoder.dumps(..)
f_encoded = StringIO()
toml.dump(decoded, f_encoded) # $ encodeInput=decoded encodeFormat=TOML encodeOutput=f_encoded
assert encoded == f_encoded.getvalue()
f_encoded = StringIO()
toml.dump(o=decoded, f=f_encoded) # $ encodeInput=decoded encodeFormat=TOML encodeOutput=f_encoded
assert encoded == f_encoded.getvalue()
f_encoded = StringIO()
toml.encoder.dump(decoded, f_encoded) # $ encodeInput=decoded encodeFormat=TOML encodeOutput=f_encoded
assert encoded == f_encoded.getvalue()

View File

@@ -2,6 +2,7 @@ import yaml
# Unsafe:
yaml.load(payload) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(stream=payload) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.load(payload, yaml.Loader) # $decodeInput=payload decodeOutput=yaml.load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.unsafe_load(payload) # $ decodeInput=payload decodeOutput=yaml.unsafe_load(..) decodeFormat=YAML decodeMayExecuteInput
yaml.full_load(payload) # $ decodeInput=payload decodeOutput=yaml.full_load(..) decodeFormat=YAML decodeMayExecuteInput

View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
# this file doesn't have a .py extension so the extractor doesn't pick it up, so it
# doesn't have to be annotated
# This file is just a Proof of Concept for how code execution can be triggered.
import os
import yaml
class Exploit(object):
def __reduce__(self):
return (os.system, ('ls',))
data = Exploit()
serialized_data = yaml.dump(data)
# All these will execute `ls`
print("!!! yaml.unsafe_load")
yaml.unsafe_load(serialized_data)
print("!!! yaml.unsafe_load kwarg")
yaml.unsafe_load(stream=serialized_data)
print("!!! yaml.load with Loader=yaml.UnsafeLoader")
yaml.load(serialized_data, yaml.UnsafeLoader)
# you need to iterate through the result for it to execute... but it still works
print("!!! yaml.unsafe_load_all")
for _ in yaml.unsafe_load_all(serialized_data):
pass
# check that the safe version is actually safe
print("\n" + "-"*80)
print("safe versions")
print("-" * 80)
print("!!! yaml.load")
try:
yaml.load(serialized_data)
raise Exception("should not happen")
except yaml.constructor.ConstructorError:
pass
print("!!! yaml.safe_load")
try:
yaml.safe_load(serialized_data)
raise Exception("should not happen")
except yaml.constructor.ConstructorError:
pass
print("!!! yaml.load with Loader=yaml.SafeLoader")
try:
yaml.load(serialized_data, yaml.SafeLoader)
raise Exception("should not happen")
except yaml.constructor.ConstructorError:
pass

View File

@@ -1,4 +1,6 @@
edges
| flask_path_injection.py:19:15:19:21 | ControlFlowNode for request | flask_path_injection.py:19:15:19:26 | ControlFlowNode for Attribute |
| flask_path_injection.py:19:15:19:26 | ControlFlowNode for Attribute | flask_path_injection.py:21:32:21:38 | ControlFlowNode for dirname |
| path_injection.py:12:16:12:22 | ControlFlowNode for request | path_injection.py:12:16:12:27 | ControlFlowNode for Attribute |
| path_injection.py:12:16:12:27 | ControlFlowNode for Attribute | path_injection.py:13:14:13:47 | ControlFlowNode for Attribute() |
| path_injection.py:19:16:19:22 | ControlFlowNode for request | path_injection.py:19:16:19:27 | ControlFlowNode for Attribute |
@@ -68,6 +70,9 @@ edges
| test_chaining.py:41:9:41:16 | ControlFlowNode for source() | test_chaining.py:42:9:42:19 | ControlFlowNode for normpath() |
| test_chaining.py:44:13:44:23 | ControlFlowNode for normpath() | test_chaining.py:45:14:45:14 | ControlFlowNode for z |
nodes
| flask_path_injection.py:19:15:19:21 | ControlFlowNode for request | semmle.label | ControlFlowNode for request |
| flask_path_injection.py:19:15:19:26 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| flask_path_injection.py:21:32:21:38 | ControlFlowNode for dirname | semmle.label | ControlFlowNode for dirname |
| path_injection.py:12:16:12:22 | ControlFlowNode for request | semmle.label | ControlFlowNode for request |
| path_injection.py:12:16:12:27 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| path_injection.py:13:14:13:47 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
@@ -153,6 +158,7 @@ nodes
| test_chaining.py:44:13:44:23 | ControlFlowNode for normpath() | semmle.label | ControlFlowNode for normpath() |
| test_chaining.py:45:14:45:14 | ControlFlowNode for z | semmle.label | ControlFlowNode for z |
#select
| flask_path_injection.py:21:32:21:38 | ControlFlowNode for dirname | flask_path_injection.py:19:15:19:21 | ControlFlowNode for request | flask_path_injection.py:21:32:21:38 | ControlFlowNode for dirname | This path depends on $@. | flask_path_injection.py:19:15:19:21 | ControlFlowNode for request | a user-provided value |
| path_injection.py:13:14:13:47 | ControlFlowNode for Attribute() | path_injection.py:12:16:12:22 | ControlFlowNode for request | path_injection.py:13:14:13:47 | ControlFlowNode for Attribute() | This path depends on $@. | path_injection.py:12:16:12:22 | ControlFlowNode for request | a user-provided value |
| path_injection.py:21:14:21:18 | ControlFlowNode for npath | path_injection.py:19:16:19:22 | ControlFlowNode for request | path_injection.py:21:14:21:18 | ControlFlowNode for npath | This path depends on $@. | path_injection.py:19:16:19:22 | ControlFlowNode for request | a user-provided value |
| path_injection.py:31:14:31:18 | ControlFlowNode for npath | path_injection.py:27:16:27:22 | ControlFlowNode for request | path_injection.py:31:14:31:18 | ControlFlowNode for npath | This path depends on $@. | path_injection.py:27:16:27:22 | ControlFlowNode for request | a user-provided value |

View File

@@ -0,0 +1,21 @@
from flask import Flask, request, send_from_directory
app = Flask(__name__)
STATIC_DIR = "/server/static/"
# see https://flask.palletsprojects.com/en/1.1.x/api/#flask.send_from_directory
@app.route("/provide-filename")
def download_file():
filename = request.args.get('filename', '')
# ok since `send_from_directory` ensure this stays within `STATIC_DIR`
return send_from_directory(STATIC_DIR, filename) # OK
# see https://flask.palletsprojects.com/en/1.1.x/api/#flask.send_from_directory
@app.route("/also-provide-dirname")
def download_file():
dirname = request.args.get('dirname', '')
filename = request.args.get('filename', '')
return send_from_directory(dirname, filename) # NOT OK