Files
codeql/python/ql/test/library-tests/frameworks/fastapi/taint_test.py
Rasmus Wriedt Larsen 34631a8784 Python: Model FastAPI requests
Co-authored-by: Joe Farebrother <joefarebrother@github.com>
2024-12-18 15:58:51 +01:00

225 lines
6.9 KiB
Python

# --- to make things runable ---
ensure_tainted = ensure_not_tainted = print
# --- real code ---
from fastapi import FastAPI
from typing import Optional, List
from pydantic import BaseModel
app = FastAPI()
class Foo(BaseModel):
foo: str
class MyComplexModel(BaseModel):
field: str
main_foo: Foo
other_foos: List[Foo]
nested_foos: List[List[Foo]]
@app.post("/test_taint/{name}/{number}") # $ routeSetup="/test_taint/{name}/{number}"
async def test_taint(name : str, number : int, also_input: MyComplexModel): # $ requestHandler routedParameter=name routedParameter=number routedParameter=also_input
ensure_tainted(
name, # $ tainted
number, # $ tainted
also_input, # $ tainted
also_input.field, # $ tainted
also_input.main_foo, # $ tainted
also_input.main_foo.foo, # $ tainted
also_input.other_foos, # $ tainted
also_input.other_foos[0], # $ tainted
also_input.other_foos[0].foo, # $ tainted
[f.foo for f in also_input.other_foos], # $ MISSING: tainted
also_input.nested_foos, # $ tainted
also_input.nested_foos[0], # $ tainted
also_input.nested_foos[0][0], # $ tainted
also_input.nested_foos[0][0].foo, # $ tainted
)
other_foos = also_input.other_foos
ensure_tainted(
other_foos, # $ tainted
other_foos[0], # $ tainted
other_foos[0].foo, # $ tainted
[f.foo for f in other_foos], # $ MISSING: tainted
)
return "ok" # $ HttpResponse
# --- body ---
# see https://fastapi.tiangolo.com/tutorial/body-multiple-params/
from fastapi import Body
# request is made such as `/will-be-query-param?name=foo`
@app.post("/will-be-query-param") # $ routeSetup="/will-be-query-param"
async def will_be_query_param(name: str): # $ requestHandler routedParameter=name
ensure_tainted(name) # $ tainted
return "ok" # $ HttpResponse
# with the `= Body(...)` "annotation" FastAPI will know to transmit `name` as part of
# the HTTP post body
@app.post("/will-not-be-query-param") # $ routeSetup="/will-not-be-query-param"
async def will_not_be_query_param(name: str = Body("foo", media_type="text/plain")): # $ requestHandler routedParameter=name
ensure_tainted(name) # $ tainted
return "ok" # $ HttpResponse
# --- form data ---
# see https://fastapi.tiangolo.com/tutorial/request-forms/
from fastapi import Form
@app.post("/form-example") # $ routeSetup="/form-example"
async def form_example(username: str = Form(None)): # $ requestHandler routedParameter=username
ensure_tainted(username) # $ tainted
return "ok" # $ HttpResponse
# --- HTTP headers ---
# see https://fastapi.tiangolo.com/tutorial/header-params/
from fastapi import Header
@app.get("/header-example") # $ routeSetup="/header-example"
async def header_example(user_agent: Optional[str] = Header(None)): # $ requestHandler routedParameter=user_agent
ensure_tainted(user_agent) # $ tainted
return "ok" # $ HttpResponse
# --- file upload ---
# see https://fastapi.tiangolo.com/tutorial/request-files/
# see https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile
from fastapi import File, UploadFile
@app.post("/file-upload") # $ routeSetup="/file-upload"
async def file_upload(f1: bytes = File(None), f2: UploadFile = File(None)): # $ requestHandler routedParameter=f1 routedParameter=f2
ensure_tainted(
f1, # $ tainted
f2, # $ tainted
f2.filename, # $ MISSING: tainted
f2.content_type, # $ MISSING: tainted
f2.file, # $ MISSING: tainted
f2.file.read(), # $ MISSING: tainted
f2.file.readline(), # $ MISSING: tainted
f2.file.readlines(), # $ MISSING: tainted
await f2.read(), # $ MISSING: tainted
)
return "ok" # $ HttpResponse
# --- WebSocket ---
import starlette.websockets
from fastapi import WebSocket
assert WebSocket == starlette.websockets.WebSocket
@app.websocket("/ws") # $ routeSetup="/ws"
async def websocket_test(websocket: WebSocket): # $ requestHandler routedParameter=websocket
await websocket.accept()
ensure_tainted(
websocket, # $ tainted
websocket.url, # $ tainted
websocket.url.netloc, # $ tainted
websocket.url.path, # $ tainted
websocket.url.query, # $ tainted
websocket.url.fragment, # $ tainted
websocket.url.username, # $ tainted
websocket.url.password, # $ tainted
websocket.url.hostname, # $ tainted
websocket.url.port, # $ tainted
websocket.url.components, # $ tainted
websocket.url.components.netloc, # $ tainted
websocket.url.components.path, # $ tainted
websocket.url.components.query, # $ tainted
websocket.url.components.fragment, # $ tainted
websocket.url.components.username, # $ tainted
websocket.url.components.password, # $ tainted
websocket.url.components.hostname, # $ tainted
websocket.url.components.port, # $ tainted
websocket.headers, # $ tainted
websocket.headers["key"], # $ tainted
websocket.query_params, # $ tainted
websocket.query_params["key"], # $ tainted
websocket.cookies, # $ tainted
websocket.cookies["key"], # $ tainted
await websocket.receive(), # $ tainted
await websocket.receive_bytes(), # $ tainted
await websocket.receive_text(), # $ tainted
await websocket.receive_json(), # $ tainted
)
# scheme seems very unlikely to give interesting results, but very likely to give FPs.
ensure_not_tainted(
websocket.url.scheme,
websocket.url.components.scheme,
)
async for data in websocket.iter_bytes():
ensure_tainted(data) # $ tainted
async for data in websocket.iter_text():
ensure_tainted(data) # $ tainted
async for data in websocket.iter_json():
ensure_tainted(data) # $ tainted
# --- Request ---
import starlette.requests
from fastapi import Request
assert Request == starlette.requests.Request
@app.websocket("/req") # $ routeSetup="/req"
async def request_test(request: Request): # $ requestHandler routedParameter=request
ensure_tainted(
request, # $ tainted
await request.body(), # $ tainted
await request.json(), # $ tainted
await request.json()["key"], # $ tainted
# form() returns a FormData (which is a starlette ImmutableMultiDict)
await request.form(), # $ tainted
await request.form()["key"], # $ tainted
await request.form().getlist("key"), # $ MISSING: tainted
await request.form().getlist("key")[0], # $ MISSING: tainted
# data in the form could be an starlette.datastructures.UploadFile
await request.form()["file"].filename, # $ MISSING: tainted
await request.form().getlist("file")[0].filename, # $ MISSING: tainted
request.cookies, # $ tainted
request.cookies["key"], # $ tainted
)
async for chunk in request.stream():
ensure_tainted(chunk) # $ tainted