mirror of
https://github.com/github/codeql.git
synced 2026-04-30 11:15:13 +02:00
@@ -0,0 +1,12 @@
|
||||
import python
|
||||
import experimental.meta.ConceptsTest
|
||||
|
||||
class DedicatedResponseTest extends HttpServerHttpResponseTest {
|
||||
DedicatedResponseTest() { file.getShortName() = "response_test.py" }
|
||||
}
|
||||
|
||||
class OtherResponseTest extends HttpServerHttpResponseTest {
|
||||
OtherResponseTest() { not this instanceof DedicatedResponseTest }
|
||||
|
||||
override string getARelevantTag() { result = "HttpResponse" }
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
argumentToEnsureNotTaintedNotMarkedAsSpurious
|
||||
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
|
||||
failures
|
||||
@@ -0,0 +1 @@
|
||||
import experimental.meta.InlineTaintTest
|
||||
66
python/ql/test/library-tests/frameworks/fastapi/basic.py
Normal file
66
python/ql/test/library-tests/frameworks/fastapi/basic.py
Normal file
@@ -0,0 +1,66 @@
|
||||
# Taking inspiration from https://realpython.com/fastapi-python-web-apis/
|
||||
|
||||
# run with
|
||||
# uvicorn basic:app --reload
|
||||
# Then visit http://127.0.0.1:8000/docs and http://127.0.0.1:8000/redoc
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@app.get("/") # $ routeSetup="/"
|
||||
async def root(): # $ requestHandler
|
||||
return {"message": "Hello World"} # $ HttpResponse
|
||||
|
||||
@app.get("/non-async") # $ routeSetup="/non-async"
|
||||
def non_async(): # $ requestHandler
|
||||
return {"message": "non-async"} # $ HttpResponse
|
||||
|
||||
@app.get(path="/kw-arg") # $ routeSetup="/kw-arg"
|
||||
def kw_arg(): # $ requestHandler
|
||||
return {"message": "kw arg"} # $ HttpResponse
|
||||
|
||||
@app.get("/foo/{foo_id}") # $ routeSetup="/foo/{foo_id}"
|
||||
async def get_foo(foo_id: int): # $ requestHandler routedParameter=foo_id
|
||||
# FastAPI does data validation (with `pydantic` PyPI package) under the hood based
|
||||
# on the type annotation we did for `foo_id`, so it will auto-reject anything that's
|
||||
# not an int.
|
||||
return {"foo_id": foo_id} # $ HttpResponse
|
||||
|
||||
# this will work as query param, so `/bar?bar_id=123`
|
||||
@app.get("/bar") # $ routeSetup="/bar"
|
||||
async def get_bar(bar_id: int = 42): # $ requestHandler routedParameter=bar_id
|
||||
return {"bar_id": bar_id} # $ HttpResponse
|
||||
|
||||
# The big deal is that FastAPI works so well together with pydantic, so you can do stuff like this
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
price: float
|
||||
is_offer: Optional[bool] = None
|
||||
|
||||
@app.post("/items/") # $ routeSetup="/items/"
|
||||
async def create_item(item: Item): # $ requestHandler routedParameter=item
|
||||
# Note: calling `item` a routed parameter is slightly untrue, since it doesn't come
|
||||
# from the URL itself, but from the body of the POST request
|
||||
return item # $ HttpResponse
|
||||
|
||||
# this also works fine
|
||||
@app.post("/2items") # $ routeSetup="/2items"
|
||||
async def create_item2(item1: Item, item2: Item): # $ requestHandler routedParameter=item1 routedParameter=item2
|
||||
return (item1, item2) # $ HttpResponse
|
||||
|
||||
@app.api_route("/baz/{baz_id}", methods=["GET"]) # $ routeSetup="/baz/{baz_id}"
|
||||
async def get_baz(baz_id: int): # $ requestHandler routedParameter=baz_id
|
||||
return {"baz_id2": baz_id} # $ HttpResponse
|
||||
|
||||
# Docs:
|
||||
# see https://fastapi.tiangolo.com/tutorial/path-params/
|
||||
|
||||
# Things we should look at supporting:
|
||||
# - https://fastapi.tiangolo.com/tutorial/dependencies/
|
||||
# - https://fastapi.tiangolo.com/tutorial/background-tasks/
|
||||
# - https://fastapi.tiangolo.com/tutorial/middleware/
|
||||
# - https://fastapi.tiangolo.com/tutorial/encoder/
|
||||
145
python/ql/test/library-tests/frameworks/fastapi/response_test.py
Normal file
145
python/ql/test/library-tests/frameworks/fastapi/response_test.py
Normal file
@@ -0,0 +1,145 @@
|
||||
# see https://fastapi.tiangolo.com/advanced/response-cookies/
|
||||
|
||||
from fastapi import FastAPI, Response
|
||||
import fastapi.responses
|
||||
import asyncio
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/response_parameter") # $ routeSetup="/response_parameter"
|
||||
async def response_parameter(response: Response): # $ requestHandler
|
||||
response.set_cookie("key", "value") # $ CookieWrite CookieName="key" CookieValue="value"
|
||||
response.set_cookie(key="key", value="value") # $ CookieWrite CookieName="key" CookieValue="value"
|
||||
response.headers.append("Set-Cookie", "key2=value2") # $ CookieWrite CookieRawHeader="key2=value2"
|
||||
response.headers.append(key="Set-Cookie", value="key2=value2") # $ CookieWrite CookieRawHeader="key2=value2"
|
||||
response.headers["X-MyHeader"] = "header-value"
|
||||
response.status_code = 418
|
||||
return {"message": "response as parameter"} # $ HttpResponse mimetype=application/json responseBody=Dict
|
||||
|
||||
|
||||
@app.get("/resp_parameter") # $ routeSetup="/resp_parameter"
|
||||
async def resp_parameter(resp: Response): # $ requestHandler
|
||||
resp.status_code = 418
|
||||
return {"message": "resp as parameter"} # $ HttpResponse mimetype=application/json responseBody=Dict
|
||||
|
||||
|
||||
@app.get("/response_parameter_no_type") # $ routeSetup="/response_parameter_no_type"
|
||||
async def response_parameter_no_type(response): # $ requestHandler routedParameter=response
|
||||
# NOTE: This does in fact not work, since FastAPI relies on the type annotations,
|
||||
# and not on the name of the parameter
|
||||
response.status_code = 418
|
||||
return {"message": "response as parameter"} # $ HttpResponse mimetype=application/json responseBody=Dict
|
||||
|
||||
|
||||
class MyXmlResponse(fastapi.responses.Response):
|
||||
media_type = "application/xml"
|
||||
|
||||
|
||||
@app.get("/response_parameter_custom_type", response_class=MyXmlResponse) # $ routeSetup="/response_parameter_custom_type"
|
||||
async def response_parameter_custom_type(response: MyXmlResponse): # $ requestHandler
|
||||
# NOTE: This is a contrived example of using a wrong annotation for the response
|
||||
# parameter. It will be passed a `fastapi.responses.Response` value when handling an
|
||||
# incoming request, so NOT a `MyXmlResponse` value. Cookies/Headers are still
|
||||
# propagated to the final response though.
|
||||
print(type(response))
|
||||
assert type(response) == fastapi.responses.Response
|
||||
response.set_cookie("key", "value") # $ CookieWrite CookieName="key" CookieValue="value"
|
||||
response.headers["Custom-Response-Type"] = "yes, but only after function has run"
|
||||
xml_data = "<foo>FOO</foo>"
|
||||
return xml_data # $ HttpResponse responseBody=xml_data mimetype=application/xml
|
||||
|
||||
|
||||
# Direct response construction
|
||||
|
||||
# see https://fastapi.tiangolo.com/advanced/response-directly/
|
||||
# see https://fastapi.tiangolo.com/advanced/custom-response/
|
||||
|
||||
|
||||
|
||||
@app.get("/direct_response") # $ routeSetup="/direct_response"
|
||||
async def direct_response(): # $ requestHandler
|
||||
xml_data = "<foo>FOO</foo>"
|
||||
resp = fastapi.responses.Response(xml_data, 200, None, "application/xml") # $ HttpResponse mimetype=application/xml responseBody=xml_data
|
||||
resp = fastapi.responses.Response(content=xml_data, media_type="application/xml") # $ HttpResponse mimetype=application/xml responseBody=xml_data
|
||||
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
|
||||
|
||||
|
||||
@app.get("/direct_response2", response_class=fastapi.responses.Response) # $ routeSetup="/direct_response2"
|
||||
async def direct_response2(): # $ requestHandler
|
||||
xml_data = "<foo>FOO</foo>"
|
||||
return xml_data # $ HttpResponse responseBody=xml_data
|
||||
|
||||
|
||||
@app.get("/my_xml_response") # $ routeSetup="/my_xml_response"
|
||||
async def my_xml_response(): # $ requestHandler
|
||||
xml_data = "<foo>FOO</foo>"
|
||||
resp = MyXmlResponse(content=xml_data) # $ HttpResponse mimetype=application/xml responseBody=xml_data
|
||||
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
|
||||
|
||||
|
||||
@app.get("/my_xml_response2", response_class=MyXmlResponse) # $ routeSetup="/my_xml_response2"
|
||||
async def my_xml_response2(): # $ requestHandler
|
||||
xml_data = "<foo>FOO</foo>"
|
||||
return xml_data # $ HttpResponse responseBody=xml_data mimetype=application/xml
|
||||
|
||||
|
||||
@app.get("/html_response") # $ routeSetup="/html_response"
|
||||
async def html_response(): # $ requestHandler
|
||||
hello_world = "<h1>Hello World!</h1>"
|
||||
resp = fastapi.responses.HTMLResponse(hello_world) # $ HttpResponse mimetype=text/html responseBody=hello_world
|
||||
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
|
||||
|
||||
|
||||
@app.get("/html_response2", response_class=fastapi.responses.HTMLResponse) # $ routeSetup="/html_response2"
|
||||
async def html_response2(): # $ requestHandler
|
||||
hello_world = "<h1>Hello World!</h1>"
|
||||
return hello_world # $ HttpResponse responseBody=hello_world mimetype=text/html
|
||||
|
||||
|
||||
@app.get("/redirect") # $ routeSetup="/redirect"
|
||||
async def redirect(): # $ requestHandler
|
||||
next = "https://www.example.com"
|
||||
resp = fastapi.responses.RedirectResponse(next) # $ HttpResponse HttpRedirectResponse redirectLocation=next
|
||||
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
|
||||
|
||||
|
||||
@app.get("/redirect2", response_class=fastapi.responses.RedirectResponse) # $ routeSetup="/redirect2"
|
||||
async def redirect2(): # $ requestHandler
|
||||
next = "https://www.example.com"
|
||||
return next # $ HttpResponse HttpRedirectResponse redirectLocation=next
|
||||
|
||||
|
||||
@app.get("/streaming_response") # $ routeSetup="/streaming_response"
|
||||
async def streaming_response(): # $ requestHandler
|
||||
# You can test this with curl:
|
||||
# curl --no-buffer http://127.0.0.1:8000/streaming_response
|
||||
async def content():
|
||||
yield b"Hello "
|
||||
await asyncio.sleep(0.5)
|
||||
yield b"World"
|
||||
await asyncio.sleep(0.5)
|
||||
yield b"!"
|
||||
|
||||
resp = fastapi.responses.StreamingResponse(content()) # $ HttpResponse responseBody=content()
|
||||
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
|
||||
|
||||
|
||||
# setting `response_class` to `StreamingResponse` does not seem to work
|
||||
# so no such example here
|
||||
|
||||
|
||||
@app.get("/file_response") # $ routeSetup="/file_response"
|
||||
async def file_response(): # $ requestHandler
|
||||
# has internal dependency on PyPI package `aiofiles`
|
||||
# will guess MIME type from file extension
|
||||
|
||||
# We don't really have any good QL modeling of passing a file-path, whose content
|
||||
# will be returned as part of the response... so will leave this as a TODO for now.
|
||||
resp = fastapi.responses.FileResponse(__file__) # $ HttpResponse
|
||||
return resp # $ SPURIOUS: HttpResponse mimetype=application/json responseBody=resp
|
||||
|
||||
|
||||
@app.get("/file_response2", response_class=fastapi.responses.FileResponse) # $ routeSetup="/file_response2"
|
||||
async def file_response2(): # $ requestHandler
|
||||
return __file__ # $ HttpResponse
|
||||
33
python/ql/test/library-tests/frameworks/fastapi/router.py
Normal file
33
python/ql/test/library-tests/frameworks/fastapi/router.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# like blueprints in Flask
|
||||
# see https://fastapi.tiangolo.com/tutorial/bigger-applications/
|
||||
|
||||
from fastapi import APIRouter, FastAPI
|
||||
|
||||
|
||||
inner_router = APIRouter()
|
||||
|
||||
@inner_router.get("/foo") # $ routeSetup="/foo"
|
||||
async def root(): # $ requestHandler
|
||||
return {"msg": "inner_router /foo"} # $ HttpResponse
|
||||
|
||||
outer_router = APIRouter()
|
||||
outer_router.include_router(inner_router, prefix="/inner")
|
||||
|
||||
|
||||
items_router = APIRouter(
|
||||
prefix="/items",
|
||||
tags=["items"],
|
||||
)
|
||||
|
||||
|
||||
@items_router.get("/") # $ routeSetup="/"
|
||||
async def items(): # $ requestHandler
|
||||
return {"msg": "items_router /"} # $ HttpResponse
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
app.include_router(outer_router, prefix="/outer")
|
||||
app.include_router(items_router)
|
||||
|
||||
# see basic.py for instructions for how to run this code.
|
||||
189
python/ql/test/library-tests/frameworks/fastapi/taint_test.py
Normal file
189
python/ql/test/library-tests/frameworks/fastapi/taint_test.py
Normal file
@@ -0,0 +1,189 @@
|
||||
# --- to make things runable ---
|
||||
|
||||
ensure_tainted = ensure_not_tainted = print
|
||||
|
||||
# --- real code ---
|
||||
|
||||
from fastapi import FastAPI
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Foo(BaseModel):
|
||||
foo: str
|
||||
|
||||
|
||||
class MyComplexModel(BaseModel):
|
||||
field: str
|
||||
main_foo: Foo
|
||||
other_foos: List[Foo]
|
||||
nested_foos: List[List[Foo]]
|
||||
|
||||
|
||||
@app.post("/test_taint/{name}/{number}") # $ routeSetup="/test_taint/{name}/{number}"
|
||||
async def test_taint(name : str, number : int, also_input: MyComplexModel): # $ requestHandler routedParameter=name routedParameter=number routedParameter=also_input
|
||||
ensure_tainted(
|
||||
name, # $ tainted
|
||||
number, # $ tainted
|
||||
|
||||
also_input, # $ tainted
|
||||
also_input.field, # $ tainted
|
||||
|
||||
also_input.main_foo, # $ tainted
|
||||
also_input.main_foo.foo, # $ tainted
|
||||
|
||||
also_input.other_foos, # $ tainted
|
||||
also_input.other_foos[0], # $ tainted
|
||||
also_input.other_foos[0].foo, # $ tainted
|
||||
[f.foo for f in also_input.other_foos], # $ MISSING: tainted
|
||||
|
||||
also_input.nested_foos, # $ tainted
|
||||
also_input.nested_foos[0], # $ tainted
|
||||
also_input.nested_foos[0][0], # $ tainted
|
||||
also_input.nested_foos[0][0].foo, # $ tainted
|
||||
)
|
||||
|
||||
other_foos = also_input.other_foos
|
||||
|
||||
ensure_tainted(
|
||||
other_foos, # $ tainted
|
||||
other_foos[0], # $ tainted
|
||||
other_foos[0].foo, # $ tainted
|
||||
[f.foo for f in other_foos], # $ MISSING: tainted
|
||||
)
|
||||
|
||||
return "ok" # $ HttpResponse
|
||||
|
||||
|
||||
# --- body ---
|
||||
# see https://fastapi.tiangolo.com/tutorial/body-multiple-params/
|
||||
|
||||
from fastapi import Body
|
||||
|
||||
# request is made such as `/will-be-query-param?name=foo`
|
||||
@app.post("/will-be-query-param") # $ routeSetup="/will-be-query-param"
|
||||
async def will_be_query_param(name: str): # $ requestHandler routedParameter=name
|
||||
ensure_tainted(name) # $ tainted
|
||||
return "ok" # $ HttpResponse
|
||||
|
||||
# with the `= Body(...)` "annotation" FastAPI will know to transmit `name` as part of
|
||||
# the HTTP post body
|
||||
@app.post("/will-not-be-query-param") # $ routeSetup="/will-not-be-query-param"
|
||||
async def will_not_be_query_param(name: str = Body("foo", media_type="text/plain")): # $ requestHandler routedParameter=name
|
||||
ensure_tainted(name) # $ tainted
|
||||
return "ok" # $ HttpResponse
|
||||
|
||||
|
||||
# --- form data ---
|
||||
# see https://fastapi.tiangolo.com/tutorial/request-forms/
|
||||
|
||||
from fastapi import Form
|
||||
|
||||
@app.post("/form-example") # $ routeSetup="/form-example"
|
||||
async def form_example(username: str = Form(None)): # $ requestHandler routedParameter=username
|
||||
ensure_tainted(username) # $ tainted
|
||||
return "ok" # $ HttpResponse
|
||||
|
||||
|
||||
# --- HTTP headers ---
|
||||
# see https://fastapi.tiangolo.com/tutorial/header-params/
|
||||
|
||||
from fastapi import Header
|
||||
|
||||
@app.get("/header-example") # $ routeSetup="/header-example"
|
||||
async def header_example(user_agent: Optional[str] = Header(None)): # $ requestHandler routedParameter=user_agent
|
||||
ensure_tainted(user_agent) # $ tainted
|
||||
return "ok" # $ HttpResponse
|
||||
|
||||
|
||||
# --- file upload ---
|
||||
# see https://fastapi.tiangolo.com/tutorial/request-files/
|
||||
# see https://fastapi.tiangolo.com/tutorial/request-files/#uploadfile
|
||||
|
||||
from fastapi import File, UploadFile
|
||||
|
||||
@app.post("/file-upload") # $ routeSetup="/file-upload"
|
||||
async def file_upload(f1: bytes = File(None), f2: UploadFile = File(None)): # $ requestHandler routedParameter=f1 routedParameter=f2
|
||||
ensure_tainted(
|
||||
f1, # $ tainted
|
||||
|
||||
f2, # $ tainted
|
||||
f2.filename, # $ MISSING: tainted
|
||||
f2.content_type, # $ MISSING: tainted
|
||||
f2.file, # $ MISSING: tainted
|
||||
f2.file.read(), # $ MISSING: tainted
|
||||
f2.file.readline(), # $ MISSING: tainted
|
||||
f2.file.readlines(), # $ MISSING: tainted
|
||||
await f2.read(), # $ MISSING: tainted
|
||||
)
|
||||
return "ok" # $ HttpResponse
|
||||
|
||||
# --- WebSocket ---
|
||||
|
||||
import starlette.websockets
|
||||
from fastapi import WebSocket
|
||||
|
||||
|
||||
assert WebSocket == starlette.websockets.WebSocket
|
||||
|
||||
|
||||
@app.websocket("/ws") # $ routeSetup="/ws"
|
||||
async def websocket_test(websocket: WebSocket): # $ requestHandler routedParameter=websocket
|
||||
await websocket.accept()
|
||||
|
||||
ensure_tainted(
|
||||
websocket, # $ tainted
|
||||
|
||||
websocket.url, # $ tainted
|
||||
|
||||
websocket.url.netloc, # $ tainted
|
||||
websocket.url.path, # $ tainted
|
||||
websocket.url.query, # $ tainted
|
||||
websocket.url.fragment, # $ tainted
|
||||
websocket.url.username, # $ tainted
|
||||
websocket.url.password, # $ tainted
|
||||
websocket.url.hostname, # $ tainted
|
||||
websocket.url.port, # $ tainted
|
||||
|
||||
websocket.url.components, # $ tainted
|
||||
websocket.url.components.netloc, # $ tainted
|
||||
websocket.url.components.path, # $ tainted
|
||||
websocket.url.components.query, # $ tainted
|
||||
websocket.url.components.fragment, # $ tainted
|
||||
websocket.url.components.username, # $ tainted
|
||||
websocket.url.components.password, # $ tainted
|
||||
websocket.url.components.hostname, # $ tainted
|
||||
websocket.url.components.port, # $ tainted
|
||||
|
||||
websocket.headers, # $ tainted
|
||||
websocket.headers["key"], # $ tainted
|
||||
|
||||
websocket.query_params, # $ tainted
|
||||
websocket.query_params["key"], # $ tainted
|
||||
|
||||
websocket.cookies, # $ tainted
|
||||
websocket.cookies["key"], # $ tainted
|
||||
|
||||
await websocket.receive(), # $ tainted
|
||||
await websocket.receive_bytes(), # $ tainted
|
||||
await websocket.receive_text(), # $ tainted
|
||||
await websocket.receive_json(), # $ tainted
|
||||
)
|
||||
|
||||
# scheme seems very unlikely to give interesting results, but very likely to give FPs.
|
||||
ensure_not_tainted(
|
||||
websocket.url.scheme,
|
||||
websocket.url.components.scheme,
|
||||
)
|
||||
|
||||
async for data in websocket.iter_bytes():
|
||||
ensure_tainted(data) # $ tainted
|
||||
|
||||
async for data in websocket.iter_text():
|
||||
ensure_tainted(data) # $ tainted
|
||||
|
||||
async for data in websocket.iter_json():
|
||||
ensure_tainted(data) # $ tainted
|
||||
Reference in New Issue
Block a user