Files
codeql/python/ql/test/library-tests/frameworks/fastapi/taint_test.py
yoff bf8ff487dc Python: switch dataflow library to new (shared) CFG + SSA
Flips the Python dataflow trunk from the legacy CFG (semmle/python/Flow.qll)
and legacy ESSA SSA (semmle/python/essa/*) to the new shared CFG facade
(semmle.python.controlflow.internal.Cfg) and the new SSA adapter
(semmle.python.dataflow.new.internal.SsaImpl), both introduced
additively in the preceding PRs in this stack.

This is the trunk-flip equivalent of the original draft PR #21894 (kept
around as documentation), rebased on top of the four preparatory PRs:

  P1: Remove AstNode.getAFlowNode() and rewrite callers (#21919).
  P2: Qualify Flow.qll's AST references with Py:: prefix (#21920).
  P3: Add new shared-CFG-backed control flow graph (#21921).
  P4: Add new shared-SSA-backed SSA adapter (#21923).

The Python dataflow library (semmle/python/dataflow/new/) now imports
the new CFG facade and SSA adapter. All CFG-typed predicates
(ControlFlowNode, CallNode, BasicBlock, NameNode, AttrNode, ...) are
qualified with the Cfg:: prefix; SSA references switch from
EssaVariable/EssaDefinition to SsaImpl::Definition/SourceVariable.

GuardNode is redesigned to use the new CFG's outcome-node model
(isAfterTrue / isAfterFalse) instead of the legacy ConditionBlock +
flipped indirection. Only BarrierGuard<...> is preserved as public
API.

Framework files (Bottle, FastApi, Django, Tornado, Pyramid, Stdlib,
...) are updated to take CFG nodes from the new facade.

A handful of dataflow consistency tweaks for the new CFG:
- Augmented-assignment targets are treated as both load and store.
- 'from X import *' produces uncertain SSA writes for unknown names.
- CFG nodes are canonicalised so dataflow does not see equivalent
  pre/post-order pairs as distinct nodes.

Two AST tweaks for the new CFG:
- AstNodeImpl: omit PEP 695 type-parameter names from
  FunctionDefExpr / ClassDefExpr children.
- ImportResolution: drop the legacy essa import.

Test churn (~175 files): reblessed library- and query-test .expected
files reflect slightly different CFG granularity, different toString
output, and a handful of true alert deltas in security queries.

Verification: all 367 lib + src + consistency-queries compile clean.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-29 19:37:37 +00:00

225 lines
6.9 KiB
Python

# --- to make things runable ---
ensure_tainted = ensure_not_tainted = print
# --- real code ---
from fastapi import FastAPI
from typing import Optional, List
from pydantic import BaseModel
app = FastAPI()
class Foo(BaseModel):
foo: str
class MyComplexModel(BaseModel):
field: str
main_foo: Foo
other_foos: List[Foo]
nested_foos: List[List[Foo]]
@app.post("/test_taint/{name}/{number}") # $ routeSetup="/test_taint/{name}/{number}"
async def test_taint(name : str, number : int, also_input: MyComplexModel): # $ requestHandler routedParameter=name routedParameter=number routedParameter=also_input
ensure_tainted(
name, # $ tainted
number, # $ tainted
also_input, # $ tainted
also_input.field, # $ tainted
also_input.main_foo, # $ tainted
also_input.main_foo.foo, # $ tainted
also_input.other_foos, # $ tainted
also_input.other_foos[0], # $ tainted
also_input.other_foos[0].foo, # $ tainted
[f.foo for f in also_input.other_foos], # $ tainted
also_input.nested_foos, # $ tainted
also_input.nested_foos[0], # $ tainted
also_input.nested_foos[0][0], # $ tainted
also_input.nested_foos[0][0].foo, # $ tainted
)
other_foos = also_input.other_foos
ensure_tainted(
other_foos, # $ tainted
other_foos[0], # $ tainted
other_foos[0].foo, # $ tainted
[f.foo for f in other_foos], # $ tainted
)
return "ok" # $ HttpResponse
# --- body ---
# see https://fastapi.tiangolo.com/tutorial/body-multiple-params/
from fastapi import Body
# request is made such as `/will-be-query-param?name=foo`
@app.post("/will-be-query-param") # $ routeSetup="/will-be-query-param"
async def will_be_query_param(name: str): # $ requestHandler routedParameter=name
ensure_tainted(name) # $ tainted
return "ok" # $ HttpResponse
# with the `= Body(...)` "annotation" FastAPI will know to transmit `name` as part of
# the HTTP post body
@app.post("/will-not-be-query-param") # $ routeSetup="/will-not-be-query-param"
async def will_not_be_query_param(name: str = Body("foo", media_type="text/plain")): # $ requestHandler routedParameter=name
ensure_tainted(name) # $ tainted
return "ok" # $ HttpResponse
# --- form data ---
# see https://fastapi.tiangolo.com/tutorial/request-forms/
from fastapi import Form
@app.post("/form-example") # $ routeSetup="/form-example"
async def form_example(username: str = Form(None)): # $ requestHandler routedParameter=username
ensure_tainted(username) # $ tainted
return "ok" # $ HttpResponse
# --- HTTP headers ---
# see https://fastapi.tiangolo.com/tutorial/header-params/
from fastapi import Header
@app.get("/header-example") # $ routeSetup="/header-example"
async def header_example(user_agent: Optional[str] = Header(None)): # $ requestHandler routedParameter=user_agent
ensure_tainted(user_agent) # $ tainted
return "ok" # $ HttpResponse
# --- file upload ---
# see https://fastapi.tiangolo.com/tutorial/request-files/
# see https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile
from fastapi import File, UploadFile
@app.post("/file-upload") # $ routeSetup="/file-upload"
async def file_upload(f1: bytes = File(None), f2: UploadFile = File(None)): # $ requestHandler routedParameter=f1 routedParameter=f2
ensure_tainted(
f1, # $ tainted
f2, # $ tainted
f2.filename, # $ MISSING: tainted
f2.content_type, # $ MISSING: tainted
f2.file, # $ MISSING: tainted
f2.file.read(), # $ MISSING: tainted
f2.file.readline(), # $ MISSING: tainted
f2.file.readlines(), # $ MISSING: tainted
await f2.read(), # $ MISSING: tainted
)
return "ok" # $ HttpResponse
# --- WebSocket ---
import starlette.websockets
from fastapi import WebSocket
assert WebSocket == starlette.websockets.WebSocket
@app.websocket("/ws") # $ routeSetup="/ws"
async def websocket_test(websocket: WebSocket): # $ requestHandler routedParameter=websocket
await websocket.accept()
ensure_tainted(
websocket, # $ tainted
websocket.url, # $ tainted
websocket.url.netloc, # $ tainted
websocket.url.path, # $ tainted
websocket.url.query, # $ tainted
websocket.url.fragment, # $ tainted
websocket.url.username, # $ tainted
websocket.url.password, # $ tainted
websocket.url.hostname, # $ tainted
websocket.url.port, # $ tainted
websocket.url.components, # $ tainted
websocket.url.components.netloc, # $ tainted
websocket.url.components.path, # $ tainted
websocket.url.components.query, # $ tainted
websocket.url.components.fragment, # $ tainted
websocket.url.components.username, # $ tainted
websocket.url.components.password, # $ tainted
websocket.url.components.hostname, # $ tainted
websocket.url.components.port, # $ tainted
websocket.headers, # $ tainted
websocket.headers["key"], # $ tainted
websocket.query_params, # $ tainted
websocket.query_params["key"], # $ tainted
websocket.cookies, # $ tainted
websocket.cookies["key"], # $ tainted
await websocket.receive(), # $ tainted
await websocket.receive_bytes(), # $ tainted
await websocket.receive_text(), # $ tainted
await websocket.receive_json(), # $ tainted
)
# scheme seems very unlikely to give interesting results, but very likely to give FPs.
ensure_not_tainted(
websocket.url.scheme,
websocket.url.components.scheme,
)
async for data in websocket.iter_bytes():
ensure_tainted(data) # $ tainted
async for data in websocket.iter_text():
ensure_tainted(data) # $ tainted
async for data in websocket.iter_json():
ensure_tainted(data) # $ tainted
# --- Request ---
import starlette.requests
from fastapi import Request
assert Request == starlette.requests.Request
@app.websocket("/req") # $ routeSetup="/req"
async def request_test(request: Request): # $ requestHandler routedParameter=request
ensure_tainted(
request, # $ tainted
await request.body(), # $ tainted
await request.json(), # $ tainted
await request.json()["key"], # $ tainted
# form() returns a FormData (which is a starlette ImmutableMultiDict)
await request.form(), # $ tainted
await request.form()["key"], # $ tainted
await request.form().getlist("key"), # $ MISSING: tainted
await request.form().getlist("key")[0], # $ MISSING: tainted
# data in the form could be an starlette.datastructures.UploadFile
await request.form()["file"].filename, # $ MISSING: tainted
await request.form().getlist("file")[0].filename, # $ MISSING: tainted
request.cookies, # $ tainted
request.cookies["key"], # $ tainted
)
async for chunk in request.stream():
ensure_tainted(chunk) # $ tainted