mirror of
https://github.com/github/codeql.git
synced 2026-04-27 09:45:15 +02:00
Merge pull request #13731 from pwntester/py/aiohttp_improvements
Python: Aiohttp improvements
This commit is contained in:
@@ -292,10 +292,11 @@ module HttpServerHttpResponseTest implements TestSig {
|
||||
exists(DedicatedResponseTest d | d.isDedicatedFile(file))
|
||||
) and
|
||||
(
|
||||
exists(Http::Server::HttpResponse response |
|
||||
location = response.getLocation() and
|
||||
element = response.toString() and
|
||||
value = prettyNodeForInlineTest(response.getBody()) and
|
||||
exists(Http::Server::HttpResponse response, DataFlow::Node body |
|
||||
body = response.getBody() and
|
||||
location = body.getLocation() and
|
||||
element = body.toString() and
|
||||
value = prettyNodeForInlineTest(body) and
|
||||
tag = "responseBody"
|
||||
)
|
||||
or
|
||||
|
||||
@@ -1,2 +1,19 @@
|
||||
import experimental.meta.InlineTaintTest
|
||||
import MakeInlineTaintTest<TestTaintTrackingConfig>
|
||||
|
||||
predicate isSafe(DataFlow::GuardNode g, ControlFlowNode node, boolean branch) {
|
||||
g.(CallNode).getFunction().(NameNode).getId() = "is_safe" and
|
||||
node = g.(CallNode).getArg(_) and
|
||||
branch = true
|
||||
}
|
||||
|
||||
module CustomSanitizerOverridesConfig implements DataFlow::ConfigSig {
|
||||
predicate isSource = TestTaintTrackingConfig::isSource/1;
|
||||
|
||||
predicate isSink = TestTaintTrackingConfig::isSink/1;
|
||||
|
||||
predicate isBarrier(DataFlow::Node node) {
|
||||
node = DataFlow::BarrierGuard<isSafe/3>::getABarrierNode()
|
||||
}
|
||||
}
|
||||
|
||||
import MakeInlineTaintTest<CustomSanitizerOverridesConfig>
|
||||
|
||||
@@ -33,3 +33,5 @@ async def test():
|
||||
assert context.verify_mode == ssl.VerifyMode.CERT_NONE
|
||||
|
||||
s.get("url", ssl=context) # $ clientRequestUrlPart="url" MISSING: clientRequestCertValidationDisabled
|
||||
|
||||
s.ws_connect("url") # $ clientRequestUrlPart="url"
|
||||
|
||||
@@ -23,6 +23,9 @@ async def html_text(request): # $ requestHandler
|
||||
async def html_body(request): # $ requestHandler
|
||||
return web.Response(body=b"foo", content_type="text/html") # $ HttpResponse mimetype=text/html responseBody=b"foo"
|
||||
|
||||
@routes.get("/html_body_header") # $ routeSetup="/html_body_header"
|
||||
async def html_body_header(request): # $ requestHandler
|
||||
return web.Response(headers={"content-type": "text/html"}, text="foo") # $ HttpResponse mimetype=text/html responseBody="foo"
|
||||
|
||||
@routes.get("/html_body_set_later") # $ routeSetup="/html_body_set_later"
|
||||
async def html_body_set_later(request): # $ requestHandler
|
||||
@@ -65,6 +68,26 @@ async def redirect_302(request): # $ requestHandler
|
||||
else:
|
||||
raise web.HTTPFound(location="/logout") # $ HttpResponse HttpRedirectResponse mimetype=application/octet-stream redirectLocation="/logout"
|
||||
|
||||
|
||||
@routes.get("/file_response") # $ routeSetup="/file_response"
|
||||
async def file_response(request): # $ requestHandler
|
||||
filename = "foo.txt"
|
||||
resp = web.FileResponse(filename) # $ HttpResponse mimetype=application/octet-stream getAPathArgument=filename
|
||||
resp = web.FileResponse(path=filename) # $ HttpResponse mimetype=application/octet-stream getAPathArgument=filename
|
||||
return resp
|
||||
|
||||
|
||||
@routes.get("/streaming_response") # $ routeSetup="/streaming_response"
|
||||
async def streaming_response(request): # $ requestHandler
|
||||
resp = web.StreamResponse() # $ HttpResponse mimetype=application/octet-stream
|
||||
await resp.prepare(request)
|
||||
|
||||
await resp.write(b"foo") # $ responseBody=b"foo"
|
||||
await resp.write(data=b"bar") # $ responseBody=b"bar"
|
||||
await resp.write_eof(b"baz") # $ responseBody=b"baz"
|
||||
|
||||
return resp
|
||||
|
||||
################################################################################
|
||||
# Cookies
|
||||
################################################################################
|
||||
|
||||
@@ -142,10 +142,36 @@ class TaintTestClass(web.View):
|
||||
self.request.url # $ tainted
|
||||
)
|
||||
|
||||
# not a request handler, and not called, but since we have type-annotation, should be a
|
||||
# remote-flow-source.
|
||||
async def test_source_from_type_annotation(request: web.Request):
|
||||
# picking out just a few of the tests from `test_taint` above, to show that we have
|
||||
# the same taint-steps :)
|
||||
ensure_tainted(
|
||||
request, # $ tainted
|
||||
request.url, # $ tainted
|
||||
await request.content.read(), # $ tainted
|
||||
)
|
||||
|
||||
# Test that since we can reach the `request` object in the helper function, we don't
|
||||
# introduce a new remote-flow-source, but instead use the one from the caller. (which is
|
||||
# checked to not be tainted)
|
||||
async def test_sanitizer(request): # $ requestHandler
|
||||
ensure_tainted(request, request.url, await request.content.read()) # $ tainted
|
||||
|
||||
if (is_safe(request)):
|
||||
ensure_not_tainted(request, request.url, await request.content.read())
|
||||
test_safe_helper_function_no_route_with_type(request)
|
||||
|
||||
|
||||
async def test_safe_helper_function_no_route_with_type(request: web.Request):
|
||||
ensure_not_tainted(request, request.url, await request.content.read()) # $ SPURIOUS: tainted
|
||||
|
||||
|
||||
app = web.Application()
|
||||
app.router.add_get(r"/test_taint/{name}/{number:\d+}", test_taint) # $ routeSetup="/test_taint/{name}/{number:\d+}"
|
||||
app.router.add_view(r"/test_taint_class", TaintTestClass) # $ routeSetup="/test_taint_class"
|
||||
app.router.add_view(r"/test_sanitizer", test_sanitizer) # $ routeSetup="/test_sanitizer"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user