Merge branch 'main' into jorgectf/python/deserialization

This commit is contained in:
Rasmus Wriedt Larsen
2021-10-28 12:31:34 +02:00
1929 changed files with 197823 additions and 20995 deletions

View File

@@ -0,0 +1,26 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>If an HTTP Header is built using string concatenation or string formatting, and the
components of the concatenation include user input, a user
is likely to be able to manipulate the response.</p>
</overview>
<recommendation>
<p>User input should not be included in an HTTP Header.</p>
</recommendation>
<example>
<p>In the following example, the code appends a user-provided value into a header.</p>
<sample src="header_injection.py" />
</example>
<references>
<li>OWASP: <a href="https://owasp.org/www-community/attacks/HTTP_Response_Splitting">HTTP Response Splitting</a>.</li>
<li>Python Security: <a href="https://python-security.readthedocs.io/vuln/http-header-injection.html">HTTP header injection</a>.</li>
<li>SonarSource: <a href="https://rules.sonarsource.com/python/RSPEC-5167">RSPEC-5167</a>.</li>
</references>
</qhelp>

View File

@@ -0,0 +1,21 @@
/**
* @name HTTP Header Injection
* @description User input should not be used in HTTP headers, otherwise a malicious user
* may be able to inject a value that could manipulate the response.
* @kind path-problem
* @problem.severity error
* @id py/header-injection
* @tags security
* external/cwe/cwe-113
* external/cwe/cwe-079
*/
// determine precision above
import python
import experimental.semmle.python.security.injection.HTTPHeaders
import DataFlow::PathGraph
from HeaderInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ HTTP header is constructed from a $@.", sink.getNode(),
"This", source.getNode(), "user-provided value"

View File

@@ -0,0 +1,9 @@
from flask import Response, request, Flask, make_response
@app.route("/flask_Response")
def flask_Response():
rfs_header = request.args["rfs_header"]
response = Response()
response.headers['HeaderName'] = rfs_header
return response

View File

@@ -0,0 +1,49 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>If unsanitized user input is written to a log entry, a malicious user may be able to forge new log entries.</p>
<p>Forgery can occur if a user provides some input creating the appearance of multiple
log entries. This can include unescaped new-line characters, or HTML or other markup.</p>
</overview>
<recommendation>
<p>
User input should be suitably sanitized before it is logged.
</p>
<p>
If the log entries are plain text then line breaks should be removed from user input, using for example
<code>replace(old, new)</code> or similar. Care should also be taken that user input is clearly marked
in log entries, and that a malicious user cannot cause confusion in other ways.
</p>
<p>
For log entries that will be displayed in HTML, user input should be HTML encoded before being logged, to prevent forgery and
other forms of HTML injection.
</p>
</recommendation>
<example>
<p>
In the example, the name provided by the user is recorded using the log output function (<code>logging.info</code> or <code>app.logger.info</code>, etc.).
In these four cases, the name provided by the user is not provided The processing is recorded. If a malicious user provides <code>Guest%0D%0AUser name: Admin</code>
as a parameter, the log entry will be divided into two lines, the first line is <code>User name: Guest</code> code>, the second line is <code>User name: Admin</code>.
</p>
<sample src="LogInjectionBad.py" />
<p>
In a good example, the program uses the <code>replace</code> function to provide parameter processing to the user, and replace <code>\r\n</code> and <code>\n</code>
with empty characters. To a certain extent, the occurrence of log injection vulnerabilities is reduced.
</p>
<sample src="LogInjectionGood.py" />
</example>
<references>
<li>OWASP: <a href="https://owasp.org/www-community/attacks/Log_Injection">Log Injection</a>.</li>
</references>
</qhelp>

View File

@@ -0,0 +1,20 @@
/**
* @name Log Injection
* @description Building log entries from user-controlled data is vulnerable to
* insertion of forged log entries by a malicious user.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/log-injection
* @tags security
* external/cwe/cwe-117
*/
import python
import experimental.semmle.python.security.injection.LogInjection
import DataFlow::PathGraph
from LogInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ flows to log entry.", source.getNode(),
"User-provided value"

View File

@@ -0,0 +1,44 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc Log Injection
"""
from flask import Flask
from flask import request
from django.utils.log import request_logger
import logging
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
@app.route('/bad1')
def bad1():
name = request.args.get('name')
app.logger.info('User name: ' + name) # Bad
return 'bad1'
@app.route('/bad2')
def bad2():
name = request.args.get('name')
logging.info('User name: ' + name) # Bad
return 'bad2'
@app.route('/bad3')
def bad3():
name = request.args.get('name')
request_logger.warn('User name: ' + name) # Bad
return 'bad3'
@app.route('/bad4')
def bad4():
name = request.args.get('name')
logtest = logging.getLogger('test')
logtest.debug('User name: ' + name) # Bad
return 'bad4'
if __name__ == '__main__':
app.debug = True
handler = logging.FileHandler('log')
app.logger.addHandler(handler)
app.run()

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc Log Injection
"""
from flask import Flask
from flask import request
import logging
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
@app.route('/good1')
def good1():
name = request.args.get('name')
name = name.replace('\r\n','').replace('\n','')
logging.info('User name: ' + name) # Good
return 'good1'
if __name__ == '__main__':
app.debug = True
handler = logging.FileHandler('log')
app.logger.addHandler(handler)
app.run()

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc ip address spoofing
"""
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/bad1')
def bad1():
client_ip = request.headers.get('x-forwarded-for')
if not client_ip.startswith('192.168.'):
raise Exception('ip illegal')
return 'bad1'
@app.route('/bad2')
def bad2():
client_ip = request.headers.get('x-forwarded-for')
if not client_ip == '127.0.0.1':
raise Exception('ip illegal')
return 'bad2'
@app.route('/good1')
def good1():
client_ip = request.headers.get('x-forwarded-for')
client_ip = client_ip.split(',')[client_ip.split(',').length - 1]
if not client_ip == '127.0.0.1':
raise Exception('ip illegal')
return 'good1'
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,35 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>An original client IP address is retrieved from an http header (<code>X-Forwarded-For</code> or <code>X-Real-IP</code> or <code>Proxy-Client-IP</code>
etc.), which is used to ensure security. Attackers can forge the value of these identifiers to
bypass a ban-list, for example.</p>
</overview>
<recommendation>
<p>Do not trust the values of HTTP headers allegedly identifying the originating IP. If you are aware your application will run behind some reverse proxies then the last entry of a <code>X-Forwarded-For</code> header value may be more trustworthy than the rest of it because some reverse proxies append the IP address they observed to the end of any remote-supplied header.</p>
</recommendation>
<example>
<p>The following examples show the bad case and the good case respectively.
In <code>bad1</code> method and <code>bad2</code> method, the client ip the <code>X-Forwarded-For</code> is split into comma-separated values, but the less-trustworthy first one is used. Both of these examples could be deceived by providing a forged HTTP header. The method
<code>good1</code> similarly splits an <code>X-Forwarded-For</code> value, but uses the last, more-trustworthy entry.</p>
<sample src="ClientSuppliedIpUsedInSecurityCheck.py" />
</example>
<references>
<li>Dennis Schneider: <a href="https://www.dennis-schneider.com/blog/prevent-ip-address-spoofing-with-x-forwarded-for-header-and-aws-elb-in-clojure-ring/">
Prevent IP address spoofing with X-Forwarded-For header when using AWS ELB and Clojure Ring</a>
</li>
<li>Security Rule Zero: <a href="https://www.f5.com/company/blog/security-rule-zero-a-warning-about-x-forwarded-for">A Warning about X-Forwarded-For</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,56 @@
/**
* @name IP address spoofing
* @description A remote endpoint identifier is read from an HTTP header. Attackers can modify the value
* of the identifier to forge the client ip.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/ip-address-spoofing
* @tags security
* external/cwe/cwe-348
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
import ClientSuppliedIpUsedInSecurityCheckLib
import DataFlow::PathGraph
/**
* Taint-tracking configuration tracing flow from obtaining a client ip from an HTTP header to a sensitive use.
*/
class ClientSuppliedIpUsedInSecurityCheckConfig extends TaintTracking::Configuration {
ClientSuppliedIpUsedInSecurityCheckConfig() { this = "ClientSuppliedIpUsedInSecurityCheckConfig" }
override predicate isSource(DataFlow::Node source) {
source instanceof ClientSuppliedIpUsedInSecurityCheck
}
override predicate isSink(DataFlow::Node sink) { sink instanceof PossibleSecurityCheck }
override predicate isAdditionalTaintStep(DataFlow::Node pred, DataFlow::Node succ) {
exists(DataFlow::CallCfgNode ccn |
ccn = API::moduleImport("netaddr").getMember("IPAddress").getACall() and
ccn.getArg(0) = pred and
ccn = succ
)
}
override predicate isSanitizer(DataFlow::Node node) {
// `client_supplied_ip.split(",")[n]` for `n` > 0
exists(Subscript ss |
not ss.getIndex().(IntegerLiteral).getText() = "0" and
ss.getObject().(Call).getFunc().(Attribute).getName() = "split" and
ss.getObject().(Call).getAnArg().(StrConst).getText() = "," and
ss = node.asExpr()
)
}
}
from
ClientSuppliedIpUsedInSecurityCheckConfig config, DataFlow::PathNode source,
DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "IP address spoofing might include code from $@.",
source.getNode(), "this user input"

View File

@@ -0,0 +1,152 @@
private import python
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
/**
* A data flow source of the client ip obtained according to the remote endpoint identifier specified
* (`X-Forwarded-For`, `X-Real-IP`, `Proxy-Client-IP`, etc.) in the header.
*
* For example: `request.headers.get("X-Forwarded-For")`.
*/
abstract class ClientSuppliedIpUsedInSecurityCheck extends DataFlow::CallCfgNode { }
private class FlaskClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck {
FlaskClientSuppliedIpUsedInSecurityCheck() {
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
rfs.getSourceType() = "flask.request" and this.getFunction() = get
|
// `get` is a call to request.headers.get or request.headers.get_all or request.headers.getlist
// request.headers
get.getObject()
.(DataFlow::AttrRead)
// request
.getObject()
.getALocalSource() = rfs and
get.getAttributeName() in ["get", "get_all", "getlist"] and
get.getObject().(DataFlow::AttrRead).getAttributeName() = "headers" and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
)
}
}
private class DjangoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck {
DjangoClientSuppliedIpUsedInSecurityCheck() {
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
rfs.getSourceType() = "django.http.request.HttpRequest" and this.getFunction() = get
|
// `get` is a call to request.headers.get or request.META.get
// request.headers
get.getObject()
.(DataFlow::AttrRead)
// request
.getObject()
.getALocalSource() = rfs and
get.getAttributeName() = "get" and
get.getObject().(DataFlow::AttrRead).getAttributeName() in ["headers", "META"] and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
)
}
}
private class TornadoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck {
TornadoClientSuppliedIpUsedInSecurityCheck() {
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
rfs.getSourceType() = "tornado.web.RequestHandler" and this.getFunction() = get
|
// `get` is a call to `rfs`.request.headers.get
// `rfs`.request.headers
get.getObject()
.(DataFlow::AttrRead)
// `rfs`.request
.getObject()
.(DataFlow::AttrRead)
// `rfs`
.getObject()
.getALocalSource() = rfs and
get.getAttributeName() in ["get", "get_list"] and
get.getObject().(DataFlow::AttrRead).getAttributeName() = "headers" and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
)
}
}
private string clientIpParameterName() {
result in [
"x-forwarded-for", "x_forwarded_for", "x-real-ip", "x_real_ip", "proxy-client-ip",
"proxy_client_ip", "wl-proxy-client-ip", "wl_proxy_client_ip", "http_x_forwarded_for",
"http-x-forwarded-for", "http_x_forwarded", "http_x_cluster_client_ip", "http_client_ip",
"http_forwarded_for", "http_forwarded", "http_via", "remote_addr"
]
}
/** A data flow sink for ip address forgery vulnerabilities. */
abstract class PossibleSecurityCheck extends DataFlow::Node { }
/** A data flow sink for sql operation. */
private class SqlOperationAsSecurityCheck extends PossibleSecurityCheck {
SqlOperationAsSecurityCheck() { this = any(SqlExecution e).getSql() }
}
/**
* A data flow sink for remote client ip comparison.
*
* For example: `if not ipAddr.startswith('192.168.') : ...` determine whether the client ip starts
* with `192.168.`, and the program can be deceived by forging the ip address.
*/
private class CompareSink extends PossibleSecurityCheck {
CompareSink() {
exists(Call call |
call.getFunc().(Attribute).getName() = "startswith" and
call.getArg(0).(StrConst).getText().regexpMatch(getIpAddressRegex()) and
not call.getArg(0).(StrConst).getText() = "0:0:0:0:0:0:0:1" and
call.getFunc().(Attribute).getObject() = this.asExpr()
)
or
exists(Compare compare |
(
compare.getOp(0) instanceof Eq or
compare.getOp(0) instanceof NotEq
) and
(
compare.getLeft() = this.asExpr() and
compare.getComparator(0).(StrConst).getText() instanceof PrivateHostName and
not compare.getComparator(0).(StrConst).getText() = "0:0:0:0:0:0:0:1"
or
compare.getComparator(0) = this.asExpr() and
compare.getLeft().(StrConst).getText() instanceof PrivateHostName and
not compare.getLeft().(StrConst).getText() = "0:0:0:0:0:0:0:1"
)
)
or
exists(Compare compare |
(
compare.getOp(0) instanceof In or
compare.getOp(0) instanceof NotIn
) and
(
compare.getLeft() = this.asExpr()
or
compare.getComparator(0) = this.asExpr() and
not compare.getLeft().(StrConst).getText() in ["%", ",", "."]
)
)
}
}
string getIpAddressRegex() {
result =
"^((10\\.((1\\d{2})?|(2[0-4]\\d)?|(25[0-5])?|([1-9]\\d|[0-9])?)(\\.)?)|(192\\.168\\.)|172\\.(1[6789]|2[0-9]|3[01])\\.)((1\\d{2})?|(2[0-4]\\d)?|(25[0-5])?|([1-9]\\d|[0-9])?)(\\.)?((1\\d{2})?|(2[0-4]\\d)?|(25[0-5])?|([1-9]\\d|[0-9])?)$"
}
/**
* A string matching private host names of IPv4 and IPv6, which only matches the host portion therefore checking for port is not necessary.
* Several examples are localhost, reserved IPv4 IP addresses including 127.0.0.1, 10.x.x.x, 172.16.x,x, 192.168.x,x, and reserved IPv6 addresses including [0:0:0:0:0:0:0:1] and [::1]
*/
private class PrivateHostName extends string {
bindingset[this]
PrivateHostName() {
this.regexpMatch("(?i)localhost(?:[:/?#].*)?|127\\.0\\.0\\.1(?:[:/?#].*)?|10(?:\\.[0-9]+){3}(?:[:/?#].*)?|172\\.16(?:\\.[0-9]+){2}(?:[:/?#].*)?|192.168(?:\\.[0-9]+){2}(?:[:/?#].*)?|\\[?0:0:0:0:0:0:0:1\\]?(?:[:/?#].*)?|\\[?::1\\]?(?:[:/?#].*)?")
}
}

View File

@@ -1,45 +0,0 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>
Constructing a regular expression with unsanitized user input is dangerous as a malicious user may
be able to modify the meaning of the expression. In particular, such a user may be able to provide
a regular expression fragment that takes exponential time in the worst case, and use that to
perform a Denial of Service attack.
</p>
</overview>
<recommendation>
<p>
Before embedding user input into a regular expression, use a sanitization function such as
<code>re.escape</code> to escape meta-characters that have a special meaning regarding
regular expressions' syntax.
</p>
</recommendation>
<example>
<p>
The following examples are based on a simple Flask web server environment.
</p>
<p>
The following example shows a HTTP request parameter that is used to construct a regular expression
without sanitizing it first:
</p>
<sample src="re_bad.py" />
<p>
Instead, the request parameter should be sanitized first, for example using the function
<code>re.escape</code>. This ensures that the user cannot insert characters which have a
special meaning in regular expressions.
</p>
<sample src="re_good.py" />
</example>
<references>
<li>OWASP: <a href="https://www.owasp.org/index.php/Regular_expression_Denial_of_Service_-_ReDoS">Regular expression Denial of Service - ReDoS</a>.</li>
<li>Wikipedia: <a href="https://en.wikipedia.org/wiki/ReDoS">ReDoS</a>.</li>
<li>Python docs: <a href="https://docs.python.org/3/library/re.html">re</a>.</li>
<li>SonarSource: <a href="https://rules.sonarsource.com/python/type/Vulnerability/RSPEC-2631">RSPEC-2631</a>.</li>
</references>
</qhelp>

View File

@@ -1,29 +0,0 @@
/**
* @name Regular expression injection
* @description User input should not be used in regular expressions without first being escaped,
* otherwise a malicious user may be able to inject an expression that could require
* exponential time on certain inputs.
* @kind path-problem
* @problem.severity error
* @id py/regex-injection
* @tags security
* external/cwe/cwe-730
* external/cwe/cwe-400
*/
// determine precision above
import python
import experimental.semmle.python.security.injection.RegexInjection
import DataFlow::PathGraph
from
RegexInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink,
RegexInjectionSink regexInjectionSink, Attribute methodAttribute
where
config.hasFlowPath(source, sink) and
regexInjectionSink = sink.getNode() and
methodAttribute = regexInjectionSink.getRegexMethod()
select sink.getNode(), source, sink,
"$@ regular expression is constructed from a $@ and executed by $@.", sink.getNode(), "This",
source.getNode(), "user-provided value", methodAttribute,
regexInjectionSink.getRegexModule() + "." + methodAttribute.getName()

View File

@@ -1,15 +0,0 @@
from flask import request, Flask
import re
@app.route("/direct")
def direct():
unsafe_pattern = request.args["pattern"]
re.search(unsafe_pattern, "")
@app.route("/compile")
def compile():
unsafe_pattern = request.args["pattern"]
compiled_pattern = re.compile(unsafe_pattern)
compiled_pattern.search("")

View File

@@ -1,17 +0,0 @@
from flask import request, Flask
import re
@app.route("/direct")
def direct():
unsafe_pattern = request.args['pattern']
safe_pattern = re.escape(unsafe_pattern)
re.search(safe_pattern, "")
@app.route("/compile")
def compile():
unsafe_pattern = request.args['pattern']
safe_pattern = re.escape(unsafe_pattern)
compiled_pattern = re.compile(safe_pattern)
compiled_pattern.search("")

View File

@@ -14,71 +14,34 @@ private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.dataflow.new.TaintTracking
private import experimental.semmle.python.Frameworks
/** Provides classes for modeling Regular Expression-related APIs. */
module RegexExecution {
/** Provides classes for modeling log related APIs. */
module LogOutput {
/**
* A data-flow node that executes a regular expression.
* A data flow node for log output.
*
* Extend this class to model new APIs. If you want to refine existing API models,
* extend `RegexExecution` instead.
* extend `LogOutput` instead.
*/
abstract class Range extends DataFlow::Node {
/**
* Gets the argument containing the executed expression.
* Get the parameter value of the log output function.
*/
abstract DataFlow::Node getRegexNode();
/**
* Gets the library used to execute the regular expression.
*/
abstract string getRegexModule();
abstract DataFlow::Node getAnInput();
}
}
/**
* A data-flow node that executes a regular expression.
* A data flow node for log output.
*
* Extend this class to refine existing API models. If you want to model new APIs,
* extend `RegexExecution::Range` instead.
* extend `LogOutput::Range` instead.
*/
class RegexExecution extends DataFlow::Node {
RegexExecution::Range range;
class LogOutput extends DataFlow::Node {
LogOutput::Range range;
RegexExecution() { this = range }
LogOutput() { this = range }
DataFlow::Node getRegexNode() { result = range.getRegexNode() }
string getRegexModule() { result = range.getRegexModule() }
}
/** Provides classes for modeling Regular Expression escape-related APIs. */
module RegexEscape {
/**
* A data-flow node that escapes a regular expression.
*
* Extend this class to model new APIs. If you want to refine existing API models,
* extend `RegexEscape` instead.
*/
abstract class Range extends DataFlow::Node {
/**
* Gets the argument containing the escaped expression.
*/
abstract DataFlow::Node getRegexNode();
}
}
/**
* A data-flow node that escapes a regular expression.
*
* Extend this class to refine existing API models. If you want to model new APIs,
* extend `RegexEscape::Range` instead.
*/
class RegexEscape extends DataFlow::Node {
RegexEscape::Range range;
RegexEscape() { this = range }
DataFlow::Node getRegexNode() { result = range.getRegexNode() }
DataFlow::Node getAnInput() { result = range.getAnInput() }
}
/** Provides classes for modeling XML parsing APIs. */
@@ -388,3 +351,46 @@ class NoSQLSanitizer extends DataFlow::Node {
/** Gets the argument that specifies the NoSQL query to be sanitized. */
DataFlow::Node getAnInput() { result = range.getAnInput() }
}
/** Provides classes for modeling HTTP Header APIs. */
module HeaderDeclaration {
/**
* A data-flow node that collects functions setting HTTP Headers.
*
* Extend this class to model new APIs. If you want to refine existing API models,
* extend `HeaderDeclaration` instead.
*/
abstract class Range extends DataFlow::Node {
/**
* Gets the argument containing the header name.
*/
abstract DataFlow::Node getNameArg();
/**
* Gets the argument containing the header value.
*/
abstract DataFlow::Node getValueArg();
}
}
/**
* A data-flow node that collects functions setting HTTP Headers.
*
* Extend this class to refine existing API models. If you want to model new APIs,
* extend `HeaderDeclaration::Range` instead.
*/
class HeaderDeclaration extends DataFlow::Node {
HeaderDeclaration::Range range;
HeaderDeclaration() { this = range }
/**
* Gets the argument containing the header name.
*/
DataFlow::Node getNameArg() { result = range.getNameArg() }
/**
* Gets the argument containing the header value.
*/
DataFlow::Node getValueArg() { result = range.getValueArg() }
}

View File

@@ -4,5 +4,9 @@
private import experimental.semmle.python.frameworks.Stdlib
private import experimental.semmle.python.frameworks.XML
private import experimental.semmle.python.frameworks.Flask
private import experimental.semmle.python.frameworks.Django
private import experimental.semmle.python.frameworks.Werkzeug
private import experimental.semmle.python.frameworks.LDAP
private import experimental.semmle.python.frameworks.NoSQL
private import experimental.semmle.python.frameworks.Log

View File

@@ -0,0 +1,93 @@
/**
* Provides classes modeling security-relevant aspects of the `django` PyPI package.
* See https://www.djangoproject.com/.
*/
private import python
private import semmle.python.frameworks.Django
private import semmle.python.dataflow.new.DataFlow
private import experimental.semmle.python.Concepts
private import semmle.python.ApiGraphs
import semmle.python.dataflow.new.RemoteFlowSources
private module PrivateDjango {
private module django {
API::Node http() { result = API::moduleImport("django").getMember("http") }
module http {
API::Node response() { result = http().getMember("response") }
API::Node request() { result = http().getMember("request") }
module request {
module HttpRequest {
class DjangoGETParameter extends DataFlow::Node, RemoteFlowSource::Range {
DjangoGETParameter() { this = request().getMember("GET").getMember("get").getACall() }
override string getSourceType() { result = "django.http.request.GET.get" }
}
}
}
module response {
module HttpResponse {
API::Node baseClassRef() {
result = response().getMember("HttpResponse").getReturn()
or
// Handle `django.http.HttpResponse` alias
result = http().getMember("HttpResponse").getReturn()
}
/** Gets a reference to a header instance. */
private DataFlow::LocalSourceNode headerInstance(DataFlow::TypeTracker t) {
t.start() and
(
exists(SubscriptNode subscript |
subscript.getObject() = baseClassRef().getAUse().asCfgNode() and
result.asCfgNode() = subscript
)
or
result.(DataFlow::AttrRead).getObject() = baseClassRef().getAUse()
)
or
exists(DataFlow::TypeTracker t2 | result = headerInstance(t2).track(t2, t))
}
/** Gets a reference to a header instance use. */
private DataFlow::Node headerInstance() {
headerInstance(DataFlow::TypeTracker::end()).flowsTo(result)
}
/** Gets a reference to a header instance call with `__setitem__`. */
private DataFlow::Node headerSetItemCall() {
result = headerInstance() and
result.(DataFlow::AttrRead).getAttributeName() = "__setitem__"
}
class DjangoResponseSetItemCall extends DataFlow::CallCfgNode, HeaderDeclaration::Range {
DjangoResponseSetItemCall() { this.getFunction() = headerSetItemCall() }
override DataFlow::Node getNameArg() { result = this.getArg(0) }
override DataFlow::Node getValueArg() { result = this.getArg(1) }
}
class DjangoResponseDefinition extends DataFlow::Node, HeaderDeclaration::Range {
DataFlow::Node headerInput;
DjangoResponseDefinition() {
this.asCfgNode().(DefinitionNode) = headerInstance().asCfgNode() and
headerInput.asCfgNode() = this.asCfgNode().(DefinitionNode).getValue()
}
override DataFlow::Node getNameArg() {
result.asExpr() = this.asExpr().(Subscript).getIndex()
}
override DataFlow::Node getValueArg() { result = headerInput }
}
}
}
}
}
}

View File

@@ -0,0 +1,84 @@
/**
* Provides classes modeling security-relevant aspects of the `flask` PyPI package.
* See https://flask.palletsprojects.com/en/1.1.x/.
*/
private import python
private import semmle.python.frameworks.Flask
private import semmle.python.dataflow.new.DataFlow
private import experimental.semmle.python.Concepts
private import semmle.python.ApiGraphs
module ExperimentalFlask {
/**
* A reference to either `flask.make_response` function, or the `make_response` method on
* an instance of `flask.Flask`. This creates an instance of the `flask_response`
* class (class-attribute on a flask application), which by default is
* `flask.Response`.
*
* See
* - https://flask.palletsprojects.com/en/1.1.x/api/#flask.Flask.make_response
* - https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response
*/
private API::Node flaskMakeResponse() {
result =
[API::moduleImport("flask"), Flask::FlaskApp::instance()]
.getMember(["make_response", "jsonify", "make_default_options_response"])
}
/** Gets a reference to a header instance. */
private DataFlow::LocalSourceNode headerInstance(DataFlow::TypeTracker t) {
t.start() and
result.(DataFlow::AttrRead).getObject().getALocalSource() =
[Flask::Response::classRef(), flaskMakeResponse()].getReturn().getAUse()
or
exists(DataFlow::TypeTracker t2 | result = headerInstance(t2).track(t2, t))
}
/** Gets a reference to a header instance use. */
private DataFlow::Node headerInstance() {
headerInstance(DataFlow::TypeTracker::end()).flowsTo(result)
}
/** Gets a reference to a header instance call/subscript */
private DataFlow::Node headerInstanceCall() {
headerInstance() in [result.(DataFlow::AttrRead), result.(DataFlow::AttrRead).getObject()] or
headerInstance().asExpr() = result.asExpr().(Subscript).getObject()
}
class FlaskHeaderDefinition extends DataFlow::Node, HeaderDeclaration::Range {
DataFlow::Node headerInput;
FlaskHeaderDefinition() {
this.asCfgNode().(DefinitionNode) = headerInstanceCall().asCfgNode() and
headerInput.asCfgNode() = this.asCfgNode().(DefinitionNode).getValue()
}
override DataFlow::Node getNameArg() { result.asExpr() = this.asExpr().(Subscript).getIndex() }
override DataFlow::Node getValueArg() { result = headerInput }
}
private class FlaskMakeResponseExtend extends DataFlow::CallCfgNode, HeaderDeclaration::Range {
KeyValuePair item;
FlaskMakeResponseExtend() {
this.getFunction() = headerInstanceCall() and
item = this.getArg(_).asExpr().(Dict).getAnItem()
}
override DataFlow::Node getNameArg() { result.asExpr() = item.getKey() }
override DataFlow::Node getValueArg() { result.asExpr() = item.getValue() }
}
private class FlaskResponse extends DataFlow::CallCfgNode, HeaderDeclaration::Range {
KeyValuePair item;
FlaskResponse() { this = Flask::Response::classRef().getACall() }
override DataFlow::Node getNameArg() { result.asExpr() = item.getKey() }
override DataFlow::Node getValueArg() { result.asExpr() = item.getValue() }
}
}

View File

@@ -0,0 +1,118 @@
/**
* Provides classes modeling security-relevant aspects of the log libraries.
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.RemoteFlowSources
private import experimental.semmle.python.Concepts
private import semmle.python.frameworks.Flask
private import semmle.python.ApiGraphs
/**
* Provides models for Python's log-related libraries.
*/
private module log {
/**
* Log output method list.
*
* See https://docs.python.org/3/library/logging.html#logger-objects
*/
private class LogOutputMethods extends string {
LogOutputMethods() {
this in ["info", "error", "warn", "warning", "debug", "critical", "exception", "log"]
}
}
/**
* The class used to find the log output method of the `logging` module.
*
* See `LogOutputMethods`
*/
private class LoggingCall extends DataFlow::CallCfgNode, LogOutput::Range {
LoggingCall() {
this = API::moduleImport("logging").getMember(any(LogOutputMethods m)).getACall()
}
override DataFlow::Node getAnInput() {
this.getFunction().(DataFlow::AttrRead).getAttributeName() != "log" and
result in [this.getArg(_), this.getArgByName(_)] // this includes the arg named "msg"
or
this.getFunction().(DataFlow::AttrRead).getAttributeName() = "log" and
result in [this.getArg(any(int i | i > 0)), this.getArgByName(any(string s | s != "level"))]
}
}
/**
* The class used to find log output methods related to the `logging.getLogger` instance.
*
* See `LogOutputMethods`
*/
private class LoggerCall extends DataFlow::CallCfgNode, LogOutput::Range {
LoggerCall() {
this =
API::moduleImport("logging")
.getMember("getLogger")
.getReturn()
.getMember(any(LogOutputMethods m))
.getACall()
}
override DataFlow::Node getAnInput() {
this.getFunction().(DataFlow::AttrRead).getAttributeName() != "log" and
result in [this.getArg(_), this.getArgByName(_)] // this includes the arg named "msg"
or
this.getFunction().(DataFlow::AttrRead).getAttributeName() = "log" and
result in [this.getArg(any(int i | i > 0)), this.getArgByName(any(string s | s != "level"))]
}
}
/**
* The class used to find the relevant log output method of the `flask.Flask.logger` instance (flask application).
*
* See `LogOutputMethods`
*/
private class FlaskLoggingCall extends DataFlow::CallCfgNode, LogOutput::Range {
FlaskLoggingCall() {
this =
Flask::FlaskApp::instance()
.getMember("logger")
.getMember(any(LogOutputMethods m))
.getACall()
}
override DataFlow::Node getAnInput() {
this.getFunction().(DataFlow::AttrRead).getAttributeName() != "log" and
result in [this.getArg(_), this.getArgByName(_)] // this includes the arg named "msg"
or
this.getFunction().(DataFlow::AttrRead).getAttributeName() = "log" and
result in [this.getArg(any(int i | i > 0)), this.getArgByName(any(string s | s != "level"))]
}
}
/**
* The class used to find the relevant log output method of the `django.utils.log.request_logger` instance (django application).
*
* See `LogOutputMethods`
*/
private class DjangoLoggingCall extends DataFlow::CallCfgNode, LogOutput::Range {
DjangoLoggingCall() {
this =
API::moduleImport("django")
.getMember("utils")
.getMember("log")
.getMember("request_logger")
.getMember(any(LogOutputMethods m))
.getACall()
}
override DataFlow::Node getAnInput() {
this.getFunction().(DataFlow::AttrRead).getAttributeName() != "log" and
result in [this.getArg(_), this.getArgByName(_)] // this includes the arg named "msg"
or
this.getFunction().(DataFlow::AttrRead).getAttributeName() = "log" and
result in [this.getArg(any(int i | i > 0)), this.getArgByName(any(string s | s != "level"))]
}
}
}

View File

@@ -9,91 +9,3 @@ private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.RemoteFlowSources
private import experimental.semmle.python.Concepts
private import semmle.python.ApiGraphs
/**
* Provides models for Python's `re` library.
*
* See https://docs.python.org/3/library/re.html
*/
private module Re {
/**
* List of `re` methods immediately executing an expression.
*
* See https://docs.python.org/3/library/re.html#module-contents
*/
private class RegexExecutionMethods extends string {
RegexExecutionMethods() {
this in ["match", "fullmatch", "search", "split", "findall", "finditer", "sub", "subn"]
}
}
/**
* A class to find `re` methods immediately executing an expression.
*
* See `RegexExecutionMethods`
*/
private class DirectRegex extends DataFlow::CallCfgNode, RegexExecution::Range {
DataFlow::Node regexNode;
DirectRegex() {
this = API::moduleImport("re").getMember(any(RegexExecutionMethods m)).getACall() and
regexNode = this.getArg(0)
}
override DataFlow::Node getRegexNode() { result = regexNode }
override string getRegexModule() { result = "re" }
}
/**
* A class to find `re` methods immediately executing a compiled expression by `re.compile`.
*
* Given the following example:
*
* ```py
* pattern = re.compile(input)
* pattern.match(s)
* ```
*
* This class will identify that `re.compile` compiles `input` and afterwards
* executes `re`'s `match`. As a result, `this` will refer to `pattern.match(s)`
* and `this.getRegexNode()` will return the node for `input` (`re.compile`'s first argument)
*
*
* See `RegexExecutionMethods`
*
* See https://docs.python.org/3/library/re.html#regular-expression-objects
*/
private class CompiledRegex extends DataFlow::MethodCallNode, RegexExecution::Range {
DataFlow::Node regexNode;
CompiledRegex() {
exists(DataFlow::MethodCallNode patternCall |
patternCall = API::moduleImport("re").getMember("compile").getACall() and
patternCall.flowsTo(this.getObject()) and
this.getMethodName() instanceof RegexExecutionMethods and
regexNode = patternCall.getArg(0)
)
}
override DataFlow::Node getRegexNode() { result = regexNode }
override string getRegexModule() { result = "re" }
}
/**
* A class to find `re` methods escaping an expression.
*
* See https://docs.python.org/3/library/re.html#re.escape
*/
class ReEscape extends DataFlow::CallCfgNode, RegexEscape::Range {
DataFlow::Node regexNode;
ReEscape() {
this = API::moduleImport("re").getMember("escape").getACall() and
regexNode = this.getArg(0)
}
override DataFlow::Node getRegexNode() { result = regexNode }
}
}

View File

@@ -0,0 +1,33 @@
/**
* Provides classes modeling security-relevant aspects of the `Werkzeug` PyPI package.
* See
* - https://pypi.org/project/Werkzeug/
* - https://werkzeug.palletsprojects.com/en/1.0.x/#werkzeug
*/
private import python
private import semmle.python.frameworks.Flask
private import semmle.python.dataflow.new.DataFlow
private import experimental.semmle.python.Concepts
private import semmle.python.ApiGraphs
private module Werkzeug {
module datastructures {
module Headers {
class WerkzeugHeaderAddCall extends DataFlow::CallCfgNode, HeaderDeclaration::Range {
WerkzeugHeaderAddCall() {
this.getFunction().(DataFlow::AttrRead).getObject().getALocalSource() =
API::moduleImport("werkzeug")
.getMember("datastructures")
.getMember("Headers")
.getACall() and
this.getFunction().(DataFlow::AttrRead).getAttributeName() = "add"
}
override DataFlow::Node getNameArg() { result = this.getArg(0) }
override DataFlow::Node getValueArg() { result = this.getArg(1) }
}
}
}
}

View File

@@ -0,0 +1,20 @@
import python
import experimental.semmle.python.Concepts
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
/**
* A taint-tracking configuration for detecting HTTP Header injections.
*/
class HeaderInjectionFlowConfig extends TaintTracking::Configuration {
HeaderInjectionFlowConfig() { this = "HeaderInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) {
exists(HeaderDeclaration headerDeclaration |
sink in [headerDeclaration.getNameArg(), headerDeclaration.getValueArg()]
)
}
}

View File

@@ -0,0 +1,24 @@
import python
import semmle.python.Concepts
import experimental.semmle.python.Concepts
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
/**
* A taint-tracking configuration for tracking untrusted user input used in log entries.
*/
class LogInjectionFlowConfig extends TaintTracking::Configuration {
LogInjectionFlowConfig() { this = "LogInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink = any(LogOutput logoutput).getAnInput() }
override predicate isSanitizer(DataFlow::Node node) {
exists(CallNode call |
node.asCfgNode() = call.getFunction().(AttrNode).getObject("replace") and
call.getArg(0).getNode().(StrConst).getText() in ["\r\n", "\n"]
)
}
}

View File

@@ -1,53 +0,0 @@
/**
* Provides a taint-tracking configuration for detecting regular expression injection
* vulnerabilities.
*/
import python
import experimental.semmle.python.Concepts
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
/**
* A class to find methods executing regular expressions.
*
* See `RegexExecution`
*/
class RegexInjectionSink extends DataFlow::Node {
string regexModule;
Attribute regexMethod;
RegexInjectionSink() {
exists(RegexExecution reExec |
this = reExec.getRegexNode() and
regexModule = reExec.getRegexModule() and
regexMethod = reExec.(DataFlow::CallCfgNode).getFunction().asExpr().(Attribute)
)
}
/**
* Gets the argument containing the executed expression.
*/
string getRegexModule() { result = regexModule }
/**
* Gets the method used to execute the regular expression.
*/
Attribute getRegexMethod() { result = regexMethod }
}
/**
* A taint-tracking configuration for detecting regular expression injections.
*/
class RegexInjectionFlowConfig extends TaintTracking::Configuration {
RegexInjectionFlowConfig() { this = "RegexInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof RegexInjectionSink }
override predicate isSanitizer(DataFlow::Node sanitizer) {
sanitizer = any(RegexEscape reEscape).getRegexNode()
}
}