mirror of
https://github.com/github/codeql.git
synced 2026-04-29 18:55:14 +02:00
Python: Fix experimental py/ip-address-spoofing
I realized the modeling was done in a non-recommended way, so I changed the modeling. It was very nice that I could use API graphs for the flask part, and a little sad when I couldn't for Django/Tornado.
This commit is contained in:
@@ -1,73 +1,57 @@
|
||||
private import python
|
||||
private import semmle.python.Concepts
|
||||
private import semmle.python.ApiGraphs
|
||||
private import semmle.python.dataflow.new.RemoteFlowSources
|
||||
private import semmle.python.frameworks.Flask
|
||||
private import semmle.python.frameworks.Django
|
||||
private import semmle.python.frameworks.Tornado
|
||||
|
||||
/**
|
||||
* A data flow source of the client ip obtained according to the remote endpoint identifier specified
|
||||
* (`X-Forwarded-For`, `X-Real-IP`, `Proxy-Client-IP`, etc.) in the header.
|
||||
*
|
||||
* For example: `request.headers.get("X-Forwarded-For")`.
|
||||
*
|
||||
* A call to `request.headers.get` or `request.headers.get_all` or `request.headers.getlist`.
|
||||
*/
|
||||
abstract class ClientSuppliedIpUsedInSecurityCheck extends DataFlow::CallCfgNode { }
|
||||
abstract class ClientSuppliedIpUsedInSecurityCheck extends DataFlow::Node { }
|
||||
|
||||
private class FlaskClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck {
|
||||
private class FlaskClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck,
|
||||
DataFlow::MethodCallNode {
|
||||
FlaskClientSuppliedIpUsedInSecurityCheck() {
|
||||
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
|
||||
rfs.getSourceType() = "flask.request" and this.getFunction() = get
|
||||
|
|
||||
// `get` is a call to request.headers.get or request.headers.get_all or request.headers.getlist
|
||||
// request.headers
|
||||
get.getObject()
|
||||
.(DataFlow::AttrRead)
|
||||
// request
|
||||
.getObject()
|
||||
.getALocalSource() = rfs and
|
||||
get.getAttributeName() in ["get", "get_all", "getlist"] and
|
||||
get.getObject().(DataFlow::AttrRead).getAttributeName() = "headers" and
|
||||
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
|
||||
)
|
||||
this = Flask::request().getMember("headers").getMember(["get", "get_all", "getlist"]).getACall() and
|
||||
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
|
||||
}
|
||||
}
|
||||
|
||||
private class DjangoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck {
|
||||
private class DjangoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck,
|
||||
DataFlow::MethodCallNode {
|
||||
DjangoClientSuppliedIpUsedInSecurityCheck() {
|
||||
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
|
||||
rfs.getSourceType() = "django.http.request.HttpRequest" and this.getFunction() = get
|
||||
|
|
||||
// `get` is a call to request.headers.get or request.META.get
|
||||
// request.headers
|
||||
get.getObject()
|
||||
.(DataFlow::AttrRead)
|
||||
// request
|
||||
.getObject()
|
||||
.getALocalSource() = rfs and
|
||||
get.getAttributeName() = "get" and
|
||||
get.getObject().(DataFlow::AttrRead).getAttributeName() in ["headers", "META"] and
|
||||
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
|
||||
)
|
||||
exists(DataFlow::Node req, DataFlow::AttrRead headers |
|
||||
// a call to request.headers.get or request.META.get
|
||||
req = PrivateDjango::DjangoImpl::DjangoHttp::Request::HttpRequest::instance() and
|
||||
headers.getObject().getALocalSource() = req and
|
||||
headers.getAttributeName() in ["headers", "META"] and
|
||||
this.calls(headers, "get")
|
||||
) and
|
||||
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
|
||||
}
|
||||
}
|
||||
|
||||
private class TornadoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck {
|
||||
private class TornadoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck,
|
||||
DataFlow::MethodCallNode {
|
||||
TornadoClientSuppliedIpUsedInSecurityCheck() {
|
||||
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
|
||||
rfs.getSourceType() = "tornado.web.RequestHandler" and this.getFunction() = get
|
||||
// a call to self.request.headers.get or self.request.headers.get_list inside a tornado requesthandler
|
||||
exists(
|
||||
Tornado::TornadoModule::Web::RequestHandler::SelfParam selfParam, DataFlow::AttrRead headers,
|
||||
DataFlow::AttrRead req
|
||||
|
|
||||
// `get` is a call to `rfs`.request.headers.get
|
||||
// `rfs`.request.headers
|
||||
get.getObject()
|
||||
.(DataFlow::AttrRead)
|
||||
// `rfs`.request
|
||||
.getObject()
|
||||
.(DataFlow::AttrRead)
|
||||
// `rfs`
|
||||
.getObject()
|
||||
.getALocalSource() = rfs and
|
||||
get.getAttributeName() in ["get", "get_list"] and
|
||||
get.getObject().(DataFlow::AttrRead).getAttributeName() = "headers" and
|
||||
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
|
||||
)
|
||||
req.getObject().getALocalSource() = selfParam and
|
||||
req.getAttributeName() = "request" and
|
||||
headers.getObject().getALocalSource() = req and
|
||||
headers.getAttributeName() = "headers" and
|
||||
this.calls(headers, ["get", "get_list"])
|
||||
) and
|
||||
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user