Python: FastAPI: Model WebSocket usage

This commit is contained in:
Rasmus Wriedt Larsen
2021-10-25 15:22:24 +02:00
parent b69977b37a
commit 7619d0fc33
6 changed files with 303 additions and 35 deletions

View File

@@ -29,6 +29,7 @@ private import semmle.python.frameworks.PyMySQL
private import semmle.python.frameworks.Rsa
private import semmle.python.frameworks.Simplejson
private import semmle.python.frameworks.SqlAlchemy
private import semmle.python.frameworks.Starlette
private import semmle.python.frameworks.Stdlib
private import semmle.python.frameworks.Tornado
private import semmle.python.frameworks.Twisted

View File

@@ -10,6 +10,7 @@ private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
private import semmle.python.frameworks.Pydantic
private import semmle.python.frameworks.Starlette
/**
* Provides models for the `fastapi` PyPI package.
@@ -49,7 +50,7 @@ private module FastApi {
exists(string routeAddingMethod |
routeAddingMethod = HTTP::httpVerbLower()
or
routeAddingMethod = "api_route"
routeAddingMethod in ["api_route", "websocket"]
|
this = App::instance().getMember(routeAddingMethod).getACall()
or
@@ -94,6 +95,17 @@ private module FastApi {
// ---------------------------------------------------------------------------
// Response modeling
// ---------------------------------------------------------------------------
/**
* A parameter to a request handler that has a WebSocket type-annotation.
*/
private class WebSocketRequestHandlerParam extends Starlette::WebSocket::InstanceSource,
DataFlow::ParameterNode {
WebSocketRequestHandlerParam() {
this.getParameter().getAnnotation() = Starlette::WebSocket::classRef().getAUse().asExpr() and
any(FastApiRouteSetup rs).getARequestHandler().getArgByName(_) = this.getParameter()
}
}
/**
* Provides models for the `fastapi.Response` class and subclasses.
*

View File

@@ -0,0 +1,162 @@
/**
* Provides classes modeling security-relevant aspects of the `starlette` PyPI package.
*
* See
* - https://pypi.org/project/starlette/
* - https://www.starlette.io/
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
private import semmle.python.frameworks.internal.InstanceTaintStepsHelper
private import semmle.python.frameworks.Stdlib
/**
* INTERNAL: Do not use.
*
* Provides models for `starlette` PyPI package.
*
* See
* - https://pypi.org/project/starlette/
* - https://www.starlette.io/
*/
module Starlette {
/**
* Provides models for the `starlette.websockets.WebSocket` class
*
* See https://www.starlette.io/websockets/.
*/
module WebSocket {
/** Gets a reference to the `starlette.websockets.WebSocket` class. */
API::Node classRef() {
result = API::moduleImport("starlette").getMember("websockets").getMember("WebSocket")
or
result = API::moduleImport("fastapi").getMember("WebSocket")
}
/**
* A source of instances of `starlette.websockets.WebSocket`, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `WebSocket::instance()` to get references to instances of `starlette.websockets.WebSocket`.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
/** A direct instantiation of `starlette.websockets.WebSocket`. */
private class ClassInstantiation extends InstanceSource, DataFlow::CallCfgNode {
ClassInstantiation() { this = classRef().getACall() }
}
/** Gets a reference to an instance of `starlette.websockets.WebSocket`. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of `starlette.websockets.WebSocket`. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
/**
* Taint propagation for `starlette.websockets.WebSocket`.
*/
private class InstanceTaintSteps extends InstanceTaintStepsHelper {
InstanceTaintSteps() { this = "starlette.websockets.WebSocket" }
override DataFlow::Node getInstance() { result = instance() }
override string getAttributeName() { result in ["url", "headers", "query_params", "cookies"] }
override string getMethodName() { none() }
override string getAsyncMethodName() {
result in [
"receive", "receive_bytes", "receive_text", "receive_json", "iter_bytes", "iter_text",
"iter_json"
]
}
}
/** An attribute read on a `starlette.websockets.WebSocket` instance that is a `starlette.requests.URL` instance. */
private class UrlInstances extends URL::InstanceSource {
UrlInstances() {
this.(DataFlow::AttrRead).getObject() = instance() and
this.(DataFlow::AttrRead).getAttributeName() = "url"
}
}
}
/**
* Provides models for the `starlette.requests.URL` class
*
* See the URL part of https://www.starlette.io/websockets/.
*/
module URL {
/** Gets a reference to the `starlette.requests.URL` class. */
private API::Node classRef() {
result = API::moduleImport("starlette").getMember("requests").getMember("URL")
}
/**
* A source of instances of `starlette.requests.URL`, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `URL::instance()` to get references to instances of `starlette.requests.URL`.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
/** A direct instantiation of `starlette.requests.URL`. */
private class ClassInstantiation extends InstanceSource, DataFlow::CallCfgNode {
ClassInstantiation() { this = classRef().getACall() }
}
/** Gets a reference to an instance of `starlette.requests.URL`. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of `starlette.requests.URL`. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
/**
* Taint propagation for `starlette.requests.URL`.
*/
private class InstanceTaintSteps extends InstanceTaintStepsHelper {
InstanceTaintSteps() { this = "starlette.requests.URL" }
override DataFlow::Node getInstance() { result = instance() }
override string getAttributeName() {
result in [
"components", "netloc", "path", "query", "fragment", "username", "password", "hostname",
"port"
]
}
override string getMethodName() { none() }
override string getAsyncMethodName() { none() }
}
/** An attribute read on a `starlette.requests.URL` instance that is a `urllib.parse.SplitResult` instance. */
private class UrlSplitInstances extends Stdlib::SplitResult::InstanceSource {
UrlSplitInstances() {
this.(DataFlow::AttrRead).getObject() = instance() and
this.(DataFlow::AttrRead).getAttributeName() = "components"
}
}
}
}

View File

@@ -167,6 +167,74 @@ module Stdlib {
override string getAsyncMethodName() { none() }
}
}
/**
* Provides models for the `urllib.parse.SplitResult` class
*
* See https://docs.python.org/3.9/library/urllib.parse.html#urllib.parse.SplitResult.
*/
module SplitResult {
/** Gets a reference to the `urllib.parse.SplitResult` class. */
private API::Node classRef() {
result = API::moduleImport("urllib").getMember("parse").getMember("SplitResult")
}
/**
* A source of instances of `urllib.parse.SplitResult`, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `SplitResult::instance()` to get references to instances of `urllib.parse.SplitResult`.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
/** A direct instantiation of `urllib.parse.SplitResult`. */
private class ClassInstantiation extends InstanceSource, DataFlow::CallCfgNode {
ClassInstantiation() { this = classRef().getACall() }
}
/** Gets a reference to an instance of `urllib.parse.SplitResult`. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of `urllib.parse.SplitResult`. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
/**
* Taint propagation for `urllib.parse.SplitResult`.
*/
private class InstanceTaintSteps extends InstanceTaintStepsHelper {
InstanceTaintSteps() { this = "urllib.parse.SplitResult" }
override DataFlow::Node getInstance() { result = instance() }
override string getAttributeName() {
result in [
"netloc", "path", "query", "fragment", "username", "password", "hostname", "port"
]
}
override string getMethodName() { none() }
override string getAsyncMethodName() { none() }
}
/**
* Extra taint propagation for `urllib.parse.SplitResult`, not covered by `InstanceTaintSteps`.
*/
private class AdditionalTaintStep extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// TODO
none()
}
}
}
}
/**
@@ -1749,6 +1817,30 @@ private module StdlibPrivate {
override string getKind() { result = Escaping::getRegexKind() }
}
// ---------------------------------------------------------------------------
// urllib
// ---------------------------------------------------------------------------
/**
* A call to `urllib.parse.urlsplit`
*
* See https://docs.python.org/3.9/library/urllib.parse.html#urllib.parse.urlsplit
*/
class UrllibParseUrlsplitCall extends Stdlib::SplitResult::InstanceSource, DataFlow::CallCfgNode {
UrllibParseUrlsplitCall() {
this = API::moduleImport("urllib").getMember("parse").getMember("urlsplit").getACall()
}
/** Gets the argument that specifies the URL. */
DataFlow::Node getUrl() { result in [this.getArg(0), this.getArgByName("url")] }
}
/** Extra taint-step such that the result of `urllib.parse.urlsplit(tainted_string)` is tainted. */
private class UrllibParseUrlsplitCallAdditionalTaintStep extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
nodeTo.(UrllibParseUrlsplitCall).getUrl() = nodeFrom
}
}
}
// ---------------------------------------------------------------------------