mirror of
https://github.com/github/codeql.git
synced 2025-12-17 17:23:36 +01:00
225 lines
6.9 KiB
Python
225 lines
6.9 KiB
Python
# --- to make things runable ---
|
|
|
|
ensure_tainted = ensure_not_tainted = print
|
|
|
|
# --- real code ---
|
|
|
|
from fastapi import FastAPI
|
|
from typing import Optional, List
|
|
from pydantic import BaseModel
|
|
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
class Foo(BaseModel):
|
|
foo: str
|
|
|
|
|
|
class MyComplexModel(BaseModel):
|
|
field: str
|
|
main_foo: Foo
|
|
other_foos: List[Foo]
|
|
nested_foos: List[List[Foo]]
|
|
|
|
|
|
@app.post("/test_taint/{name}/{number}") # $ routeSetup="/test_taint/{name}/{number}"
|
|
async def test_taint(name : str, number : int, also_input: MyComplexModel): # $ requestHandler routedParameter=name routedParameter=number routedParameter=also_input
|
|
ensure_tainted(
|
|
name, # $ tainted
|
|
number, # $ tainted
|
|
|
|
also_input, # $ tainted
|
|
also_input.field, # $ tainted
|
|
|
|
also_input.main_foo, # $ tainted
|
|
also_input.main_foo.foo, # $ tainted
|
|
|
|
also_input.other_foos, # $ tainted
|
|
also_input.other_foos[0], # $ tainted
|
|
also_input.other_foos[0].foo, # $ tainted
|
|
[f.foo for f in also_input.other_foos], # $ MISSING: tainted
|
|
|
|
also_input.nested_foos, # $ tainted
|
|
also_input.nested_foos[0], # $ tainted
|
|
also_input.nested_foos[0][0], # $ tainted
|
|
also_input.nested_foos[0][0].foo, # $ tainted
|
|
)
|
|
|
|
other_foos = also_input.other_foos
|
|
|
|
ensure_tainted(
|
|
other_foos, # $ tainted
|
|
other_foos[0], # $ tainted
|
|
other_foos[0].foo, # $ tainted
|
|
[f.foo for f in other_foos], # $ MISSING: tainted
|
|
)
|
|
|
|
return "ok" # $ HttpResponse
|
|
|
|
|
|
# --- body ---
|
|
# see https://fastapi.tiangolo.com/tutorial/body-multiple-params/
|
|
|
|
from fastapi import Body
|
|
|
|
# request is made such as `/will-be-query-param?name=foo`
|
|
@app.post("/will-be-query-param") # $ routeSetup="/will-be-query-param"
|
|
async def will_be_query_param(name: str): # $ requestHandler routedParameter=name
|
|
ensure_tainted(name) # $ tainted
|
|
return "ok" # $ HttpResponse
|
|
|
|
# with the `= Body(...)` "annotation" FastAPI will know to transmit `name` as part of
|
|
# the HTTP post body
|
|
@app.post("/will-not-be-query-param") # $ routeSetup="/will-not-be-query-param"
|
|
async def will_not_be_query_param(name: str = Body("foo", media_type="text/plain")): # $ requestHandler routedParameter=name
|
|
ensure_tainted(name) # $ tainted
|
|
return "ok" # $ HttpResponse
|
|
|
|
|
|
# --- form data ---
|
|
# see https://fastapi.tiangolo.com/tutorial/request-forms/
|
|
|
|
from fastapi import Form
|
|
|
|
@app.post("/form-example") # $ routeSetup="/form-example"
|
|
async def form_example(username: str = Form(None)): # $ requestHandler routedParameter=username
|
|
ensure_tainted(username) # $ tainted
|
|
return "ok" # $ HttpResponse
|
|
|
|
|
|
# --- HTTP headers ---
|
|
# see https://fastapi.tiangolo.com/tutorial/header-params/
|
|
|
|
from fastapi import Header
|
|
|
|
@app.get("/header-example") # $ routeSetup="/header-example"
|
|
async def header_example(user_agent: Optional[str] = Header(None)): # $ requestHandler routedParameter=user_agent
|
|
ensure_tainted(user_agent) # $ tainted
|
|
return "ok" # $ HttpResponse
|
|
|
|
|
|
# --- file upload ---
|
|
# see https://fastapi.tiangolo.com/tutorial/request-files/
|
|
# see https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile
|
|
|
|
from fastapi import File, UploadFile
|
|
|
|
@app.post("/file-upload") # $ routeSetup="/file-upload"
|
|
async def file_upload(f1: bytes = File(None), f2: UploadFile = File(None)): # $ requestHandler routedParameter=f1 routedParameter=f2
|
|
ensure_tainted(
|
|
f1, # $ tainted
|
|
|
|
f2, # $ tainted
|
|
f2.filename, # $ MISSING: tainted
|
|
f2.content_type, # $ MISSING: tainted
|
|
f2.file, # $ MISSING: tainted
|
|
f2.file.read(), # $ MISSING: tainted
|
|
f2.file.readline(), # $ MISSING: tainted
|
|
f2.file.readlines(), # $ MISSING: tainted
|
|
await f2.read(), # $ MISSING: tainted
|
|
)
|
|
return "ok" # $ HttpResponse
|
|
|
|
# --- WebSocket ---
|
|
|
|
import starlette.websockets
|
|
from fastapi import WebSocket
|
|
|
|
|
|
assert WebSocket == starlette.websockets.WebSocket
|
|
|
|
|
|
@app.websocket("/ws") # $ routeSetup="/ws"
|
|
async def websocket_test(websocket: WebSocket): # $ requestHandler routedParameter=websocket
|
|
await websocket.accept()
|
|
|
|
ensure_tainted(
|
|
websocket, # $ tainted
|
|
|
|
websocket.url, # $ tainted
|
|
|
|
websocket.url.netloc, # $ tainted
|
|
websocket.url.path, # $ tainted
|
|
websocket.url.query, # $ tainted
|
|
websocket.url.fragment, # $ tainted
|
|
websocket.url.username, # $ tainted
|
|
websocket.url.password, # $ tainted
|
|
websocket.url.hostname, # $ tainted
|
|
websocket.url.port, # $ tainted
|
|
|
|
websocket.url.components, # $ tainted
|
|
websocket.url.components.netloc, # $ tainted
|
|
websocket.url.components.path, # $ tainted
|
|
websocket.url.components.query, # $ tainted
|
|
websocket.url.components.fragment, # $ tainted
|
|
websocket.url.components.username, # $ tainted
|
|
websocket.url.components.password, # $ tainted
|
|
websocket.url.components.hostname, # $ tainted
|
|
websocket.url.components.port, # $ tainted
|
|
|
|
websocket.headers, # $ tainted
|
|
websocket.headers["key"], # $ tainted
|
|
|
|
websocket.query_params, # $ tainted
|
|
websocket.query_params["key"], # $ tainted
|
|
|
|
websocket.cookies, # $ tainted
|
|
websocket.cookies["key"], # $ tainted
|
|
|
|
await websocket.receive(), # $ tainted
|
|
await websocket.receive_bytes(), # $ tainted
|
|
await websocket.receive_text(), # $ tainted
|
|
await websocket.receive_json(), # $ tainted
|
|
)
|
|
|
|
# scheme seems very unlikely to give interesting results, but very likely to give FPs.
|
|
ensure_not_tainted(
|
|
websocket.url.scheme,
|
|
websocket.url.components.scheme,
|
|
)
|
|
|
|
async for data in websocket.iter_bytes():
|
|
ensure_tainted(data) # $ tainted
|
|
|
|
async for data in websocket.iter_text():
|
|
ensure_tainted(data) # $ tainted
|
|
|
|
async for data in websocket.iter_json():
|
|
ensure_tainted(data) # $ tainted
|
|
|
|
|
|
# --- Request ---
|
|
|
|
import starlette.requests
|
|
from fastapi import Request
|
|
|
|
|
|
assert Request == starlette.requests.Request
|
|
|
|
@app.websocket("/req") # $ routeSetup="/req"
|
|
async def request_test(request: Request): # $ requestHandler routedParameter=request
|
|
ensure_tainted(
|
|
request, # $ tainted
|
|
|
|
await request.body(), # $ tainted
|
|
|
|
await request.json(), # $ tainted
|
|
await request.json()["key"], # $ tainted
|
|
|
|
# form() returns a FormData (which is a starlette ImmutableMultiDict)
|
|
await request.form(), # $ tainted
|
|
await request.form()["key"], # $ tainted
|
|
await request.form().getlist("key"), # $ MISSING: tainted
|
|
await request.form().getlist("key")[0], # $ MISSING: tainted
|
|
# data in the form could be an starlette.datastructures.UploadFile
|
|
await request.form()["file"].filename, # $ MISSING: tainted
|
|
await request.form().getlist("file")[0].filename, # $ MISSING: tainted
|
|
|
|
request.cookies, # $ tainted
|
|
request.cookies["key"], # $ tainted
|
|
)
|
|
|
|
async for chunk in request.stream():
|
|
ensure_tainted(chunk) # $ tainted
|