Merge branch 'main' into main

This commit is contained in:
Raul Garcia
2023-03-29 20:27:03 -07:00
committed by GitHub
512 changed files with 20623 additions and 5243 deletions

View File

@@ -11,16 +11,57 @@
*/
import python
import IsComparisons
from Compare comp, Cmpop op, ClassValue c, string alt
where
invalid_portable_is_comparison(comp, op, c) and
not cpython_interned_constant(comp.getASubExpression()) and
(
op instanceof Is and alt = "=="
/** Holds if the comparison `comp` uses `is` or `is not` (represented as `op`) to compare its `left` and `right` arguments. */
predicate comparison_using_is(Compare comp, ControlFlowNode left, Cmpop op, ControlFlowNode right) {
exists(CompareNode fcomp | fcomp = comp.getAFlowNode() |
fcomp.operands(left, op, right) and
(op instanceof Is or op instanceof IsNot)
)
}
private predicate cpython_interned_value(Expr e) {
exists(string text | text = e.(StrConst).getText() |
text.length() = 0
or
op instanceof IsNot and alt = "!="
text.length() = 1 and text.regexpMatch("[U+0000-U+00ff]")
)
or
exists(int i | i = e.(IntegerLiteral).getN().toInt() | -5 <= i and i <= 256)
or
exists(Tuple t | t = e and not exists(t.getAnElt()))
}
predicate uninterned_literal(Expr e) {
(
e instanceof StrConst
or
e instanceof IntegerLiteral
or
e instanceof FloatLiteral
or
e instanceof Dict
or
e instanceof List
or
e instanceof Tuple
) and
not cpython_interned_value(e)
}
from Compare comp, Cmpop op, string alt
where
exists(ControlFlowNode left, ControlFlowNode right |
comparison_using_is(comp, left, op, right) and
(
op instanceof Is and alt = "=="
or
op instanceof IsNot and alt = "!="
)
|
uninterned_literal(left.getNode())
or
uninterned_literal(right.getNode())
)
select comp,
"Values compared using '" + op.getSymbol() +

View File

@@ -4,23 +4,18 @@ import TlsLibraryModel
/**
* Configuration to determine the state of a context being used to create
* a connection. There is one configuration for each pair of `TlsLibrary` and `ProtocolVersion`,
* such that a single configuration only tracks contexts where a specific `ProtocolVersion` is allowed.
* a connection. The configuration uses a flow state to track the `TlsLibrary`
* and the insecure `ProtocolVersion`s that are allowed.
*
* The state is in terms of whether a specific protocol is allowed. This is
* either true or false when the context is created and can then be modified
* later by either restricting or unrestricting the protocol (see the predicates
* `isRestriction` and `isUnrestriction`).
* later by either restricting or unrestricting the protocol (see the predicate
* `isAdditionalFlowStep`).
*
* Since we are interested in the final state, we want the flow to start from
* the last unrestriction, so we disallow flow into unrestrictions. We also
* model the creation as an unrestriction of everything it allows, to account
* for the common case where the creation plays the role of "last unrestriction".
*
* Since we really want "the last unrestriction, not nullified by a restriction",
* we also disallow flow into restrictions.
* The state is represented as a bit vector, where each bit corresponds to a
* protocol version. The bit is set if the protocol is allowed.
*/
module InsecureContextConfiguration2 implements DataFlow::StateConfigSig {
module InsecureContextConfiguration implements DataFlow::StateConfigSig {
private newtype TFlowState =
TMkFlowState(TlsLibrary library, int bits) {
bits in [0 .. max(any(ProtocolVersion v).getBit()) * 2 - 1]
@@ -61,9 +56,14 @@ module InsecureContextConfiguration2 implements DataFlow::StateConfigSig {
}
predicate isSource(DataFlow::Node source, FlowState state) {
exists(ProtocolFamily family |
source = state.getLibrary().unspecific_context_creation(family) and
state.getBits() = family.getBits()
exists(ContextCreation creation | source = creation |
creation = state.getLibrary().unspecific_context_creation() and
state.getBits() =
sum(ProtocolVersion version |
version = creation.getProtocol() and version.isInsecure()
|
version.getBit()
)
)
}
@@ -112,7 +112,7 @@ module InsecureContextConfiguration2 implements DataFlow::StateConfigSig {
}
}
private module InsecureContextFlow = DataFlow::GlobalWithState<InsecureContextConfiguration2>;
private module InsecureContextFlow = DataFlow::GlobalWithState<InsecureContextConfiguration>;
/**
* Holds if `conectionCreation` marks the creation of a connection based on the contex

View File

@@ -12,14 +12,14 @@ class PyOpenSslContextCreation extends ContextCreation, DataFlow::CallCfgNode {
this = API::moduleImport("OpenSSL").getMember("SSL").getMember("Context").getACall()
}
override string getProtocol() {
override ProtocolVersion getProtocol() {
exists(DataFlow::Node protocolArg, PyOpenSsl pyo |
protocolArg in [this.getArg(0), this.getArgByName("method")]
|
protocolArg in [
pyo.specific_version(result).getAValueReachableFromSource(),
pyo.unspecific_version(result).getAValueReachableFromSource()
]
protocolArg = pyo.specific_version(result).getAValueReachableFromSource()
or
protocolArg = pyo.unspecific_version().getAValueReachableFromSource() and
result = any(ProtocolVersion pv)
)
}
}
@@ -51,19 +51,23 @@ class SetOptionsCall extends ProtocolRestriction, DataFlow::CallCfgNode {
}
}
class UnspecificPyOpenSslContextCreation extends PyOpenSslContextCreation, UnspecificContextCreation
{
// UnspecificPyOpenSslContextCreation() { library instanceof PyOpenSsl }
}
class PyOpenSsl extends TlsLibrary {
PyOpenSsl() { this = "pyOpenSSL" }
override string specific_version_name(ProtocolVersion version) { result = version + "_METHOD" }
override string unspecific_version_name(ProtocolFamily family) {
// `"TLS_METHOD"` is not actually available in pyOpenSSL yet, but should be coming soon..
result = family + "_METHOD"
override string unspecific_version_name() {
// See
// - https://www.pyopenssl.org/en/23.0.0/api/ssl.html#module-OpenSSL.SSL
// - https://www.openssl.org/docs/manmaster/man3/DTLS_server_method.html#NOTES
//
// PyOpenSSL also allows DTLS
// see https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Context
// although they are not mentioned here:
// https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.TLS_METHOD
result = ["TLS", "SSLv23"] + "_METHOD"
or
result = "TLS_" + ["CLIENT", "SERVER"] + "_METHOD"
}
override API::Node version_constants() { result = API::moduleImport("OpenSSL").getMember("SSL") }
@@ -80,7 +84,5 @@ class PyOpenSsl extends TlsLibrary {
override ProtocolRestriction protocol_restriction() { result instanceof SetOptionsCall }
override ProtocolUnrestriction protocol_unrestriction() {
result instanceof UnspecificPyOpenSslContextCreation
}
override ProtocolUnrestriction protocol_unrestriction() { none() }
}

View File

@@ -10,20 +10,21 @@ import TlsLibraryModel
class SslContextCreation extends ContextCreation, DataFlow::CallCfgNode {
SslContextCreation() { this = API::moduleImport("ssl").getMember("SSLContext").getACall() }
override string getProtocol() {
override ProtocolVersion getProtocol() {
exists(DataFlow::Node protocolArg, Ssl ssl |
protocolArg in [this.getArg(0), this.getArgByName("protocol")]
|
protocolArg =
[
ssl.specific_version(result).getAValueReachableFromSource(),
ssl.unspecific_version(result).getAValueReachableFromSource()
]
protocolArg = ssl.specific_version(result).getAValueReachableFromSource()
or
protocolArg = ssl.unspecific_version().getAValueReachableFromSource() and
// see https://docs.python.org/3/library/ssl.html#id7
result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
)
or
not exists(this.getArg(_)) and
not exists(this.getArgByName(_)) and
result = "TLS"
// see https://docs.python.org/3/library/ssl.html#id7
result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
}
}
@@ -34,7 +35,7 @@ class SslDefaultContextCreation extends ContextCreation {
// Allowed insecure versions are "TLSv1" and "TLSv1_1"
// see https://docs.python.org/3/library/ssl.html#context-creation
override string getProtocol() { result = "TLS" }
override ProtocolVersion getProtocol() { result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"] }
}
/** Gets a reference to an `ssl.Context` instance. */
@@ -161,33 +162,29 @@ class ContextSetVersion extends ProtocolRestriction, ProtocolUnrestriction, Data
}
}
class UnspecificSslContextCreation extends SslContextCreation, UnspecificContextCreation {
// UnspecificSslContextCreation() { library instanceof Ssl }
// override ProtocolVersion getUnrestriction() {
// result = UnspecificContextCreation.super.getUnrestriction() and
// // These are turned off by default since Python 3.6
// // see https://docs.python.org/3.6/library/ssl.html#ssl.SSLContext
// not result in ["SSLv2", "SSLv3"]
// }
}
class UnspecificSslDefaultContextCreation extends SslDefaultContextCreation {
// override DataFlow::Node getContext() { result = this }
// // see https://docs.python.org/3/library/ssl.html#ssl.create_default_context
// override ProtocolVersion getUnrestriction() {
// result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
// }
}
// class UnspecificSslContextCreation extends SslContextCreation, UnspecificContextCreation {
// // UnspecificSslContextCreation() { library instanceof Ssl }
// override ProtocolVersion getProtocol() {
// result = UnspecificContextCreation.super.getProtocol() and
// // These are turned off by default since Python 3.6
// // see https://docs.python.org/3.6/library/ssl.html#ssl.SSLContext
// not result in ["SSLv2", "SSLv3"]
// }
// }
// class UnspecificSslDefaultContextCreation extends SslDefaultContextCreation {
// // override DataFlow::Node getContext() { result = this }
// // see https://docs.python.org/3/library/ssl.html#ssl.create_default_context
// override ProtocolVersion getProtocol() { result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"] }
// }
class Ssl extends TlsLibrary {
Ssl() { this = "ssl" }
override string specific_version_name(ProtocolVersion version) { result = "PROTOCOL_" + version }
override string unspecific_version_name(ProtocolFamily family) {
family = "SSLv23" and result = "PROTOCOL_" + family
override string unspecific_version_name() {
result = "PROTOCOL_SSLv23"
or
family = "TLS" and result = "PROTOCOL_" + family + ["", "_CLIENT", "_SERVER"]
result = "PROTOCOL_TLS" + ["", "_CLIENT", "_SERVER"]
}
override API::Node version_constants() { result = API::moduleImport("ssl") }
@@ -217,9 +214,5 @@ class Ssl extends TlsLibrary {
result instanceof OptionsAugAndNot
or
result instanceof ContextSetVersion
or
result instanceof UnspecificSslContextCreation
or
result instanceof UnspecificSslDefaultContextCreation
}
}

View File

@@ -37,29 +37,23 @@ class ProtocolVersion extends string {
or
this = "TLSv1_3" and result = 32
}
/** Gets the protocol family for this protocol version. */
ProtocolFamily getFamily() {
result = "SSLv23" and this in ["SSLv2", "SSLv3"]
or
result = "TLS" and this in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
}
}
/** An unspecific protocol version */
class ProtocolFamily extends string {
ProtocolFamily() { this in ["SSLv23", "TLS"] }
/** Gets the bit mask for this protocol family. */
int getBits() {
result = sum(ProtocolVersion version | version.getFamily() = this | version.getBit())
}
}
/** The creation of a context. */
abstract class ContextCreation extends DataFlow::Node {
/** Gets the protocol version or family for this context. */
abstract string getProtocol();
/**
* Gets the protocol version for this context.
* There can be multiple values if the context was created
* using a non-specific version such as `TLS`.
*/
abstract ProtocolVersion getProtocol();
/**
* Holds if the context was created with a specific version
* rather than with a version flexible method, see:
* https://www.openssl.org/docs/manmaster/man3/DTLS_server_method.html#NOTES
*/
predicate specificVersion() { count(this.getProtocol()) = 1 }
}
/** The creation of a connection from a context. */
@@ -91,13 +85,12 @@ abstract class ProtocolUnrestriction extends DataFlow::Node {
* This also serves as unrestricting these protocols.
*/
abstract class UnspecificContextCreation extends ContextCreation {
// override ProtocolVersion getUnrestriction() {
// // There is only one family, the two names are aliases in OpenSSL.
// // see https://github.com/openssl/openssl/blob/13888e797c5a3193e91d71e5f5a196a2d68d266f/include/openssl/ssl.h.in#L1953-L1955
// family in ["SSLv23", "TLS"] and
// // see https://docs.python.org/3/library/ssl.html#ssl-contexts
// result in ["SSLv2", "SSLv3", "TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
// }
override ProtocolVersion getProtocol() {
// There is only one family, the two names are aliases in OpenSSL.
// see https://github.com/openssl/openssl/blob/13888e797c5a3193e91d71e5f5a196a2d68d266f/include/openssl/ssl.h.in#L1953-L1955
// see https://docs.python.org/3/library/ssl.html#ssl-contexts
result in ["SSLv2", "SSLv3", "TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
}
}
/** A model of a SSL/TLS library. */
@@ -108,8 +101,8 @@ abstract class TlsLibrary extends string {
/** Gets the name of a specific protocol version. */
abstract string specific_version_name(ProtocolVersion version);
/** Gets a name, which is a member of `version_constants`, that can be used to specify the protocol family `family`. */
abstract string unspecific_version_name(ProtocolFamily family);
/** Gets a name, which is a member of `version_constants`, that can be used to specify the entire protocol family. */
abstract string unspecific_version_name();
/** Gets an API node representing the module or class holding the version constants. */
abstract API::Node version_constants();
@@ -119,9 +112,9 @@ abstract class TlsLibrary extends string {
result = this.version_constants().getMember(this.specific_version_name(version))
}
/** Gets an API node representing the protocol family `family`. */
API::Node unspecific_version(ProtocolFamily family) {
result = this.version_constants().getMember(this.unspecific_version_name(family))
/** Gets an API node representing the protocol entire family. */
API::Node unspecific_version() {
result = this.version_constants().getMember(this.unspecific_version_name())
}
/** Gets a creation of a context with a default protocol. */
@@ -133,14 +126,15 @@ abstract class TlsLibrary extends string {
/** Gets a creation of a context with a specific protocol version, known to be insecure. */
ContextCreation insecure_context_creation(ProtocolVersion version) {
result in [this.specific_context_creation(), this.default_context_creation()] and
result.specificVersion() and
result.getProtocol() = version and
version.isInsecure()
}
/** Gets a context that was created using `family`, known to have insecure instances. */
ContextCreation unspecific_context_creation(ProtocolFamily family) {
ContextCreation unspecific_context_creation() {
result in [this.specific_context_creation(), this.default_context_creation()] and
result.getProtocol() = family
not result.specificVersion()
}
/** Gets a dataflow node representing a connection being created in an insecure manner, not from a context. */

View File

@@ -0,0 +1,4 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<include src="TimingAttackAgainstHash.qhelp" />
</qhelp>

View File

@@ -0,0 +1,37 @@
/**
* @name Timing attack against Hash
* @description When checking a Hash over a message, a constant-time algorithm should be used.
* Otherwise, an attacker may be able to forge a valid Hash for an arbitrary message
* by running a timing attack if they can send to the validation procedure.
* A successful attack can result in authentication bypass.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/possible-timing-attack-against-hash
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration that tracks data flow from cryptographic operations
* to equality test
*/
class PossibleTimingAttackAgainstHash extends TaintTracking::Configuration {
PossibleTimingAttackAgainstHash() { this = "PossibleTimingAttackAgainstHash" }
override predicate isSource(DataFlow::Node source) { source instanceof ProduceCryptoCall }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from PossibleTimingAttackAgainstHash config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Possible Timing attack against $@ validation.",
source.getNode().(ProduceCryptoCall).getResultType(), "message"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack Against Hash
"""
import hmac
import hashlib
key = "e179017a-62b0-4996-8a38-e91aa9f1"
msg = "Test"
def sign(pre_key, imsg, alg):
return hmac.new(pre_key, imsg, alg).digest()
def verify(msg, sig):
return hmac.compare_digest(sig, sign(key, msg, hashlib.sha256)) #good

View File

@@ -0,0 +1,55 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
Timing Attack is based on the leakage of information by studying how long it takes the system to respond to different inputs.
it can be circumvented by using a constant-time algorithm for checking the value of Hash,
more precisely, the comparison time should not depend on the content of the input. Otherwise the attacker gains
information that is indirectly leaked by the application. This information may then be used for malicious purposes.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the value of Hash.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating a Hash.
</p>
<sample src="UnSafeComparisonOfHash.py" />
<p>
The next example use a safe constant-time algorithm for validating a Hash:
</p>
<sample src="SafeComparisonOfHash.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
<li>
HMAC:
<a href="https://datatracker.ietf.org/doc/html/rfc2104.html">RFC 2104</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,38 @@
/**
* @name Timing attack against Hash
* @description When checking a Hash over a message, a constant-time algorithm should be used.
* Otherwise, an attacker may be able to forge a valid Hash for an arbitrary message
* by running a timing attack if they can send to the validation procedure.
* A successful attack can result in authentication bypass.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/timing-attack-against-hash
* @tags security
* external/cwe/cwe-208
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration that tracks data flow from cryptographic operations
* to Equality test.
*/
class TimingAttackAgainsthash extends TaintTracking::Configuration {
TimingAttackAgainsthash() { this = "TimingAttackAgainsthash" }
override predicate isSource(DataFlow::Node source) { source instanceof ProduceCryptoCall }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from TimingAttackAgainsthash config, DataFlow::PathNode source, DataFlow::PathNode sink
where
config.hasFlowPath(source, sink) and
sink.getNode().(NonConstantTimeComparisonSink).includesUserInput()
select sink.getNode(), source, sink, "Timing attack against $@ validation.",
source.getNode().(ProduceCryptoCall).getResultType(), "message"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc timing attack Against Hash
"""
import hmac
import hashlib
key = "e179017a-62b0-4996-8a38-e91aa9f1"
msg = "Test"
def sign(pre_key, imsg, alg):
return hmac.new(pre_key, imsg, alg).digest()
def verify(msg, sig):
return sig == sign(key, msg, hashlib.sha256) #bad

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack against header value
"""
from flask import Flask
from flask import request
import hmac
@app.route('/good')
def good():
secret = request.headers.get('X-Auth-Token')
if not hmac.compare_digest(secret, "token"):
raise Exception('bad token')
return 'good'
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,50 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
A constant-time algorithm should be used for checking the value of sensitive headers.
In other words, the comparison time should not depend on the content of the input.
Otherwise timing information could be used to infer the header's expected, secret value.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the secret value.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating the value of sensitive headers.
</p>
<sample src="UnsafeComparisonOfHeaderValue.py" />
<p>
The next example use a safe constant-time algorithm for validating the value of sensitive headers:
</p>
<sample src="SafeComparisonOfHeaderValue.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,34 @@
/**
* @name Timing attack against header value
* @description Use of a non-constant-time verification routine to check the value of an HTTP header,
* possibly allowing a timing attack to infer the header's expected value.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/timing-attack-against-header-value
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration tracing flow from a client Secret obtained by an HTTP header to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof ClientSuppliedSecret }
override predicate isSink(DataFlow::Node sink) { sink instanceof CompareSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink) and not sink.getNode().(CompareSink).flowtolen()
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack against header value
"""
from flask import Flask
from flask import request
@app.route('/bad')
def bad():
secret = request.headers.get('X-Auth-Token')
if secret == "token":
raise Exception('bad token')
return 'bad'
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,4 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<include src="TimingAttackAgainstSensitiveInfo.qhelp" />
</qhelp>

View File

@@ -0,0 +1,34 @@
/**
* @name Timing attack against secret
* @description Use of a non-constant-time verification routine to check the value of an secret,
* possibly allowing a timing attack to retrieve sensitive information.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/possible-timing-attack-sensitive-info
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration tracing flow from obtaining a client Secret to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof SecretSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack sensitive info
"""
from flask import Flask
from flask import request
import hmac
app = Flask(__name__)
@app.route('/bad', methods = ['POST', 'GET'])
def bad():
if request.method == 'POST':
password = request.form['pwd']
return hmac.compare_digest(password, "1234")
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,52 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
Timing Attack is based on the leakage of information of secret parameters by studying
how long it takes the system to respond to different inputs.
it can be circumvented by using a constant-time algorithm for checking the value of sensitive info,
more precisely, the comparison time should not depend on the content of the input. Otherwise the attacker gains
information that is indirectly leaked by the application. This information is then used for malicious purposes.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the value of sensitive info.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating a secret.
</p>
<sample src="UnSafeComparisonOfSensitiveInfo.py" />
<p>
The next example use a safe constant-time algorithm for validating a secret:
</p>
<sample src="SafeComparisonOfSensitiveInfo.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,39 @@
/**
* @name Timing attack against secret
* @description Use of a non-constant-time verification routine to check the value of an secret,
* possibly allowing a timing attack to retrieve sensitive information.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/timing-attack-sensitive-info
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration tracing flow from obtaining a client Secret to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof SecretSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where
config.hasFlowPath(source, sink) and
(
source.getNode().(SecretSource).includesUserInput() or
sink.getNode().(NonConstantTimeComparisonSink).includesUserInput()
)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc timing attack against sensitive info
"""
from flask import Flask
from flask import request
@app.route('/bad', methods = ['POST', 'GET'])
def bad():
if request.method == 'POST':
password = request.form['pwd']
return password == "test"
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,350 @@
private import python
private import semmle.python.dataflow.new.TaintTracking2
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.DataFlow2
private import semmle.python.ApiGraphs
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.frameworks.Flask
private import semmle.python.frameworks.Django
/** A method call that produces cryptographic result. */
abstract class ProduceCryptoCall extends API::CallNode {
/** Gets a type of cryptographic operation such as MAC, signature, Hash or ciphertext. */
abstract string getResultType();
}
/** Gets a reference to the `cryptography.hazmat.primitives` module. */
API::Node cryptographylib() {
result = API::moduleImport("cryptography").getMember("hazmat").getMember("primitives")
}
/** Gets a reference to the `Crypto` module. */
API::Node cryptodome() { result = API::moduleImport(["Crypto", "Cryptodome"]) }
/** A method call that produces a MAC. */
class ProduceMacCall extends ProduceCryptoCall {
ProduceMacCall() {
this = API::moduleImport("hmac").getMember("digest").getACall() or
this =
API::moduleImport("hmac")
.getMember("new")
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall() or
this =
cryptodome()
.getMember("Hash")
.getMember("HMAC")
.getMember(["new", "HMAC"])
.getMember(["digest", "hexdigest"])
.getACall() or
this =
cryptographylib()
.getMember("hmac")
.getMember("HMAC")
.getReturn()
.getMember("finalize")
.getACall() or
this =
cryptographylib()
.getMember("cmac")
.getMember("CMAC")
.getReturn()
.getMember("finalize")
.getACall() or
this =
cryptodome()
.getMember("Hash")
.getMember("CMAC")
.getMember(["new", "CMAC"])
.getMember(["digest", "hexdigest"])
.getACall()
}
override string getResultType() { result = "MAC" }
}
/** A method call that produces a signature. */
private class ProduceSignatureCall extends ProduceCryptoCall {
ProduceSignatureCall() {
this =
cryptodome()
.getMember("Signature")
.getMember(["DSS", "pkcs1_15", "pss", "eddsa"])
.getMember("new")
.getReturn()
.getMember("sign")
.getACall()
}
override string getResultType() { result = "signature" }
}
private string hashalgo() {
result = ["sha1", "sha224", "sha256", "sha384", "sha512", "blake2b", "blake2s", "md5"]
}
/** A method call that produces a Hash. */
private class ProduceHashCall extends ProduceCryptoCall {
ProduceHashCall() {
this =
cryptographylib()
.getMember("hashes")
.getMember("Hash")
.getReturn()
.getMember("finalize")
.getACall() or
this =
API::moduleImport("hashlib")
.getMember(["new", hashalgo()])
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall() or
this =
cryptodome()
.getMember(hashalgo())
.getMember("new")
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall()
}
override string getResultType() { result = "Hash" }
}
/** A method call that produces a ciphertext. */
private class ProduceCiphertextCall extends ProduceCryptoCall {
ProduceCiphertextCall() {
this =
cryptodome()
.getMember("Cipher")
.getMember(["DES", "DES3", "ARC2", "ARC4", "Blowfish", "PKCS1_v1_5"])
.getMember(["ARC4Cipher", "new", "PKCS115_Cipher"])
.getMember("encrypt")
.getACall() or
this =
cryptographylib()
.getMember("ciphers")
.getMember("Cipher")
.getReturn()
.getMember("finalize")
.getACall()
}
override string getResultType() { result = "ciphertext" }
}
/** A data flow sink for comparison. */
private predicate existsFailFastCheck(Expr firstInput, Expr secondInput) {
exists(Compare compare |
(
compare.getOp(0) instanceof Eq or
compare.getOp(0) instanceof NotEq or
compare.getOp(0) instanceof In or
compare.getOp(0) instanceof NotIn
) and
(
compare.getLeft() = firstInput and
compare.getComparator(0) = secondInput and
not compare.getAComparator() instanceof None
or
compare.getLeft() = secondInput and
compare.getComparator(0) = firstInput and
not compare.getAComparator() instanceof None
)
)
}
/** A sink that compares input using fail fast check. */
class NonConstantTimeComparisonSink extends DataFlow::Node {
Expr anotherParameter;
NonConstantTimeComparisonSink() { existsFailFastCheck(this.asExpr(), anotherParameter) }
/** Holds if remote user input was used in the comparison. */
predicate includesUserInput() {
exists(UserInputInComparisonConfig config |
config.hasFlowTo(DataFlow2::exprNode(anotherParameter))
)
}
}
/** A data flow source of the secret obtained. */
class SecretSource extends DataFlow::Node {
CredentialExpr secret;
SecretSource() { secret = this.asExpr() }
/** Holds if the secret was deliverd by remote user. */
predicate includesUserInput() {
exists(UserInputSecretConfig config | config.hasFlowTo(DataFlow2::exprNode(secret)))
}
}
/** A string for `match` that identifies strings that look like they represent secret data. */
private string suspicious() {
result =
[
"%password%", "%passwd%", "%pwd%", "%refresh%token%", "%secret%token", "%secret%key",
"%passcode%", "%passphrase%", "%token%", "%secret%", "%credential%", "%userpass%", "%digest%",
"%signature%", "%mac%"
]
}
/** A variable that may hold sensitive information, judging by its name. * */
class CredentialExpr extends Expr {
CredentialExpr() {
exists(Variable v | this = v.getAnAccess() | v.getId().toLowerCase().matches(suspicious()))
}
}
/**
* A data flow source of the client Secret obtained according to the remote endpoint identifier specified
* (`X-auth-token`, `proxy-authorization`, `X-Csrf-Header`, etc.) in the header.
*
* For example: `request.headers.get("X-Auth-Token")`.
*/
abstract class ClientSuppliedSecret extends DataFlow::CallCfgNode { }
private class FlaskClientSuppliedSecret extends ClientSuppliedSecret {
FlaskClientSuppliedSecret() {
this = Flask::request().getMember("headers").getMember(["get", "get_all", "getlist"]).getACall() and
[this.getArg(0), this.getArgByName(["key", "name"])].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
private class DjangoClientSuppliedSecret extends ClientSuppliedSecret {
DjangoClientSuppliedSecret() {
this =
PrivateDjango::DjangoImpl::DjangoHttp::Request::HttpRequest::classRef()
.getMember(["headers", "META"])
.getMember("get")
.getACall() and
[this.getArg(0), this.getArgByName("key")].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
/** Gets a reference to the `tornado.web.RequestHandler` module. */
API::Node requesthandler() {
result = API::moduleImport("tornado").getMember("web").getMember("RequestHandler")
}
private class TornadoClientSuppliedSecret extends ClientSuppliedSecret {
TornadoClientSuppliedSecret() {
this = requesthandler().getMember(["headers", "META"]).getMember("get").getACall() and
[this.getArg(0), this.getArgByName("key")].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
/** Gets a reference to the `werkzeug.datastructures.Headers` module. */
API::Node headers() {
result = API::moduleImport("werkzeug").getMember("datastructures").getMember("Headers")
}
private class WerkzeugClientSuppliedSecret extends ClientSuppliedSecret {
WerkzeugClientSuppliedSecret() {
this =
headers().getMember(["headers", "META"]).getMember(["get", "get_all", "getlist"]).getACall() and
[this.getArg(0), this.getArgByName(["key", "name"])].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
/** A string for `match` that identifies strings that look like they represent Sensitive Headers. */
private string sensitiveheaders() {
result =
[
"x-auth-token", "x-csrf-token", "http_x_csrf_token", "x-csrf-param", "x-csrf-header",
"http_x_csrf_token", "x-api-key", "authorization", "proxy-authorization", "x-gitlab-token",
"www-authenticate"
]
}
/**
* A config that tracks data flow from remote user input to Variable that hold sensitive info
*/
class UserInputSecretConfig extends TaintTracking::Configuration {
UserInputSecretConfig() { this = "UserInputSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink.asExpr() instanceof CredentialExpr }
}
/**
* A config that tracks data flow from remote user input to Equality test
*/
class UserInputInComparisonConfig extends TaintTracking2::Configuration {
UserInputInComparisonConfig() { this = "UserInputInComparisonConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) {
exists(Compare cmp, Expr left, Expr right, Cmpop cmpop |
cmpop.getSymbol() = ["==", "in", "is not", "!="] and
cmp.compares(left, cmpop, right) and
sink.asExpr() = [left, right]
)
}
}
/**
* A configuration tracing flow from a client Secret obtained by an HTTP header to a len() function.
*/
private class ExcludeLenFunc extends TaintTracking2::Configuration {
ExcludeLenFunc() { this = "ExcludeLenFunc" }
override predicate isSource(DataFlow::Node source) { source instanceof ClientSuppliedSecret }
override predicate isSink(DataFlow::Node sink) {
exists(Call call |
call.getFunc().(Name).getId() = "len" and
sink.asExpr() = call.getArg(0)
)
}
}
/**
* Holds if there is a fast-fail check.
*/
class CompareSink extends DataFlow::Node {
CompareSink() {
exists(Compare compare |
(
compare.getOp(0) instanceof Eq or
compare.getOp(0) instanceof NotEq
) and
(
compare.getLeft() = this.asExpr() and
not compare.getComparator(0).(StrConst).getText() = "bearer"
or
compare.getComparator(0) = this.asExpr() and
not compare.getLeft().(StrConst).getText() = "bearer"
)
)
or
exists(Compare compare |
compare.getOp(0) instanceof IsNot and
(
compare.getLeft() = this.asExpr() and
not compare.getComparator(0) instanceof None
or
compare.getComparator(0) = this.asExpr() and
not compare.getLeft() instanceof None
)
)
}
/**
* Holds if there is a flow to len().
*/
predicate flowtolen() {
exists(ExcludeLenFunc config, DataFlow2::PathNode source, DataFlow2::PathNode sink |
config.hasFlowPath(source, sink)
)
}
}