Python: Timing attack

This commit is contained in:
root
2022-06-27 12:18:45 -04:00
parent 861a368734
commit 655b9d4262
17 changed files with 552 additions and 0 deletions

View File

@@ -0,0 +1,55 @@
/**
* @name Timing attack against digest validation
* @description When checking a signature over a message, a constant-time algorithm should be used.
* Otherwise, an attacker may be able to forge a valid digest for an arbitrary message
* by running a timing attack if they can send to the validation procedure
* both the message and the signature.
* A successful attack can result in authentication bypass.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/timing-attack-against-signature
* @tags security
* external/cwe/cwe-208
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
import TimingAttack
import DataFlow::PathGraph
/**
* A configuration that tracks data flow from cryptographic operations
* to equality test
*/
class PossibleTimingAttackAgainstSignature extends TaintTracking::Configuration {
PossibleTimingAttackAgainstSignature() { this = "PossibleTimingAttackAgainstSignature" }
override predicate isSource(DataFlow::Node source) {
source = API::moduleImport("hmac").getMember("digest").getACall() or
source =
API::moduleImport("hmac")
.getMember("new")
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall() or
source =
API::moduleImport("hashlib")
.getMember([
"new", "sha1", "sha224", "sha256", "sha384", "sha512", "blake2b", "blake2s", "md5"
])
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall()
}
override predicate isSink(DataFlow::Node sink) { sink instanceof CompareSink }
}
from PossibleTimingAttackAgainstSignature config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Possible Timing attack against $@ validation.", source,
source.getNode()

View File

@@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack
"""
import hmac
from flask import Flask
from flask import request
import hashlib
key = "e179017a-62b0-4996-8a38-e91aa9f1"
@app.route('/good1')
def good1():
Secret = request.headers.get('X-Auth-Token')
if not hmac.compare_digest(Secret, "token"):
raise Exception('bad token')
return 'good1'
@app.route('/good2')
def check_credentials(password):
return hmac.compare_digest(password, "token")
def sign(pre_key, msg, alg):
return hmac.new(pre_key, msg, alg).digest()
def verify(msg, sig):
return hmac.compare_digest(sig, sign(key, msg, hashlib.sha256)) #good
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,52 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
Timing Attack is based on the leakage of information of secret parameters by studying
how long it takes the system to respond to different inputs.
it can be circumvented by using a constant-time algorithm for checking the value of sensitive info,
more precisely, the comparison time should not depend on the content of the input. Otherwise the attacker gains
information that is indirectly leaked by the application. This information is then used for malicious purposes,
such as guessing the password of a user.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the value of sensitive info.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating a secret.
</p>
<sample src="UnsafeComparison.py" />
<p>
The next example use a safe constant-time algorithm for validating a secret:
</p>
<sample src="SafeComparison.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,168 @@
private import python
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.TaintTracking2
private import semmle.python.dataflow.new.TaintTracking3
private import semmle.python.ApiGraphs
private import semmle.python.dataflow.new.RemoteFlowSources
/** A data flow sink for comparison. */
class CompareSink extends DataFlow::Node {
CompareSink() {
exists(Compare compare |
(
compare.getOp(0) instanceof Eq or
compare.getOp(0) instanceof NotEq or
compare.getOp(0) instanceof In or
compare.getOp(0) instanceof NotIn
) and
(
compare.getLeft() = this.asExpr()
or
compare.getComparator(0) = this.asExpr()
)
)
}
}
/** A string for `match` that identifies strings that look like they represent secret data. */
private string suspicious() {
result =
[
"%password%", "%passwd%", "%pwd%", "%refresh%token%", "%secret%token", "%secret%key",
"%passcode%", "%passphrase%", "%token%", "%secret%", "%credential%", "%key%", "%UserPass%"
]
}
/** A variable that may hold sensitive information, judging by its name. * */
class CredentialExpr extends Expr {
CredentialExpr() {
exists(Variable v | this = v.getAnAccess() | v.getId().toLowerCase().matches(suspicious()))
}
}
/**
* A data flow source of the client Secret obtained according to the remote endpoint identifier specified
* (`X-auth-token`, `proxy-authorization`, `X-Csrf-Header`, etc.) in the header.
*
* For example: `request.headers.get("X-Auth-Token")`.
*/
abstract class ClientSuppliedsecret extends DataFlow::CallCfgNode { }
private class FlaskClientSuppliedsecret extends ClientSuppliedsecret {
FlaskClientSuppliedsecret() {
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
rfs.getSourceType() = "flask.request" and this.getFunction() = get
|
// `get` is a call to request.headers.get or request.headers.get_all or request.headers.getlist
// request.headers
get.getObject()
.(DataFlow::AttrRead)
// request
.getObject()
.getALocalSource() = rfs and
get.getAttributeName() in ["get", "get_all", "getlist"] and
get.getObject().(DataFlow::AttrRead).getAttributeName() = "headers" and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = sensitiveheaders()
)
}
}
private class DjangoClientSuppliedsecret extends ClientSuppliedsecret {
DjangoClientSuppliedsecret() {
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
rfs.getSourceType() = "django.http.request.HttpRequest" and this.getFunction() = get
|
// `get` is a call to request.headers.get or request.META.get
// request.headers
get.getObject()
.(DataFlow::AttrRead)
// request
.getObject()
.getALocalSource() = rfs and
get.getAttributeName() = "get" and
get.getObject().(DataFlow::AttrRead).getAttributeName() in ["headers", "META"] and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = sensitiveheaders()
)
}
}
private class TornadoClientSuppliedsecret extends ClientSuppliedsecret {
TornadoClientSuppliedsecret() {
exists(RemoteFlowSource rfs, DataFlow::AttrRead get |
rfs.getSourceType() = "tornado.web.RequestHandler" and this.getFunction() = get
|
// `get` is a call to `rfs`.request.headers.get
// `rfs`.request.headers
get.getObject()
.(DataFlow::AttrRead)
// `rfs`.request
.getObject()
.(DataFlow::AttrRead)
// `rfs`
.getObject()
.getALocalSource() = rfs and
get.getAttributeName() in ["get", "get_list"] and
get.getObject().(DataFlow::AttrRead).getAttributeName() = "headers" and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = sensitiveheaders()
)
}
}
/** A string for `match` that identifies strings that look like they represent Sensitive Headers. */
private string sensitiveheaders() {
result =
[
"x-auth-token", "x-csrf-token", "http_x_csrf_token", "x-csrf-param", "x-csrf-header",
"http_x_csrf_token", "x-api-key", "authorization", "proxy-authorization"
]
}
/**
* A config that tracks data flow from remote user input to cryptographic operations
*/
private class UserInputMsgConfig extends TaintTracking::Configuration {
UserInputMsgConfig() { this = "UserInputMsgConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) {
sink = API::moduleImport("hmac").getMember("digest").getACall() or
sink =
API::moduleImport("hmac")
.getMember("new")
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall() or
sink =
API::moduleImport("hashlib")
.getMember([
"new", "sha1", "sha224", "sha256", "sha384", "sha512", "blake2b", "blake2s", "md5"
])
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall()
}
}
/**
* A config that tracks data flow from remote user input to Variable that hold sensitive info
*/
class UserInputSecretConfig extends TaintTracking2::Configuration {
UserInputSecretConfig() { this = "UserInputSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink.asExpr() instanceof CredentialExpr }
}
/**
* A config that tracks data flow from remote user input to Equality test
*/
class UserInputInComparisonConfig extends TaintTracking3::Configuration {
UserInputInComparisonConfig() { this = "UserInputInComparisonConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof CompareSink }
}

View File

@@ -0,0 +1,34 @@
/**
* @name Timing attack against header value
* @description Use of a non-constant-time verification routine to check the value of an HTTP header,
* possibly allowing a timing attack to infer the header's expected value.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/timing-attack-against-header-value
* @tags security
* external/cwe/cwe-208
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import TimingAttack
import DataFlow::PathGraph
/**
* Taint-tracking configuration tracing flow from obtaining a client Secret from an HTTP header to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof ClientSuppliedsecret }
override predicate isSink(DataFlow::Node sink) { sink instanceof CompareSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,34 @@
/**
* @name Timing attack against secret
* @description Use of a non-constant-time verification routine to check the value of an secret,
* possibly allowing a timing attack to retrieve sensitive information.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/timing-attack-sensitive-info
* @tags security
* external/cwe/cwe-208
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import TimingAttack
import DataFlow::PathGraph
/**
* Taint-tracking configuration tracing flow from obtaining a client Secret to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source.asExpr() instanceof CredentialExpr }
override predicate isSink(DataFlow::Node sink) { sink instanceof CompareSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,38 @@
/**
* @name Timing attack against digest validation
* @description When checking a signature over a message, a constant-time algorithm should be used.
* Otherwise, an attacker may be able to forge a valid digest for an arbitrary message
* by running a timing attack if they can send to the validation procedure
* both the message and the signature.
* A successful attack can result in authentication bypass.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/timing-attack-against-signature
* @tags security
* external/cwe/cwe-208
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import TimingAttack
import DataFlow::PathGraph
/**
* A configuration that tracks data flow from cryptographic operations
* to Equality test.
*/
class TimingAttackAgainstsignature extends TaintTracking::Configuration {
TimingAttackAgainstsignature() { this = "TimingAttackAgainstsignature" }
override predicate isSource(DataFlow::Node source) { source instanceof UserInputMsgConfig }
override predicate isSink(DataFlow::Node sink) { sink instanceof UserInputInComparisonConfig }
}
from TimingAttackAgainstsignature config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source,
source.getNode()

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc timing attack
"""
from flask import Flask
from flask import request
import hmac
app = Flask(__name__)
key = "e179017a-62b0-4996-8a38-e91aa9f1"
@app.route('/bad1')
def bad1():
Secret = request.headers.get('X-Auth-Token')
if not Secret == "key":
raise Exception('bad token')
return 'bad1'
@app.route('/bad2')
def check_credentials(password):
return password == "token"
def sign(pre_key, msg, alg):
return hmac.new(pre_key, msg, alg).digest()
def verify(msg, sig):
return sig == sign(key, msg, hashlib.sha256) #bad
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,8 @@
edges
| TimingAttackAgainstSignature.py:13:12:13:47 | ControlFlowNode for Attribute() | TimingAttackAgainstSignature.py:19:19:19:48 | ControlFlowNode for sign() |
nodes
| TimingAttackAgainstSignature.py:13:12:13:47 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| TimingAttackAgainstSignature.py:19:19:19:48 | ControlFlowNode for sign() | semmle.label | ControlFlowNode for sign() |
subpaths
#select
| TimingAttackAgainstSignature.py:19:19:19:48 | ControlFlowNode for sign() | TimingAttackAgainstSignature.py:13:12:13:47 | ControlFlowNode for Attribute() | TimingAttackAgainstSignature.py:19:19:19:48 | ControlFlowNode for sign() | Possible Timing attack against $@ validation. | TimingAttackAgainstSignature.py:13:12:13:47 | ControlFlowNode for Attribute() | TimingAttackAgainstSignature.py:13:12:13:47 | ControlFlowNode for Attribute() |

View File

@@ -0,0 +1 @@
experimental/Security/CWE/CWE-208/PossibleTimingAttackAgainstSignature.ql

View File

@@ -0,0 +1,6 @@
edges
nodes
| TimingAttackAgainstHeader.py:14:12:14:46 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
subpaths
#select
| TimingAttackAgainstHeader.py:14:12:14:46 | ControlFlowNode for Attribute() | TimingAttackAgainstHeader.py:14:12:14:46 | ControlFlowNode for Attribute() | TimingAttackAgainstHeader.py:14:12:14:46 | ControlFlowNode for Attribute() | Timing attack against $@ validation. | TimingAttackAgainstHeader.py:14:12:14:46 | ControlFlowNode for Attribute() | client-supplied token |

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc Timing Attack Against Header
"""
from flask import Flask
from flask import request
from django.utils.crypto import constant_time_compare
app = Flask(__name__)
@app.route('/bad')
def bad1():
if not request.headers.get('X-Auth-Token') == "key":
raise Exception('bad token')
return 'bad1'
@app.route('/good')
def good1():
if not constant_time_compare(Secret, "token"):
raise Exception('bad token')
return 'good1'
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1 @@
experimental/Security/CWE/CWE-208/TimingAttackAgainstHeader.ql

View File

@@ -0,0 +1,6 @@
edges
nodes
| TimingAttackAgainstSensitiveInfo.py:15:16:15:23 | ControlFlowNode for password | semmle.label | ControlFlowNode for password |
subpaths
#select
| TimingAttackAgainstSensitiveInfo.py:15:16:15:23 | ControlFlowNode for password | TimingAttackAgainstSensitiveInfo.py:15:16:15:23 | ControlFlowNode for password | TimingAttackAgainstSensitiveInfo.py:15:16:15:23 | ControlFlowNode for password | Timing attack against $@ validation. | TimingAttackAgainstSensitiveInfo.py:15:16:15:23 | ControlFlowNode for password | client-supplied token |

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc timing attack against Secret
"""
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/bad')
def check_credentials():
if request.method == 'POST':
password = request.form['pwd']
return password == "token"
@app.route('/good')
def check_credentials(password):
if request.method == 'POST':
password = request.form['pwd']
return constant_time_string_compare(password, "token")
def constant_time_string_compare(a, b):
if len(a) != len(b):
return False
result = 0
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
return result == 0
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1 @@
experimental/Security/CWE/CWE-208/TimingAttackAgainstSensitiveInfo.ql

View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc Timing Attack Against Signature
"""
import hashlib
import hmac
from django.utils.crypto import constant_time_compare
key = "e179017a-62b0-4996-8a38-e91aa9f1"
def sign(pre_key, msg, alg):
return hmac.new(pre_key, msg, alg).digest()
def verify1(msg, sig):
return constant_time_string_compare(sig, sign(key, msg, hashlib.sha256)) #good
def verify2(msg, sig):
return sig == sign(key, msg, hashlib.sha256) #bad