Merge branch 'github:main' into amammad-python-paramiko

This commit is contained in:
amammad
2023-04-26 23:28:29 -07:00
committed by GitHub
6009 changed files with 460222 additions and 384939 deletions

View File

@@ -1,3 +1,27 @@
## 0.7.0
### Bug Fixes
* Nonlocal variables are excluded from alerts.
## 0.6.6
No user-facing changes.
## 0.6.5
### New Queries
* Added a new query, `py/shell-command-constructed-from-input`, to detect libraries that unsafely construct shell commands from their inputs.
## 0.6.4
No user-facing changes.
## 0.6.3
No user-facing changes.
## 0.6.2
No user-facing changes.

View File

@@ -11,16 +11,57 @@
*/
import python
import IsComparisons
from Compare comp, Cmpop op, ClassValue c, string alt
where
invalid_portable_is_comparison(comp, op, c) and
not cpython_interned_constant(comp.getASubExpression()) and
(
op instanceof Is and alt = "=="
/** Holds if the comparison `comp` uses `is` or `is not` (represented as `op`) to compare its `left` and `right` arguments. */
predicate comparison_using_is(Compare comp, ControlFlowNode left, Cmpop op, ControlFlowNode right) {
exists(CompareNode fcomp | fcomp = comp.getAFlowNode() |
fcomp.operands(left, op, right) and
(op instanceof Is or op instanceof IsNot)
)
}
private predicate cpython_interned_value(Expr e) {
exists(string text | text = e.(StrConst).getText() |
text.length() = 0
or
op instanceof IsNot and alt = "!="
text.length() = 1 and text.regexpMatch("[U+0000-U+00ff]")
)
or
exists(int i | i = e.(IntegerLiteral).getN().toInt() | -5 <= i and i <= 256)
or
exists(Tuple t | t = e and not exists(t.getAnElt()))
}
predicate uninterned_literal(Expr e) {
(
e instanceof StrConst
or
e instanceof IntegerLiteral
or
e instanceof FloatLiteral
or
e instanceof Dict
or
e instanceof List
or
e instanceof Tuple
) and
not cpython_interned_value(e)
}
from Compare comp, Cmpop op, string alt
where
exists(ControlFlowNode left, ControlFlowNode right |
comparison_using_is(comp, left, op, right) and
(
op instanceof Is and alt = "=="
or
op instanceof IsNot and alt = "!="
)
|
uninterned_literal(left.getNode())
or
uninterned_literal(right.getNode())
)
select comp,
"Values compared using '" + op.getSymbol() +

View File

@@ -1,48 +1,36 @@
/**
* Definitions for reasoning about untrusted data used in APIs defined outside the
* database.
* user-written code.
*/
import python
private import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.Concepts
import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.ApiGraphs
private import semmle.python.dataflow.new.internal.DataFlowPrivate as DataFlowPrivate
private import semmle.python.dataflow.new.internal.TaintTrackingPrivate as TaintTrackingPrivate
private import semmle.python.types.Builtins
private import semmle.python.objects.ObjectInternal
// IMPLEMENTATION NOTES:
//
// This query uses *both* the new data-flow library, and points-to. Why? To get this
// finished quickly, so it can provide value for our field team and ourselves.
//
// In the long run, it should not need to use points-to for anything. Possibly this can
// even be helpful in figuring out what we need from TypeTrackers and the new data-flow
// library to be fully operational.
//
// At least it will allow us to provide a baseline comparison against a solution that
// doesn't use points-to at all
//
// There is a few dirty things we do here:
// 1. DataFlowPrivate: since `DataFlowCall` and `DataFlowCallable` are not exposed
// publicly, but we really want access to them.
// 2. points-to: we kinda need to do this since this is what powers `DataFlowCall` and
// `DataFlowCallable`
// 3. ObjectInternal: to provide better names for built-in functions and methods. If we
// really wanted to polish our points-to implementation, we could move this
// functionality into `BuiltinFunctionValue` and `BuiltinMethodValue`, but will
// probably require some more work: for this query, it's totally ok to use
// `builtins.open` for the code `open(f)`, but well, it requires a bit of thinking to
// figure out if that is desirable in general. I simply skipped a corner here!
// 4. TaintTrackingPrivate: Nothing else gives us access to `defaultAdditionalTaintStep` :(
/**
* A callable that is considered a "safe" external API from a security perspective.
* An external API that is considered "safe" from a security perspective.
*/
class SafeExternalApi extends Unit {
/** Gets a callable that is considered a "safe" external API from a security perspective. */
abstract DataFlowPrivate::DataFlowCallable getSafeCallable();
/**
* Gets a call that is considered "safe" from a security perspective. You can use API
* graphs to find calls to functions you know are safe.
*
* Which works even when the external library isn't extracted.
*/
abstract DataFlow::CallCfgNode getSafeCall();
/**
* Gets a callable that is considered a "safe" external API from a security
* perspective.
*
* You probably want to define this as `none()` and use `getSafeCall` instead, since
* that can handle the external library not being extracted.
*/
DataFlowPrivate::DataFlowCallable getSafeCallable() { none() }
}
/** DEPRECATED: Alias for SafeExternalApi */
@@ -50,42 +38,127 @@ deprecated class SafeExternalAPI = SafeExternalApi;
/** The default set of "safe" external APIs. */
private class DefaultSafeExternalApi extends SafeExternalApi {
override DataFlowPrivate::DataFlowCallable getSafeCallable() {
exists(CallableValue cv | cv = result.getCallableValue() |
cv = Value::named(["len", "isinstance", "getattr", "hasattr"])
or
exists(ClassValue cls, string attr |
cls = Value::named("dict") and attr in ["__getitem__", "__setitem__"]
|
cls.lookup(attr) = cv
)
override DataFlow::CallCfgNode getSafeCall() {
result =
API::builtin([
"len", "enumerate", "isinstance", "getattr", "hasattr", "bool", "float", "int", "repr",
"str", "type"
]).getACall()
}
}
/**
* Gets a human readable representation of `node`.
*
* Note that this is only defined for API nodes that are allowed as external APIs,
* so `None.json.dumps` will for example not be allowed.
*/
string apiNodeToStringRepr(API::Node node) {
node = API::builtin(result)
or
node = API::moduleImport(result)
or
exists(API::Node base, string basename |
base.getDepth() < node.getDepth() and
basename = apiNodeToStringRepr(base) and
not base = API::builtin(["None", "True", "False"])
|
exists(string m | node = base.getMember(m) | result = basename + "." + m)
or
node = base.getReturn() and
result = basename + "()" and
not base.getACall() = any(SafeExternalApi safe).getSafeCall()
or
node = base.getAwaited() and
result = basename
)
}
predicate resolvedCall(CallNode call) {
DataFlowPrivate::resolveCall(call, _, _) or
DataFlowPrivate::resolveClassCall(call, _)
}
newtype TInterestingExternalApiCall =
TUnresolvedCall(DataFlow::CallCfgNode call) {
exists(call.getLocation().getFile().getRelativePath()) and
not resolvedCall(call.getNode()) and
not call = any(SafeExternalApi safe).getSafeCall()
} or
TResolvedCall(DataFlowPrivate::DataFlowCall call) {
exists(call.getLocation().getFile().getRelativePath()) and
exists(call.getCallable()) and
not call.getCallable() = any(SafeExternalApi safe).getSafeCallable() and
// ignore calls inside codebase, and ignore calls that are marked as safe. This is
// only needed as long as we extract dependencies. When we stop doing that, all
// targets of resolved calls will be from user-written code.
not exists(call.getCallable().getLocation().getFile().getRelativePath()) and
not exists(DataFlow::CallCfgNode callCfgNode | callCfgNode.getNode() = call.getNode() |
any(SafeExternalApi safe).getSafeCall() = callCfgNode
)
}
abstract class InterestingExternalApiCall extends TInterestingExternalApiCall {
/** Gets the argument at position `apos`, if any */
abstract DataFlow::Node getArgument(DataFlowPrivate::ArgumentPosition apos);
/** Gets a textual representation of this element. */
abstract string toString();
/**
* Gets a human-readable name for the external API.
*/
abstract string getApiName();
}
class UnresolvedCall extends InterestingExternalApiCall, TUnresolvedCall {
DataFlow::CallCfgNode call;
UnresolvedCall() { this = TUnresolvedCall(call) }
override DataFlow::Node getArgument(DataFlowPrivate::ArgumentPosition apos) {
exists(int i | apos.isPositional(i) | result = call.getArg(i))
or
exists(string name | apos.isKeyword(name) | result = call.getArgByName(name))
}
override string toString() {
result = "ExternalAPI:UnresolvedCall: " + call.getNode().getNode().toString()
}
override string getApiName() {
exists(API::Node apiNode |
result = apiNodeToStringRepr(apiNode) and
apiNode.getACall() = call
)
}
}
class ResolvedCall extends InterestingExternalApiCall, TResolvedCall {
DataFlowPrivate::DataFlowCall dfCall;
ResolvedCall() { this = TResolvedCall(dfCall) }
override DataFlow::Node getArgument(DataFlowPrivate::ArgumentPosition apos) {
result = dfCall.getArgument(apos)
}
override string toString() {
result = "ExternalAPI:ResolvedCall: " + dfCall.getNode().getNode().toString()
}
override string getApiName() {
exists(DataFlow::CallCfgNode call, API::Node apiNode | dfCall.getNode() = call.getNode() |
result = apiNodeToStringRepr(apiNode) and
apiNode.getACall() = call
)
}
}
/** A node representing data being passed to an external API through a call. */
class ExternalApiDataNode extends DataFlow::Node {
DataFlowPrivate::DataFlowCallable callable;
int i;
ExternalApiDataNode() {
exists(DataFlowPrivate::DataFlowCall call |
exists(call.getLocation().getFile().getRelativePath())
|
callable = call.getCallable() and
// TODO: this ignores some complexity of keyword arguments (especially keyword-only args)
this = call.getArg(i)
) and
not any(SafeExternalApi safe).getSafeCallable() = callable and
exists(Value cv | cv = callable.getCallableValue() |
cv.isAbsent()
or
cv.isBuiltin()
or
cv.(CallableValue).getScope().getLocation().getFile().inStdlib()
or
not exists(cv.(CallableValue).getScope().getLocation().getFile().getRelativePath())
) and
exists(InterestingExternalApiCall call | this = call.getArgument(_)) and
// Not already modeled as a taint step
not TaintTrackingPrivate::defaultAdditionalTaintStep(this, _) and
// for `list.append(x)`, we have a additional taint step from x -> [post] list.
@@ -95,12 +168,6 @@ class ExternalApiDataNode extends DataFlow::Node {
TaintTrackingPrivate::defaultAdditionalTaintStep(_, post)
)
}
/** Gets the index for the parameter that will receive this untrusted data */
int getIndex() { result = i }
/** Gets the callable to which this argument is passed. */
DataFlowPrivate::DataFlowCallable getCallable() { result = callable }
}
/** DEPRECATED: Alias for ExternalApiDataNode */
@@ -133,19 +200,26 @@ deprecated class UntrustedExternalAPIDataNode = UntrustedExternalApiDataNode;
/** An external API which is used with untrusted data. */
private newtype TExternalApi =
/** An untrusted API method `m` where untrusted data is passed at `index`. */
TExternalApiParameter(DataFlowPrivate::DataFlowCallable callable, int index) {
exists(UntrustedExternalApiDataNode n |
callable = n.getCallable() and
index = n.getIndex()
MkExternalApi(string repr, DataFlowPrivate::ArgumentPosition apos) {
exists(UntrustedExternalApiDataNode ex, InterestingExternalApiCall call |
ex = call.getArgument(apos) and
repr = call.getApiName()
)
}
/** An external API which is used with untrusted data. */
class ExternalApiUsedWithUntrustedData extends TExternalApi {
/** A argument of an external API which is used with untrusted data. */
class ExternalApiUsedWithUntrustedData extends MkExternalApi {
string repr;
DataFlowPrivate::ArgumentPosition apos;
ExternalApiUsedWithUntrustedData() { this = MkExternalApi(repr, apos) }
/** Gets a possibly untrusted use of this external API. */
UntrustedExternalApiDataNode getUntrustedDataNode() {
this = TExternalApiParameter(result.getCallable(), result.getIndex())
exists(InterestingExternalApiCall call |
result = call.getArgument(apos) and
call.getApiName() = repr
)
}
/** Gets the number of untrusted sources used with this external API. */
@@ -154,63 +228,8 @@ class ExternalApiUsedWithUntrustedData extends TExternalApi {
}
/** Gets a textual representation of this element. */
string toString() {
exists(
DataFlowPrivate::DataFlowCallable callable, int index, string callableString,
string indexString
|
this = TExternalApiParameter(callable, index) and
indexString = "param " + index and
exists(CallableValue cv | cv = callable.getCallableValue() |
callableString =
cv.getScope().getEnclosingModule().getName() + "." + cv.getScope().getQualifiedName()
or
not exists(cv.getScope()) and
(
cv instanceof BuiltinFunctionValue and
callableString = pretty_builtin_function_value(cv)
or
cv instanceof BuiltinMethodValue and
callableString = pretty_builtin_method_value(cv)
or
not cv instanceof BuiltinFunctionValue and
not cv instanceof BuiltinMethodValue and
callableString = cv.toString()
)
) and
result = callableString + " [" + indexString + "]"
)
}
string toString() { result = repr + " [" + apos + "]" }
}
/** DEPRECATED: Alias for ExternalApiUsedWithUntrustedData */
deprecated class ExternalAPIUsedWithUntrustedData = ExternalApiUsedWithUntrustedData;
/** Gets the fully qualified name for the `BuiltinFunctionValue` bfv. */
private string pretty_builtin_function_value(BuiltinFunctionValue bfv) {
exists(Builtin b | b = bfv.(BuiltinFunctionObjectInternal).getBuiltin() |
result = prefix_with_module_if_found(b)
)
}
/** Gets the fully qualified name for the `BuiltinMethodValue` bmv. */
private string pretty_builtin_method_value(BuiltinMethodValue bmv) {
exists(Builtin b | b = bmv.(BuiltinMethodObjectInternal).getBuiltin() |
exists(Builtin cls | cls.isClass() and cls.getMember(b.getName()) = b |
result = prefix_with_module_if_found(cls) + "." + b.getName()
)
or
not exists(Builtin cls | cls.isClass() and cls.getMember(b.getName()) = b) and
result = b.getName()
)
}
/** Helper predicate that tries to adds module qualifier to `b`. Will succeed even if module not found. */
private string prefix_with_module_if_found(Builtin b) {
exists(Builtin mod | mod.isModule() and mod.getMember(b.getName()) = b |
result = mod.getName() + "." + b.getName()
)
or
not exists(Builtin mod | mod.isModule() and mod.getMember(b.getName()) = b) and
result = b.getName()
}

View File

@@ -11,11 +11,9 @@ relevant for security analysis of this application.</p>
<p>An external API is defined as a call to a method that is not defined in the source
code, and is not modeled as a taint step in the default taint library. External APIs may
be from the Python standard library or dependencies. The query will report the fully qualified name,
along with <code>[param x]</code>, where <code>x</code> indicates the position of
the parameter receiving the untrusted data. Note that for methods and
<code>classmethod</code>s, parameter 0 represents the class instance or class itself
respectively.</p>
be from the Python standard library or dependencies. The query will report the fully
qualified name, along with <code>[position index]</code> or <code>[keyword name]</code>,
to indicate the argument passing the untrusted data.</p>
<p>Note that an excepted sink might not be included in the results, if it also defines a
taint step. This is the case for <code>pickle.loads</code> which is a sink for the
@@ -24,8 +22,6 @@ Unsafe Deserialization query, but is also a taint step for other queries.</p>
<p>Note: Compared to the Java version of this query, we currently do not give special
care to methods that are overridden in the source code.</p>
<p>Note: Currently this query will only report results for external packages that are extracted.</p>
</overview>
<recommendation>

View File

@@ -11,11 +11,9 @@ be modeled as either taint steps, or sinks for specific problems.</p>
<p>An external API is defined as a call to a method that is not defined in the source
code, and is not modeled as a taint step in the default taint library. External APIs may
be from the Python standard library or dependencies. The query will report the fully qualified name,
along with <code>[param x]</code>, where <code>x</code> indicates the position of
the parameter receiving the untrusted data. Note that for methods and
<code>classmethod</code>s, parameter 0 represents the class instance or class itself
respectively.</p>
be from the Python standard library or dependencies. The query will report the fully
qualified name, along with <code>[position index]</code> or <code>[keyword name]</code>,
to indicate the argument passing the untrusted data.</p>
<p>Note that an excepted sink might not be included in the results, if it also defines a
taint step. This is the case for <code>pickle.loads</code> which is a sink for the
@@ -24,8 +22,6 @@ Unsafe Deserialization query, but is also a taint step for other queries.</p>
<p>Note: Compared to the Java version of this query, we currently do not give special
care to methods that are overridden in the source code.</p>
<p>Note: Currently this query will only report results for external packages that are extracted.</p>
</overview>
<recommendation>

View File

@@ -1,36 +1,30 @@
import os.path
from flask import Flask, request, abort
app = Flask(__name__)
urlpatterns = [
# Route to user_picture
url(r'^user-pic1$', user_picture1, name='user-picture1'),
url(r'^user-pic2$', user_picture2, name='user-picture2'),
url(r'^user-pic3$', user_picture3, name='user-picture3')
]
def user_picture1(request):
"""A view that is vulnerable to malicious file access."""
filename = request.GET.get('p')
@app.route("/user_picture1")
def user_picture1():
filename = request.args.get('p')
# BAD: This could read any file on the file system
data = open(filename, 'rb').read()
return HttpResponse(data)
return data
def user_picture2(request):
"""A view that is vulnerable to malicious file access."""
@app.route("/user_picture2")
def user_picture2():
base_path = '/server/static/images'
filename = request.GET.get('p')
filename = request.args.get('p')
# BAD: This could still read any file on the file system
data = open(os.path.join(base_path, filename), 'rb').read()
return HttpResponse(data)
return data
def user_picture3(request):
"""A view that is not vulnerable to malicious file access."""
@app.route("/user_picture3")
def user_picture3():
base_path = '/server/static/images'
filename = request.GET.get('p')
filename = request.args.get('p')
#GOOD -- Verify with normalised version of path
fullpath = os.path.normpath(os.path.join(base_path, filename))
if not fullpath.startswith(base_path):
raise SecurityException()
raise Exception("not allowed")
data = open(fullpath, 'rb').read()
return HttpResponse(data)
return data

View File

@@ -1,7 +1,7 @@
import sys
import tarfile
with tarfile.open('archive.zip') as tar:
with tarfile.open(sys.argv[1]) as tar:
#BAD : This could write any file on the filesystem.
for entry in tar:
tar.extract(entry, "/tmp/unpack/")

View File

@@ -1,8 +1,8 @@
import sys
import tarfile
import os.path
with tarfile.open('archive.zip') as tar:
with tarfile.open(sys.argv[1]) as tar:
for entry in tar:
#GOOD: Check that entry is safe
if os.path.isabs(entry.name) or ".." in entry.name:

View File

@@ -0,0 +1,73 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>
Dynamically constructing a shell command with inputs from library
functions may inadvertently change the meaning of the shell command.
Clients using the exported function may use inputs containing
characters that the shell interprets in a special way, for instance
quotes and spaces.
This can result in the shell command misbehaving, or even
allowing a malicious user to execute arbitrary commands on the system.
</p>
</overview>
<recommendation>
<p>
If possible, provide the dynamic arguments to the shell as an array
to APIs such as <code>subprocess.run</code> to avoid interpretation by the shell.
</p>
<p>
Alternatively, if the shell command must be constructed
dynamically, then add code to ensure that special characters
do not alter the shell command unexpectedly.
</p>
</recommendation>
<example>
<p>
The following example shows a dynamically constructed shell
command that downloads a file from a remote URL.
</p>
<sample src="examples/unsafe-shell-command-construction.py" />
<p>
The shell command will, however, fail to work as intended if the
input contains spaces or other special characters interpreted in a
special way by the shell.
</p>
<p>
Even worse, a client might pass in user-controlled
data, not knowing that the input is interpreted as a shell command.
This could allow a malicious user to provide the input <code>http://example.org; cat /etc/passwd</code>
in order to execute the command <code>cat /etc/passwd</code>.
</p>
<p>
To avoid such potentially catastrophic behaviors, provide the
input from library functions as an argument that does not
get interpreted by a shell:
</p>
<sample src="examples/unsafe-shell-command-construction_fixed.py" />
</example>
<references>
<li>
OWASP:
<a href="https://www.owasp.org/index.php/Command_Injection">Command Injection</a>.
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,27 @@
/**
* @name Unsafe shell command constructed from library input
* @description Using externally controlled strings in a command line may allow a malicious
* user to change the meaning of the command.
* @kind path-problem
* @problem.severity error
* @security-severity 6.3
* @precision medium
* @id py/shell-command-constructed-from-input
* @tags correctness
* security
* external/cwe/cwe-078
* external/cwe/cwe-088
* external/cwe/cwe-073
*/
import python
import semmle.python.security.dataflow.UnsafeShellCommandConstructionQuery
import DataFlow::PathGraph
from Configuration config, DataFlow::PathNode source, DataFlow::PathNode sink, Sink sinkNode
where
config.hasFlowPath(source, sink) and
sinkNode = sink.getNode()
select sinkNode.getStringConstruction(), source, sink,
"This " + sinkNode.describe() + " which depends on $@ is later used in a $@.", source.getNode(),
"library input", sinkNode.getCommandExecution(), "shell command"

View File

@@ -0,0 +1,4 @@
import os
def download(path):
os.system("wget " + path) # NOT OK

View File

@@ -0,0 +1,4 @@
import subprocess
def download(path):
subprocess.run(["wget", path]) # OK

View File

@@ -16,11 +16,17 @@ import semmle.python.ApiGraphs
private API::Node unsafe_paramiko_policy(string name) {
name in ["AutoAddPolicy", "WarningPolicy"] and
result = API::moduleImport("paramiko").getMember("client").getMember(name)
(
result = API::moduleImport("paramiko").getMember("client").getMember(name)
or
result = API::moduleImport("paramiko").getMember(name)
)
}
private API::Node paramikoSshClientInstance() {
result = API::moduleImport("paramiko").getMember("client").getMember("SSHClient").getReturn()
or
result = API::moduleImport("paramiko").getMember("SSHClient").getReturn()
}
from DataFlow::CallCfgNode call, DataFlow::Node arg, string name

View File

@@ -4,64 +4,116 @@ import TlsLibraryModel
/**
* Configuration to determine the state of a context being used to create
* a connection. There is one configuration for each pair of `TlsLibrary` and `ProtocolVersion`,
* such that a single configuration only tracks contexts where a specific `ProtocolVersion` is allowed.
* a connection. The configuration uses a flow state to track the `TlsLibrary`
* and the insecure `ProtocolVersion`s that are allowed.
*
* The state is in terms of whether a specific protocol is allowed. This is
* either true or false when the context is created and can then be modified
* later by either restricting or unrestricting the protocol (see the predicates
* `isRestriction` and `isUnrestriction`).
* later by either restricting or unrestricting the protocol (see the predicate
* `isAdditionalFlowStep`).
*
* Since we are interested in the final state, we want the flow to start from
* the last unrestriction, so we disallow flow into unrestrictions. We also
* model the creation as an unrestriction of everything it allows, to account
* for the common case where the creation plays the role of "last unrestriction".
*
* Since we really want "the last unrestriction, not nullified by a restriction",
* we also disallow flow into restrictions.
* The state is represented as a bit vector, where each bit corresponds to a
* protocol version. The bit is set if the protocol is allowed.
*/
class InsecureContextConfiguration extends DataFlow::Configuration {
TlsLibrary library;
ProtocolVersion tracked_version;
module InsecureContextConfiguration implements DataFlow::StateConfigSig {
private newtype TFlowState =
TMkFlowState(TlsLibrary library, int bits) {
bits in [0 .. max(any(ProtocolVersion v).getBit()) * 2 - 1]
}
InsecureContextConfiguration() {
this = library + "Allows" + tracked_version and
tracked_version.isInsecure()
class FlowState extends TFlowState {
int getBits() { this = TMkFlowState(_, result) }
TlsLibrary getLibrary() { this = TMkFlowState(result, _) }
predicate allowsInsecureVersion(ProtocolVersion v) {
v.isInsecure() and this.getBits().bitAnd(v.getBit()) != 0
}
string toString() {
result =
"FlowState(" + this.getLibrary().toString() + ", " +
concat(ProtocolVersion v | this.allowsInsecureVersion(v) | v, ", ") + ")"
}
}
ProtocolVersion getTrackedVersion() { result = tracked_version }
override predicate isSource(DataFlow::Node source) { this.isUnrestriction(source) }
override predicate isSink(DataFlow::Node sink) {
sink = library.connection_creation().getContext()
}
override predicate isBarrierIn(DataFlow::Node node) {
this.isRestriction(node)
private predicate relevantState(FlowState state) {
isSource(_, state)
or
this.isUnrestriction(node)
}
private predicate isRestriction(DataFlow::Node node) {
exists(ProtocolRestriction r |
r = library.protocol_restriction() and
r.getRestriction() = tracked_version
|
node = r.getContext()
exists(FlowState state0 | relevantState(state0) |
exists(ProtocolRestriction r |
r = state0.getLibrary().protocol_restriction() and
state.getBits() = state0.getBits().bitAnd(sum(r.getRestriction().getBit()).bitNot()) and
state0.getLibrary() = state.getLibrary()
)
or
exists(ProtocolUnrestriction pu |
pu = state0.getLibrary().protocol_unrestriction() and
state.getBits() = state0.getBits().bitOr(sum(pu.getUnrestriction().getBit())) and
state0.getLibrary() = state.getLibrary()
)
)
}
private predicate isUnrestriction(DataFlow::Node node) {
exists(ProtocolUnrestriction pu |
pu = library.protocol_unrestriction() and
pu.getUnrestriction() = tracked_version
|
node = pu.getContext()
predicate isSource(DataFlow::Node source, FlowState state) {
exists(ContextCreation creation | source = creation |
creation = state.getLibrary().unspecific_context_creation() and
state.getBits() =
sum(ProtocolVersion version |
version = creation.getProtocol() and version.isInsecure()
|
version.getBit()
)
)
}
predicate isSink(DataFlow::Node sink, FlowState state) {
sink = state.getLibrary().connection_creation().getContext() and
state.allowsInsecureVersion(_)
}
predicate isAdditionalFlowStep(
DataFlow::Node node1, FlowState state1, DataFlow::Node node2, FlowState state2
) {
DataFlow::localFlowStep(node1, node2) and
relevantState(state1) and
(
exists(ProtocolRestriction r |
r = state1.getLibrary().protocol_restriction() and
node2 = r.getContext() and
state2.getBits() = state1.getBits().bitAnd(sum(r.getRestriction().getBit()).bitNot()) and
state1.getLibrary() = state2.getLibrary()
)
or
exists(ProtocolUnrestriction pu |
pu = state1.getLibrary().protocol_unrestriction() and
node2 = pu.getContext() and
state2.getBits() = state1.getBits().bitOr(sum(pu.getUnrestriction().getBit())) and
state1.getLibrary() = state2.getLibrary()
)
)
}
predicate isBarrier(DataFlow::Node node, FlowState state) {
relevantState(state) and
(
exists(ProtocolRestriction r |
r = state.getLibrary().protocol_restriction() and
node = r.getContext() and
state.allowsInsecureVersion(r.getRestriction())
)
or
exists(ProtocolUnrestriction pu |
pu = state.getLibrary().protocol_unrestriction() and
node = pu.getContext() and
not state.allowsInsecureVersion(pu.getUnrestriction())
)
)
}
}
private module InsecureContextFlow = DataFlow::GlobalWithState<InsecureContextConfiguration>;
/**
* Holds if `conectionCreation` marks the creation of a connection based on the contex
* found at `contextOrigin` and allowing `insecure_version`.
@@ -74,8 +126,11 @@ predicate unsafe_connection_creation_with_context(
boolean specific
) {
// Connection created from a context allowing `insecure_version`.
exists(InsecureContextConfiguration c | c.hasFlow(contextOrigin, connectionCreation) |
insecure_version = c.getTrackedVersion() and
exists(InsecureContextFlow::PathNode src, InsecureContextFlow::PathNode sink |
InsecureContextFlow::flowPath(src, sink) and
src.getNode() = contextOrigin and
sink.getNode() = connectionCreation and
sink.getState().allowsInsecureVersion(insecure_version) and
specific = false
)
or

View File

@@ -12,14 +12,14 @@ class PyOpenSslContextCreation extends ContextCreation, DataFlow::CallCfgNode {
this = API::moduleImport("OpenSSL").getMember("SSL").getMember("Context").getACall()
}
override string getProtocol() {
override ProtocolVersion getProtocol() {
exists(DataFlow::Node protocolArg, PyOpenSsl pyo |
protocolArg in [this.getArg(0), this.getArgByName("method")]
|
protocolArg in [
pyo.specific_version(result).getAValueReachableFromSource(),
pyo.unspecific_version(result).getAValueReachableFromSource()
]
protocolArg = pyo.specific_version(result).getAValueReachableFromSource()
or
protocolArg = pyo.unspecific_version().getAValueReachableFromSource() and
result = any(ProtocolVersion pv)
)
}
}
@@ -51,18 +51,23 @@ class SetOptionsCall extends ProtocolRestriction, DataFlow::CallCfgNode {
}
}
class UnspecificPyOpenSslContextCreation extends PyOpenSslContextCreation, UnspecificContextCreation {
UnspecificPyOpenSslContextCreation() { library instanceof PyOpenSsl }
}
class PyOpenSsl extends TlsLibrary {
PyOpenSsl() { this = "pyOpenSSL" }
override string specific_version_name(ProtocolVersion version) { result = version + "_METHOD" }
override string unspecific_version_name(ProtocolFamily family) {
// `"TLS_METHOD"` is not actually available in pyOpenSSL yet, but should be coming soon..
result = family + "_METHOD"
override string unspecific_version_name() {
// See
// - https://www.pyopenssl.org/en/23.0.0/api/ssl.html#module-OpenSSL.SSL
// - https://www.openssl.org/docs/manmaster/man3/DTLS_server_method.html#NOTES
//
// PyOpenSSL also allows DTLS
// see https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Context
// although they are not mentioned here:
// https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.TLS_METHOD
result = ["TLS", "SSLv23"] + "_METHOD"
or
result = "TLS_" + ["CLIENT", "SERVER"] + "_METHOD"
}
override API::Node version_constants() { result = API::moduleImport("OpenSSL").getMember("SSL") }
@@ -79,7 +84,5 @@ class PyOpenSsl extends TlsLibrary {
override ProtocolRestriction protocol_restriction() { result instanceof SetOptionsCall }
override ProtocolUnrestriction protocol_unrestriction() {
result instanceof UnspecificPyOpenSslContextCreation
}
override ProtocolUnrestriction protocol_unrestriction() { none() }
}

View File

@@ -10,20 +10,21 @@ import TlsLibraryModel
class SslContextCreation extends ContextCreation, DataFlow::CallCfgNode {
SslContextCreation() { this = API::moduleImport("ssl").getMember("SSLContext").getACall() }
override string getProtocol() {
override ProtocolVersion getProtocol() {
exists(DataFlow::Node protocolArg, Ssl ssl |
protocolArg in [this.getArg(0), this.getArgByName("protocol")]
|
protocolArg =
[
ssl.specific_version(result).getAValueReachableFromSource(),
ssl.unspecific_version(result).getAValueReachableFromSource()
]
protocolArg = ssl.specific_version(result).getAValueReachableFromSource()
or
protocolArg = ssl.unspecific_version().getAValueReachableFromSource() and
// see https://docs.python.org/3/library/ssl.html#id7
result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
)
or
not exists(this.getArg(_)) and
not exists(this.getArgByName(_)) and
result = "TLS"
// see https://docs.python.org/3/library/ssl.html#id7
result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
}
}
@@ -34,7 +35,7 @@ class SslDefaultContextCreation extends ContextCreation {
// Allowed insecure versions are "TLSv1" and "TLSv1_1"
// see https://docs.python.org/3/library/ssl.html#context-creation
override string getProtocol() { result = "TLS" }
override ProtocolVersion getProtocol() { result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"] }
}
/** Gets a reference to an `ssl.Context` instance. */
@@ -161,35 +162,29 @@ class ContextSetVersion extends ProtocolRestriction, ProtocolUnrestriction, Data
}
}
class UnspecificSslContextCreation extends SslContextCreation, UnspecificContextCreation {
UnspecificSslContextCreation() { library instanceof Ssl }
override ProtocolVersion getUnrestriction() {
result = UnspecificContextCreation.super.getUnrestriction() and
// These are turned off by default since Python 3.6
// see https://docs.python.org/3.6/library/ssl.html#ssl.SSLContext
not result in ["SSLv2", "SSLv3"]
}
}
class UnspecificSslDefaultContextCreation extends SslDefaultContextCreation, ProtocolUnrestriction {
override DataFlow::Node getContext() { result = this }
// see https://docs.python.org/3/library/ssl.html#ssl.create_default_context
override ProtocolVersion getUnrestriction() {
result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
}
}
// class UnspecificSslContextCreation extends SslContextCreation, UnspecificContextCreation {
// // UnspecificSslContextCreation() { library instanceof Ssl }
// override ProtocolVersion getProtocol() {
// result = UnspecificContextCreation.super.getProtocol() and
// // These are turned off by default since Python 3.6
// // see https://docs.python.org/3.6/library/ssl.html#ssl.SSLContext
// not result in ["SSLv2", "SSLv3"]
// }
// }
// class UnspecificSslDefaultContextCreation extends SslDefaultContextCreation {
// // override DataFlow::Node getContext() { result = this }
// // see https://docs.python.org/3/library/ssl.html#ssl.create_default_context
// override ProtocolVersion getProtocol() { result in ["TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"] }
// }
class Ssl extends TlsLibrary {
Ssl() { this = "ssl" }
override string specific_version_name(ProtocolVersion version) { result = "PROTOCOL_" + version }
override string unspecific_version_name(ProtocolFamily family) {
family = "SSLv23" and result = "PROTOCOL_" + family
override string unspecific_version_name() {
result = "PROTOCOL_SSLv23"
or
family = "TLS" and result = "PROTOCOL_" + family + ["", "_CLIENT", "_SERVER"]
result = "PROTOCOL_TLS" + ["", "_CLIENT", "_SERVER"]
}
override API::Node version_constants() { result = API::moduleImport("ssl") }
@@ -219,9 +214,5 @@ class Ssl extends TlsLibrary {
result instanceof OptionsAugAndNot
or
result instanceof ContextSetVersion
or
result instanceof UnspecificSslContextCreation
or
result instanceof UnspecificSslDefaultContextCreation
}
}

View File

@@ -15,24 +15,45 @@ class ProtocolVersion extends string {
or
this = "TLSv1" and version = ["TLSv1_1", "TLSv1_2", "TLSv1_3"]
or
this = ["TLSv1", "TLSv1_1"] and version = ["TLSv1_2", "TLSv1_3"]
this = "TLSv1_1" and version = ["TLSv1_2", "TLSv1_3"]
or
this = ["TLSv1", "TLSv1_1", "TLSv1_2"] and version = "TLSv1_3"
this = "TLSv1_2" and version = "TLSv1_3"
}
/** Holds if this protocol version is known to be insecure. */
predicate isInsecure() { this in ["SSLv2", "SSLv3", "TLSv1", "TLSv1_1"] }
}
/** An unspecific protocol version */
class ProtocolFamily extends string {
ProtocolFamily() { this in ["SSLv23", "TLS"] }
/** Gets the bit mask for this protocol version. */
int getBit() {
this = "SSLv2" and result = 1
or
this = "SSLv3" and result = 2
or
this = "TLSv1" and result = 4
or
this = "TLSv1_1" and result = 8
or
this = "TLSv1_2" and result = 16
or
this = "TLSv1_3" and result = 32
}
}
/** The creation of a context. */
abstract class ContextCreation extends DataFlow::Node {
/** Gets the protocol version or family for this context. */
abstract string getProtocol();
/**
* Gets the protocol version for this context.
* There can be multiple values if the context was created
* using a non-specific version such as `TLS`.
*/
abstract ProtocolVersion getProtocol();
/**
* Holds if the context was created with a specific version
* rather than with a version flexible method, see:
* https://www.openssl.org/docs/manmaster/man3/DTLS_server_method.html#NOTES
*/
predicate specificVersion() { count(this.getProtocol()) = 1 }
}
/** The creation of a connection from a context. */
@@ -63,18 +84,10 @@ abstract class ProtocolUnrestriction extends DataFlow::Node {
* A context is being created with a range of allowed protocols.
* This also serves as unrestricting these protocols.
*/
abstract class UnspecificContextCreation extends ContextCreation, ProtocolUnrestriction {
TlsLibrary library;
ProtocolFamily family;
UnspecificContextCreation() { this.getProtocol() = family }
override DataFlow::CfgNode getContext() { result = this }
override ProtocolVersion getUnrestriction() {
abstract class UnspecificContextCreation extends ContextCreation {
override ProtocolVersion getProtocol() {
// There is only one family, the two names are aliases in OpenSSL.
// see https://github.com/openssl/openssl/blob/13888e797c5a3193e91d71e5f5a196a2d68d266f/include/openssl/ssl.h.in#L1953-L1955
family in ["SSLv23", "TLS"] and
// see https://docs.python.org/3/library/ssl.html#ssl-contexts
result in ["SSLv2", "SSLv3", "TLSv1", "TLSv1_1", "TLSv1_2", "TLSv1_3"]
}
@@ -88,8 +101,8 @@ abstract class TlsLibrary extends string {
/** Gets the name of a specific protocol version. */
abstract string specific_version_name(ProtocolVersion version);
/** Gets a name, which is a member of `version_constants`, that can be used to specify the protocol family `family`. */
abstract string unspecific_version_name(ProtocolFamily family);
/** Gets a name, which is a member of `version_constants`, that can be used to specify the entire protocol family. */
abstract string unspecific_version_name();
/** Gets an API node representing the module or class holding the version constants. */
abstract API::Node version_constants();
@@ -99,9 +112,9 @@ abstract class TlsLibrary extends string {
result = this.version_constants().getMember(this.specific_version_name(version))
}
/** Gets an API node representing the protocol family `family`. */
API::Node unspecific_version(ProtocolFamily family) {
result = this.version_constants().getMember(this.unspecific_version_name(family))
/** Gets an API node representing the protocol entire family. */
API::Node unspecific_version() {
result = this.version_constants().getMember(this.unspecific_version_name())
}
/** Gets a creation of a context with a default protocol. */
@@ -113,14 +126,15 @@ abstract class TlsLibrary extends string {
/** Gets a creation of a context with a specific protocol version, known to be insecure. */
ContextCreation insecure_context_creation(ProtocolVersion version) {
result in [this.specific_context_creation(), this.default_context_creation()] and
result.specificVersion() and
result.getProtocol() = version and
version.isInsecure()
}
/** Gets a context that was created using `family`, known to have insecure instances. */
ContextCreation unspecific_context_creation(ProtocolFamily family) {
ContextCreation unspecific_context_creation() {
result in [this.specific_context_creation(), this.default_context_creation()] and
result.getProtocol() = family
not result.specificVersion()
}
/** Gets a dataflow node representing a connection being created in an insecure manner, not from a context. */

View File

@@ -15,7 +15,9 @@ import Undefined
import semmle.python.pointsto.PointsTo
predicate uninitialized_local(NameNode use) {
exists(FastLocalVariable local | use.uses(local) or use.deletes(local) | not local.escapes()) and
exists(FastLocalVariable local | use.uses(local) or use.deletes(local) |
not local.escapes() and not local = any(Nonlocal nl).getAVariable()
) and
(
any(Uninitialized uninit).taints(use) and
PointsToInternal::reachableBlock(use.getBasicBlock(), _)

View File

@@ -13,8 +13,8 @@ where
or
key = "Interpreter version" and
exists(string major, string minor |
py_flags_versioned("version.major", major, _) and
py_flags_versioned("version.minor", minor, _) and
py_flags_versioned("extractor_python_version.major", major, _) and
py_flags_versioned("extractor_python_version.minor", minor, _) and
value = major + "." + minor
)
or

View File

@@ -0,0 +1,3 @@
## 0.6.3
No user-facing changes.

View File

@@ -0,0 +1,3 @@
## 0.6.4
No user-facing changes.

View File

@@ -0,0 +1,5 @@
## 0.6.5
### New Queries
* Added a new query, `py/shell-command-constructed-from-input`, to detect libraries that unsafely construct shell commands from their inputs.

View File

@@ -0,0 +1,3 @@
## 0.6.6
No user-facing changes.

View File

@@ -0,0 +1,5 @@
## 0.7.0
### Bug Fixes
* Nonlocal variables are excluded from alerts.

View File

@@ -1,2 +1,2 @@
---
lastReleaseVersion: 0.6.2
lastReleaseVersion: 0.7.0

View File

@@ -0,0 +1,56 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>Extracting files from a malicious tarball without validating that the destination file path
is within the destination directory using <code>shutil.unpack_archive()</code> can cause files outside the
destination directory to be overwritten, due to the possible presence of directory traversal elements
(<code>..</code>) in archive path names.</p>
<p>Tarball contain archive entries representing each file in the archive. These entries
include a file path for the entry, but these file paths are not restricted and may contain
unexpected special elements such as the directory traversal element (<code>..</code>). If these
file paths are used to determine an output file to write the contents of the archive item to, then
the file may be written to an unexpected location. This can result in sensitive information being
revealed or deleted, or an attacker being able to influence behavior by modifying unexpected
files.</p>
<p>For example, if a tarball contains a file entry <code>../sneaky-file.txt</code>, and the tarball
is extracted to the directory <code>/tmp/tmp123</code>, then naively combining the paths would result
in an output file path of <code>/tmp/tmp123/../sneaky-file.txt</code>, which would cause the file to be
written to <code>/tmp/</code>.</p>
</overview>
<recommendation>
<p>Ensure that output paths constructed from tarball entries are validated
to prevent writing files to unexpected locations.</p>
<p>Consider using a safer module, such as: <code>zipfile</code></p>
</recommendation>
<example>
<p>
In this example an archive is extracted without validating file paths.
</p>
<sample src="examples/HIT_UnsafeUnpack.py" />
<p>To fix this vulnerability, we need to call the function <code>tarfile.extract()</code>
on each <code>member</code> after verifying that it does not contain either <code>..</code> or startswith <code>/</code>.
</p>
<sample src="examples/NoHIT_UnsafeUnpack.py" />
</example>
<references>
<li>
Shutil official documentation
<a href="https://docs.python.org/3/library/shutil.html?highlight=unpack_archive#shutil.unpack_archive">shutil.unpack_archive() warning.</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,24 @@
/**
* @name Arbitrary file write during a tarball extraction from a user controlled source
* @description Extracting files from a potentially malicious tarball using `shutil.unpack_archive()` without validating
* that the destination file path is within the destination directory can cause files outside
* the destination directory to be overwritten. More precisely, if the tarball comes from a user controlled
* location either a remote one or cli argument.
* @kind path-problem
* @id py/unsafe-unpacking
* @problem.severity error
* @security-severity 7.5
* @precision medium
* @tags security
* experimental
* external/cwe/cwe-022
*/
import python
import experimental.Security.UnsafeUnpackQuery
import DataFlow::PathGraph
from UnsafeUnpackingConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
"Unsafe extraction from a malicious tarball retrieved from a remote location."

View File

@@ -0,0 +1,12 @@
import requests
import shutil
url = "https://www.someremote.location/tarball.tar.gz"
response = requests.get(url, stream=True)
tarpath = "/tmp/tmp456/tarball.tar.gz"
with open(tarpath, "wb") as f:
f.write(response.raw.read())
untarredpath = "/tmp/tmp123"
shutil.unpack_archive(tarpath, untarredpath)

View File

@@ -0,0 +1,17 @@
import requests
import tarfile
url = "https://www.someremote.location/tarball.tar.gz"
response = requests.get(url, stream=True)
tarpath = "/tmp/tmp456/tarball.tar.gz"
with open(tarpath, "wb") as f:
f.write(response.raw.read())
untarredpath = "/tmp/tmp123"
with tarfile.open(tarpath) as tar:
for member in tar.getmembers():
if member.name.startswith("/") or ".." in member.name:
raise Exception("Path traversal identified in tarball")
tar.extract(untarredpath, member)

View File

@@ -0,0 +1,4 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<include src="TimingAttackAgainstHash.qhelp" />
</qhelp>

View File

@@ -0,0 +1,37 @@
/**
* @name Timing attack against Hash
* @description When checking a Hash over a message, a constant-time algorithm should be used.
* Otherwise, an attacker may be able to forge a valid Hash for an arbitrary message
* by running a timing attack if they can send to the validation procedure.
* A successful attack can result in authentication bypass.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/possible-timing-attack-against-hash
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration that tracks data flow from cryptographic operations
* to equality test
*/
class PossibleTimingAttackAgainstHash extends TaintTracking::Configuration {
PossibleTimingAttackAgainstHash() { this = "PossibleTimingAttackAgainstHash" }
override predicate isSource(DataFlow::Node source) { source instanceof ProduceCryptoCall }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from PossibleTimingAttackAgainstHash config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Possible Timing attack against $@ validation.",
source.getNode().(ProduceCryptoCall).getResultType(), "message"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack Against Hash
"""
import hmac
import hashlib
key = "e179017a-62b0-4996-8a38-e91aa9f1"
msg = "Test"
def sign(pre_key, imsg, alg):
return hmac.new(pre_key, imsg, alg).digest()
def verify(msg, sig):
return hmac.compare_digest(sig, sign(key, msg, hashlib.sha256)) #good

View File

@@ -0,0 +1,55 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
Timing Attack is based on the leakage of information by studying how long it takes the system to respond to different inputs.
it can be circumvented by using a constant-time algorithm for checking the value of Hash,
more precisely, the comparison time should not depend on the content of the input. Otherwise the attacker gains
information that is indirectly leaked by the application. This information may then be used for malicious purposes.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the value of Hash.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating a Hash.
</p>
<sample src="UnSafeComparisonOfHash.py" />
<p>
The next example use a safe constant-time algorithm for validating a Hash:
</p>
<sample src="SafeComparisonOfHash.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
<li>
HMAC:
<a href="https://datatracker.ietf.org/doc/html/rfc2104.html">RFC 2104</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,38 @@
/**
* @name Timing attack against Hash
* @description When checking a Hash over a message, a constant-time algorithm should be used.
* Otherwise, an attacker may be able to forge a valid Hash for an arbitrary message
* by running a timing attack if they can send to the validation procedure.
* A successful attack can result in authentication bypass.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/timing-attack-against-hash
* @tags security
* external/cwe/cwe-208
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration that tracks data flow from cryptographic operations
* to Equality test.
*/
class TimingAttackAgainsthash extends TaintTracking::Configuration {
TimingAttackAgainsthash() { this = "TimingAttackAgainsthash" }
override predicate isSource(DataFlow::Node source) { source instanceof ProduceCryptoCall }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from TimingAttackAgainsthash config, DataFlow::PathNode source, DataFlow::PathNode sink
where
config.hasFlowPath(source, sink) and
sink.getNode().(NonConstantTimeComparisonSink).includesUserInput()
select sink.getNode(), source, sink, "Timing attack against $@ validation.",
source.getNode().(ProduceCryptoCall).getResultType(), "message"

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc timing attack Against Hash
"""
import hmac
import hashlib
key = "e179017a-62b0-4996-8a38-e91aa9f1"
msg = "Test"
def sign(pre_key, imsg, alg):
return hmac.new(pre_key, imsg, alg).digest()
def verify(msg, sig):
return sig == sign(key, msg, hashlib.sha256) #bad

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack against header value
"""
from flask import Flask
from flask import request
import hmac
@app.route('/good')
def good():
secret = request.headers.get('X-Auth-Token')
if not hmac.compare_digest(secret, "token"):
raise Exception('bad token')
return 'good'
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,50 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
A constant-time algorithm should be used for checking the value of sensitive headers.
In other words, the comparison time should not depend on the content of the input.
Otherwise timing information could be used to infer the header's expected, secret value.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the secret value.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating the value of sensitive headers.
</p>
<sample src="UnsafeComparisonOfHeaderValue.py" />
<p>
The next example use a safe constant-time algorithm for validating the value of sensitive headers:
</p>
<sample src="SafeComparisonOfHeaderValue.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,34 @@
/**
* @name Timing attack against header value
* @description Use of a non-constant-time verification routine to check the value of an HTTP header,
* possibly allowing a timing attack to infer the header's expected value.
* @kind path-problem
* @problem.severity error
* @precision high
* @id py/timing-attack-against-header-value
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration tracing flow from a client Secret obtained by an HTTP header to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof ClientSuppliedSecret }
override predicate isSink(DataFlow::Node sink) { sink instanceof CompareSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink) and not sink.getNode().(CompareSink).flowtolen()
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack against header value
"""
from flask import Flask
from flask import request
@app.route('/bad')
def bad():
secret = request.headers.get('X-Auth-Token')
if secret == "token":
raise Exception('bad token')
return 'bad'
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,4 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<include src="TimingAttackAgainstSensitiveInfo.qhelp" />
</qhelp>

View File

@@ -0,0 +1,34 @@
/**
* @name Timing attack against secret
* @description Use of a non-constant-time verification routine to check the value of an secret,
* possibly allowing a timing attack to retrieve sensitive information.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/possible-timing-attack-sensitive-info
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration tracing flow from obtaining a client Secret to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof SecretSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc preventing timing attack sensitive info
"""
from flask import Flask
from flask import request
import hmac
app = Flask(__name__)
@app.route('/bad', methods = ['POST', 'GET'])
def bad():
if request.method == 'POST':
password = request.form['pwd']
return hmac.compare_digest(password, "1234")
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -0,0 +1,52 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>
Timing Attack is based on the leakage of information of secret parameters by studying
how long it takes the system to respond to different inputs.
it can be circumvented by using a constant-time algorithm for checking the value of sensitive info,
more precisely, the comparison time should not depend on the content of the input. Otherwise the attacker gains
information that is indirectly leaked by the application. This information is then used for malicious purposes.
</p>
</overview>
<recommendation>
<p>
Two types of countermeasures can be applied against timing attacks. The first one consists
in eliminating timing variations whereas the second renders these variations useless for an attacker.
The only absolute way to prevent timing attacks is to make the computation strictly constant time,
independent of the input.
Use <code>hmac.compare_digest()</code> method to securely check the value of sensitive info.
If this method is used, then the calculation time depends only on the length of input byte arrays,
and does not depend on the contents of the arrays.
Unlike <code>==</code> is a fail fast check, If the first byte is not equal, it will return immediately.
</p>
</recommendation>
<example>
<p>
The following example uses <code>==</code> which is a fail fast check for validating a secret.
</p>
<sample src="UnSafeComparisonOfSensitiveInfo.py" />
<p>
The next example use a safe constant-time algorithm for validating a secret:
</p>
<sample src="SafeComparisonOfSensitiveInfo.py" />
</example>
<references>
<li>
Wikipedia:
<a href="https://en.wikipedia.org/wiki/Timing_attack">Timing attack</a>.
</li>
<li>
<a href="https://docs.python.org/3/library/hmac.html#hmac.compare_digest">hmac.compare_digest() method</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,39 @@
/**
* @name Timing attack against secret
* @description Use of a non-constant-time verification routine to check the value of an secret,
* possibly allowing a timing attack to retrieve sensitive information.
* @kind path-problem
* @problem.severity error
* @precision low
* @id py/timing-attack-sensitive-info
* @tags security
* external/cwe/cwe-208
* experimental
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.security.TimingAttack
import DataFlow::PathGraph
/**
* A configuration tracing flow from obtaining a client Secret to a unsafe Comparison.
*/
class ClientSuppliedSecretConfig extends TaintTracking::Configuration {
ClientSuppliedSecretConfig() { this = "ClientSuppliedSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof SecretSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof NonConstantTimeComparisonSink }
}
from ClientSuppliedSecretConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where
config.hasFlowPath(source, sink) and
(
source.getNode().(SecretSource).includesUserInput() or
sink.getNode().(NonConstantTimeComparisonSink).includesUserInput()
)
select sink.getNode(), source, sink, "Timing attack against $@ validation.", source.getNode(),
"client-supplied token"

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Desc timing attack against sensitive info
"""
from flask import Flask
from flask import request
@app.route('/bad', methods = ['POST', 'GET'])
def bad():
if request.method == 'POST':
password = request.form['pwd']
return password == "test"
if __name__ == '__main__':
app.debug = True
app.run()

View File

@@ -1,7 +1,7 @@
/**
* @name Unsafe usage of v1 version of Azure Storage client-side encryption.
* @description Using version v1 of Azure Storage client-side encryption is insecure, and may enable an attacker to decrypt encrypted data
* @kind problem
* @kind path-problem
* @tags security
* experimental
* cryptography
@@ -12,80 +12,145 @@
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
predicate isUnsafeClientSideAzureStorageEncryptionViaAttributes(Call call, AttrNode node) {
exists(
API::Node n, API::Node n2, Attribute a, AssignStmt astmt, API::Node uploadBlob,
ControlFlowNode ctrlFlowNode, string s
|
s in ["key_encryption_key", "key_resolver_function"] and
n =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobClient")
.getReturn()
.getMember(s) and
n2 =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobClient")
.getReturn()
.getMember("upload_blob") and
n.getAValueReachableFromSource().asExpr() = a and
astmt.getATarget() = a and
a.getAFlowNode() = node and
uploadBlob =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobClient")
.getReturn()
.getMember("upload_blob") and
uploadBlob.getACall().asExpr() = call and
ctrlFlowNode = call.getAFlowNode() and
node.strictlyReaches(ctrlFlowNode) and
node != ctrlFlowNode and
not exists(
AssignStmt astmt2, Attribute a2, AttrNode encryptionVersionSet, StrConst uc,
API::Node encryptionVersion
|
uc = astmt2.getValue() and
uc.getText() in ["'2.0'", "2.0"] and
encryptionVersion =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobClient")
.getReturn()
.getMember("encryption_version") and
encryptionVersion.getAValueReachableFromSource().asExpr() = a2 and
astmt2.getATarget() = a2 and
a2.getAFlowNode() = encryptionVersionSet and
encryptionVersionSet.strictlyReaches(ctrlFlowNode)
)
)
API::Node getBlobServiceClient(boolean isSource) {
isSource = true and
result =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobServiceClient")
.getReturn()
or
isSource = true and
result =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobServiceClient")
.getMember("from_connection_string")
.getReturn()
}
predicate isUnsafeClientSideAzureStorageEncryptionViaObjectCreation(Call call, ControlFlowNode node) {
exists(API::Node c, string s, Keyword k | k.getAFlowNode() = node |
c.getACall().asExpr() = call and
c = API::moduleImport("azure").getMember("storage").getMember("blob").getMember(s) and
s in ["ContainerClient", "BlobClient", "BlobServiceClient"] and
k.getArg() = "key_encryption_key" and
k = call.getANamedArg() and
not k.getValue() instanceof None and
not exists(Keyword k2 | k2 = call.getANamedArg() |
k2.getArg() = "encryption_version" and
k2.getValue().(StrConst).getText() in ["'2.0'", "2.0"]
)
)
API::CallNode getTransitionToContainerClient() {
result = getBlobServiceClient(_).getMember("get_container_client").getACall()
or
result = getBlobClient(_).getMember("_get_container_client").getACall()
}
from Call call, ControlFlowNode node
where
isUnsafeClientSideAzureStorageEncryptionViaAttributes(call, node) or
isUnsafeClientSideAzureStorageEncryptionViaObjectCreation(call, node)
select node, "Unsafe usage of v1 version of Azure Storage client-side encryption."
API::Node getContainerClient(boolean isSource) {
isSource = false and
result = getTransitionToContainerClient().getReturn()
or
isSource = true and
result =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("ContainerClient")
.getReturn()
or
isSource = true and
result =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("ContainerClient")
.getMember(["from_connection_string", "from_container_url"])
.getReturn()
}
API::CallNode getTransitionToBlobClient() {
result = [getBlobServiceClient(_), getContainerClient(_)].getMember("get_blob_client").getACall()
}
API::Node getBlobClient(boolean isSource) {
isSource = false and
result = getTransitionToBlobClient().getReturn()
or
isSource = true and
result =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobClient")
.getReturn()
or
isSource = true and
result =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember("BlobClient")
.getMember(["from_connection_string", "from_blob_url"])
.getReturn()
}
API::Node anyClient(boolean isSource) {
result in [getBlobServiceClient(isSource), getContainerClient(isSource), getBlobClient(isSource)]
}
newtype TAzureFlowState =
MkUsesV1Encryption() or
MkUsesNoEncryption()
module AzureBlobClientConfig implements DataFlow::StateConfigSig {
class FlowState = TAzureFlowState;
predicate isSource(DataFlow::Node node, FlowState state) {
state = MkUsesNoEncryption() and
node = anyClient(true).asSource()
}
predicate isBarrier(DataFlow::Node node, FlowState state) {
exists(state) and
exists(DataFlow::AttrWrite attr |
node = anyClient(_).getAValueReachableFromSource() and
attr.accesses(node, "encryption_version") and
attr.getValue().asExpr().(StrConst).getText() in ["'2.0'", "2.0"]
)
or
// small optimization to block flow with no encryption out of the post-update node
// for the attribute assignment.
isAdditionalFlowStep(_, MkUsesNoEncryption(), node, MkUsesV1Encryption()) and
state = MkUsesNoEncryption()
}
predicate isAdditionalFlowStep(DataFlow::Node node1, DataFlow::Node node2) {
exists(DataFlow::MethodCallNode call |
call in [getTransitionToContainerClient(), getTransitionToBlobClient()] and
node1 = call.getObject() and
node2 = call
)
}
predicate isAdditionalFlowStep(
DataFlow::Node node1, FlowState state1, DataFlow::Node node2, FlowState state2
) {
node1 = node2.(DataFlow::PostUpdateNode).getPreUpdateNode() and
state1 = MkUsesNoEncryption() and
state2 = MkUsesV1Encryption() and
exists(DataFlow::AttrWrite attr |
node1 = anyClient(_).getAValueReachableFromSource() and
attr.accesses(node1, ["key_encryption_key", "key_resolver_function"])
)
}
predicate isSink(DataFlow::Node node, FlowState state) {
state = MkUsesV1Encryption() and
exists(DataFlow::MethodCallNode call |
call = getBlobClient(_).getMember("upload_blob").getACall() and
node = call.getObject()
)
}
}
module AzureBlobClient = DataFlow::GlobalWithState<AzureBlobClientConfig>;
import AzureBlobClient::PathGraph
from AzureBlobClient::PathNode source, AzureBlobClient::PathNode sink
where AzureBlobClient::flowPath(source, sink)
select sink, source, sink, "Unsafe usage of v1 version of Azure Storage client-side encryption"

View File

@@ -16,7 +16,8 @@ private import semmle.python.frameworks.Tornado
abstract class ClientSuppliedIpUsedInSecurityCheck extends DataFlow::Node { }
private class FlaskClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck,
DataFlow::MethodCallNode {
DataFlow::MethodCallNode
{
FlaskClientSuppliedIpUsedInSecurityCheck() {
this = Flask::request().getMember("headers").getMember(["get", "get_all", "getlist"]).getACall() and
this.getArg(0).asExpr().(StrConst).getText().toLowerCase() = clientIpParameterName()
@@ -24,7 +25,8 @@ private class FlaskClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpU
}
private class DjangoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck,
DataFlow::MethodCallNode {
DataFlow::MethodCallNode
{
DjangoClientSuppliedIpUsedInSecurityCheck() {
exists(DataFlow::Node req, DataFlow::AttrRead headers |
// a call to request.headers.get or request.META.get
@@ -38,7 +40,8 @@ private class DjangoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIp
}
private class TornadoClientSuppliedIpUsedInSecurityCheck extends ClientSuppliedIpUsedInSecurityCheck,
DataFlow::MethodCallNode {
DataFlow::MethodCallNode
{
TornadoClientSuppliedIpUsedInSecurityCheck() {
// a call to self.request.headers.get or self.request.headers.get_list inside a tornado requesthandler
exists(

View File

@@ -1,6 +1,6 @@
/**
* @name SimpleXMLRPCServer DoS vulnerability
* @description SimpleXMLRPCServer is vulnerable to DoS attacks from untrusted user input
* @name SimpleXMLRPCServer denial of service
* @description SimpleXMLRPCServer is vulnerable to denial of service attacks from untrusted user input
* @kind problem
* @problem.severity warning
* @precision high

View File

@@ -0,0 +1,213 @@
/**
* Provides a taint-tracking configuration for detecting "UnsafeUnpacking" vulnerabilities.
*/
import python
import semmle.python.Concepts
import semmle.python.dataflow.new.internal.DataFlowPublic
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.TaintTracking
import semmle.python.frameworks.Stdlib
import semmle.python.dataflow.new.RemoteFlowSources
/**
* Handle those three cases of Tarfile opens:
* - `tarfile.open()`
* - `tarfile.TarFile()`
* - `MKtarfile.Tarfile.open()`
*/
API::Node tarfileOpen() {
result in [
API::moduleImport("tarfile").getMember(["open", "TarFile"]),
API::moduleImport("tarfile").getMember("TarFile").getASubclass().getMember("open")
]
}
/**
* A class for handling the previous three cases, plus the use of `closing` in with the previous cases
*/
class AllTarfileOpens extends API::CallNode {
AllTarfileOpens() {
this = tarfileOpen().getACall()
or
exists(API::Node closing, Node arg |
closing = API::moduleImport("contextlib").getMember("closing") and
this = closing.getACall() and
arg = this.getArg(0) and
arg = tarfileOpen().getACall()
)
}
}
class UnsafeUnpackingConfig extends TaintTracking::Configuration {
UnsafeUnpackingConfig() { this = "UnsafeUnpackingConfig" }
override predicate isSource(DataFlow::Node source) {
// A source coming from a remote location
source instanceof RemoteFlowSource
or
// A source coming from a CLI argparse module
// see argparse: https://docs.python.org/3/library/argparse.html
exists(MethodCallNode args |
args = source.(AttrRead).getObject().getALocalSource() and
args =
API::moduleImport("argparse")
.getMember("ArgumentParser")
.getReturn()
.getMember("parse_args")
.getACall()
)
or
// A source catching an S3 file download
// see boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.download_file
source =
API::moduleImport("boto3")
.getMember("client")
.getReturn()
.getMember(["download_file", "download_fileobj"])
.getACall()
.getArg(2)
or
// A source catching an S3 file download
// see boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
source =
API::moduleImport("boto3")
.getMember("Session")
.getReturn()
.getMember("client")
.getReturn()
.getMember(["download_file", "download_fileobj"])
.getACall()
.getArg(2)
or
// A source download a file using wget
// see wget: https://pypi.org/project/wget/
exists(API::CallNode mcn |
mcn = API::moduleImport("wget").getMember("download").getACall() and
if exists(mcn.getArg(1)) then source = mcn.getArg(1) else source = mcn.getReturn().asSource()
)
or
// catch the Django uploaded files as a source
// see HttpRequest.FILES: https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.HttpRequest.FILES
source.(AttrRead).getAttributeName() = "FILES"
}
override predicate isSink(DataFlow::Node sink) {
(
// A sink capturing method calls to `unpack_archive`.
sink = API::moduleImport("shutil").getMember("unpack_archive").getACall().getArg(0)
or
// A sink capturing method calls to `extractall` without `members` argument.
// For a call to `file.extractall` without `members` argument, `file` is considered a sink.
exists(MethodCallNode call, AllTarfileOpens atfo |
call = atfo.getReturn().getMember("extractall").getACall() and
not exists(call.getArgByName("members")) and
sink = call.getObject()
)
or
// A sink capturing method calls to `extractall` with `members` argument.
// For a call to `file.extractall` with `members` argument, `file` is considered a sink if not
// a the `members` argument contains a NameConstant as None, a List or call to the method `getmembers`.
// Otherwise, the argument of `members` is considered a sink.
exists(MethodCallNode call, Node arg, AllTarfileOpens atfo |
call = atfo.getReturn().getMember("extractall").getACall() and
arg = call.getArgByName("members") and
if
arg.asCfgNode() instanceof NameConstantNode or
arg.asCfgNode() instanceof ListNode
then sink = call.getObject()
else
if arg.(MethodCallNode).getMethodName() = "getmembers"
then sink = arg.(MethodCallNode).getObject()
else sink = call.getArgByName("members")
)
or
// An argument to `extract` is considered a sink.
exists(AllTarfileOpens atfo |
sink = atfo.getReturn().getMember("extract").getACall().getArg(0)
)
or
//An argument to `_extract_member` is considered a sink.
exists(MethodCallNode call, AllTarfileOpens atfo |
call = atfo.getReturn().getMember("_extract_member").getACall() and
call.getArg(1).(AttrRead).accesses(sink, "name")
)
) and
not sink.getScope().getLocation().getFile().inStdlib()
}
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// Reading the response
nodeTo.(MethodCallNode).calls(nodeFrom, "read")
or
// Open a file for access
exists(MethodCallNode cn |
cn.calls(nodeTo, "open") and
cn.flowsTo(nodeFrom)
)
or
// Open a file for access using builtin
exists(API::CallNode cn |
cn = API::builtin("open").getACall() and
nodeTo = cn.getArg(0) and
cn.flowsTo(nodeFrom)
)
or
// Write access
exists(MethodCallNode cn |
cn.calls(nodeTo, "write") and
nodeFrom = cn.getArg(0)
)
or
// Retrieve Django uploaded files
// see getlist(): https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.QueryDict.getlist
// see chunks(): https://docs.djangoproject.com/en/4.1/ref/files/uploads/#django.core.files.uploadedfile.UploadedFile.chunks
nodeTo.(MethodCallNode).calls(nodeFrom, ["getlist", "get", "chunks"])
or
// Considering the use of "fs"
// see fs: https://docs.djangoproject.com/en/4.1/ref/files/storage/#the-filesystemstorage-class
nodeTo =
API::moduleImport("django")
.getMember("core")
.getMember("files")
.getMember("storage")
.getMember("FileSystemStorage")
.getReturn()
.getMember(["save", "path"])
.getACall() and
nodeFrom = nodeTo.(MethodCallNode).getArg(0)
or
// Accessing the name or raw content
nodeTo.(AttrRead).accesses(nodeFrom, ["name", "raw"])
or
// Join the base_dir to the filename
nodeTo = API::moduleImport("os").getMember("path").getMember("join").getACall() and
nodeFrom = nodeTo.(API::CallNode).getArg(1)
or
// Go through an Open for a Tarfile
nodeTo = tarfileOpen().getACall() and nodeFrom = nodeTo.(MethodCallNode).getArg(0)
or
// Handle the case where the getmembers is used.
nodeTo.(MethodCallNode).calls(nodeFrom, "getmembers") and
nodeFrom instanceof AllTarfileOpens
or
// To handle the case of `with closing(tarfile.open()) as file:`
// we add a step from the first argument of `closing` to the call to `closing`,
// whenever that first argument is a return of `tarfile.open()`.
nodeTo = API::moduleImport("contextlib").getMember("closing").getACall() and
nodeFrom = nodeTo.(API::CallNode).getArg(0) and
nodeFrom = tarfileOpen().getReturn().getAValueReachableFromSource()
or
// see Path : https://docs.python.org/3/library/pathlib.html#pathlib.Path
nodeTo = API::moduleImport("pathlib").getMember("Path").getACall() and
nodeFrom = nodeTo.(API::CallNode).getArg(0)
or
// Use of absolutepath
// see absolute : https://docs.python.org/3/library/pathlib.html#pathlib.Path.absolute
exists(API::CallNode mcn |
mcn = API::moduleImport("pathlib").getMember("Path").getACall() and
nodeTo = mcn.getAMethodCall("absolute") and
nodeFrom = mcn.getArg(0)
)
}
}

View File

@@ -59,12 +59,11 @@ module InsecureRandomness {
*/
class RandomFnSink extends Sink {
RandomFnSink() {
exists(DataFlowCallable randomFn |
randomFn
.getName()
exists(Function func |
func.getName()
.regexpMatch("(?i).*(gen(erate)?|make|mk|create).*(nonce|salt|pepper|Password).*")
|
this.getEnclosingCallable() = randomFn
this.asExpr().getScope() = func
)
}
}

View File

@@ -0,0 +1,350 @@
private import python
private import semmle.python.dataflow.new.TaintTracking2
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.DataFlow2
private import semmle.python.ApiGraphs
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.frameworks.Flask
private import semmle.python.frameworks.Django
/** A method call that produces cryptographic result. */
abstract class ProduceCryptoCall extends API::CallNode {
/** Gets a type of cryptographic operation such as MAC, signature, Hash or ciphertext. */
abstract string getResultType();
}
/** Gets a reference to the `cryptography.hazmat.primitives` module. */
API::Node cryptographylib() {
result = API::moduleImport("cryptography").getMember("hazmat").getMember("primitives")
}
/** Gets a reference to the `Crypto` module. */
API::Node cryptodome() { result = API::moduleImport(["Crypto", "Cryptodome"]) }
/** A method call that produces a MAC. */
class ProduceMacCall extends ProduceCryptoCall {
ProduceMacCall() {
this = API::moduleImport("hmac").getMember("digest").getACall() or
this =
API::moduleImport("hmac")
.getMember("new")
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall() or
this =
cryptodome()
.getMember("Hash")
.getMember("HMAC")
.getMember(["new", "HMAC"])
.getMember(["digest", "hexdigest"])
.getACall() or
this =
cryptographylib()
.getMember("hmac")
.getMember("HMAC")
.getReturn()
.getMember("finalize")
.getACall() or
this =
cryptographylib()
.getMember("cmac")
.getMember("CMAC")
.getReturn()
.getMember("finalize")
.getACall() or
this =
cryptodome()
.getMember("Hash")
.getMember("CMAC")
.getMember(["new", "CMAC"])
.getMember(["digest", "hexdigest"])
.getACall()
}
override string getResultType() { result = "MAC" }
}
/** A method call that produces a signature. */
private class ProduceSignatureCall extends ProduceCryptoCall {
ProduceSignatureCall() {
this =
cryptodome()
.getMember("Signature")
.getMember(["DSS", "pkcs1_15", "pss", "eddsa"])
.getMember("new")
.getReturn()
.getMember("sign")
.getACall()
}
override string getResultType() { result = "signature" }
}
private string hashalgo() {
result = ["sha1", "sha224", "sha256", "sha384", "sha512", "blake2b", "blake2s", "md5"]
}
/** A method call that produces a Hash. */
private class ProduceHashCall extends ProduceCryptoCall {
ProduceHashCall() {
this =
cryptographylib()
.getMember("hashes")
.getMember("Hash")
.getReturn()
.getMember("finalize")
.getACall() or
this =
API::moduleImport("hashlib")
.getMember(["new", hashalgo()])
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall() or
this =
cryptodome()
.getMember(hashalgo())
.getMember("new")
.getReturn()
.getMember(["digest", "hexdigest"])
.getACall()
}
override string getResultType() { result = "Hash" }
}
/** A method call that produces a ciphertext. */
private class ProduceCiphertextCall extends ProduceCryptoCall {
ProduceCiphertextCall() {
this =
cryptodome()
.getMember("Cipher")
.getMember(["DES", "DES3", "ARC2", "ARC4", "Blowfish", "PKCS1_v1_5"])
.getMember(["ARC4Cipher", "new", "PKCS115_Cipher"])
.getMember("encrypt")
.getACall() or
this =
cryptographylib()
.getMember("ciphers")
.getMember("Cipher")
.getReturn()
.getMember("finalize")
.getACall()
}
override string getResultType() { result = "ciphertext" }
}
/** A data flow sink for comparison. */
private predicate existsFailFastCheck(Expr firstInput, Expr secondInput) {
exists(Compare compare |
(
compare.getOp(0) instanceof Eq or
compare.getOp(0) instanceof NotEq or
compare.getOp(0) instanceof In or
compare.getOp(0) instanceof NotIn
) and
(
compare.getLeft() = firstInput and
compare.getComparator(0) = secondInput and
not compare.getAComparator() instanceof None
or
compare.getLeft() = secondInput and
compare.getComparator(0) = firstInput and
not compare.getAComparator() instanceof None
)
)
}
/** A sink that compares input using fail fast check. */
class NonConstantTimeComparisonSink extends DataFlow::Node {
Expr anotherParameter;
NonConstantTimeComparisonSink() { existsFailFastCheck(this.asExpr(), anotherParameter) }
/** Holds if remote user input was used in the comparison. */
predicate includesUserInput() {
exists(UserInputInComparisonConfig config |
config.hasFlowTo(DataFlow2::exprNode(anotherParameter))
)
}
}
/** A data flow source of the secret obtained. */
class SecretSource extends DataFlow::Node {
CredentialExpr secret;
SecretSource() { secret = this.asExpr() }
/** Holds if the secret was deliverd by remote user. */
predicate includesUserInput() {
exists(UserInputSecretConfig config | config.hasFlowTo(DataFlow2::exprNode(secret)))
}
}
/** A string for `match` that identifies strings that look like they represent secret data. */
private string suspicious() {
result =
[
"%password%", "%passwd%", "%pwd%", "%refresh%token%", "%secret%token", "%secret%key",
"%passcode%", "%passphrase%", "%token%", "%secret%", "%credential%", "%userpass%", "%digest%",
"%signature%", "%mac%"
]
}
/** A variable that may hold sensitive information, judging by its name. * */
class CredentialExpr extends Expr {
CredentialExpr() {
exists(Variable v | this = v.getAnAccess() | v.getId().toLowerCase().matches(suspicious()))
}
}
/**
* A data flow source of the client Secret obtained according to the remote endpoint identifier specified
* (`X-auth-token`, `proxy-authorization`, `X-Csrf-Header`, etc.) in the header.
*
* For example: `request.headers.get("X-Auth-Token")`.
*/
abstract class ClientSuppliedSecret extends DataFlow::CallCfgNode { }
private class FlaskClientSuppliedSecret extends ClientSuppliedSecret {
FlaskClientSuppliedSecret() {
this = Flask::request().getMember("headers").getMember(["get", "get_all", "getlist"]).getACall() and
[this.getArg(0), this.getArgByName(["key", "name"])].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
private class DjangoClientSuppliedSecret extends ClientSuppliedSecret {
DjangoClientSuppliedSecret() {
this =
PrivateDjango::DjangoImpl::DjangoHttp::Request::HttpRequest::classRef()
.getMember(["headers", "META"])
.getMember("get")
.getACall() and
[this.getArg(0), this.getArgByName("key")].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
/** Gets a reference to the `tornado.web.RequestHandler` module. */
API::Node requesthandler() {
result = API::moduleImport("tornado").getMember("web").getMember("RequestHandler")
}
private class TornadoClientSuppliedSecret extends ClientSuppliedSecret {
TornadoClientSuppliedSecret() {
this = requesthandler().getMember(["headers", "META"]).getMember("get").getACall() and
[this.getArg(0), this.getArgByName("key")].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
/** Gets a reference to the `werkzeug.datastructures.Headers` module. */
API::Node headers() {
result = API::moduleImport("werkzeug").getMember("datastructures").getMember("Headers")
}
private class WerkzeugClientSuppliedSecret extends ClientSuppliedSecret {
WerkzeugClientSuppliedSecret() {
this =
headers().getMember(["headers", "META"]).getMember(["get", "get_all", "getlist"]).getACall() and
[this.getArg(0), this.getArgByName(["key", "name"])].asExpr().(StrConst).getText().toLowerCase() =
sensitiveheaders()
}
}
/** A string for `match` that identifies strings that look like they represent Sensitive Headers. */
private string sensitiveheaders() {
result =
[
"x-auth-token", "x-csrf-token", "http_x_csrf_token", "x-csrf-param", "x-csrf-header",
"http_x_csrf_token", "x-api-key", "authorization", "proxy-authorization", "x-gitlab-token",
"www-authenticate"
]
}
/**
* A config that tracks data flow from remote user input to Variable that hold sensitive info
*/
class UserInputSecretConfig extends TaintTracking::Configuration {
UserInputSecretConfig() { this = "UserInputSecretConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink.asExpr() instanceof CredentialExpr }
}
/**
* A config that tracks data flow from remote user input to Equality test
*/
class UserInputInComparisonConfig extends TaintTracking2::Configuration {
UserInputInComparisonConfig() { this = "UserInputInComparisonConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) {
exists(Compare cmp, Expr left, Expr right, Cmpop cmpop |
cmpop.getSymbol() = ["==", "in", "is not", "!="] and
cmp.compares(left, cmpop, right) and
sink.asExpr() = [left, right]
)
}
}
/**
* A configuration tracing flow from a client Secret obtained by an HTTP header to a len() function.
*/
private class ExcludeLenFunc extends TaintTracking2::Configuration {
ExcludeLenFunc() { this = "ExcludeLenFunc" }
override predicate isSource(DataFlow::Node source) { source instanceof ClientSuppliedSecret }
override predicate isSink(DataFlow::Node sink) {
exists(Call call |
call.getFunc().(Name).getId() = "len" and
sink.asExpr() = call.getArg(0)
)
}
}
/**
* Holds if there is a fast-fail check.
*/
class CompareSink extends DataFlow::Node {
CompareSink() {
exists(Compare compare |
(
compare.getOp(0) instanceof Eq or
compare.getOp(0) instanceof NotEq
) and
(
compare.getLeft() = this.asExpr() and
not compare.getComparator(0).(StrConst).getText() = "bearer"
or
compare.getComparator(0) = this.asExpr() and
not compare.getLeft().(StrConst).getText() = "bearer"
)
)
or
exists(Compare compare |
compare.getOp(0) instanceof IsNot and
(
compare.getLeft() = this.asExpr() and
not compare.getComparator(0) instanceof None
or
compare.getComparator(0) = this.asExpr() and
not compare.getLeft() instanceof None
)
)
}
/**
* Holds if there is a flow to len().
*/
predicate flowtolen() {
exists(ExcludeLenFunc config, DataFlow2::PathNode source, DataFlow2::PathNode sink |
config.hasFlowPath(source, sink)
)
}
}

View File

@@ -1,9 +1,9 @@
/**
* @name Call graph
* @description An edge in the points-to call graph.
* @description An edge in the call graph.
* @kind problem
* @problem.severity recommendation
* @id py/meta/points-to-call-graph
* @id py/meta/call-graph
* @tags meta
* @precision very-low
*/
@@ -12,9 +12,9 @@ import python
import semmle.python.dataflow.new.internal.DataFlowPrivate
import meta.MetaMetrics
from DataFlowCall c, DataFlowCallableValue f
from DataFlowCall call, DataFlowCallable target
where
c.getCallable() = f and
not c.getLocation().getFile() instanceof IgnoredFile and
not f.getScope().getLocation().getFile() instanceof IgnoredFile
select c, "Call to $@", f.getScope(), f.toString()
target = viableCallable(call) and
not call.getLocation().getFile() instanceof IgnoredFile and
not target.getScope().getLocation().getFile() instanceof IgnoredFile
select call, "Call to $@", target.getScope(), target.toString()

View File

@@ -1,16 +1,55 @@
/**
* Provides predicates for measuring the quality of the call graph, that is,
* the number of calls that could be resolved to a callee.
* the number of calls that could be resolved to a target.
*/
import python
import meta.MetaMetrics
newtype TTarget =
TFunction(Function func) or
TClass(Class cls)
class Target extends TTarget {
/** Gets a textual representation of this element. */
abstract string toString();
/** Gets the location of this dataflow call. */
abstract Location getLocation();
/** Whether this target is relevant. */
predicate isRelevant() { exists(this.getLocation().getFile().getRelativePath()) }
}
class TargetFunction extends Target, TFunction {
Function func;
TargetFunction() { this = TFunction(func) }
override string toString() { result = func.toString() }
override Location getLocation() { result = func.getLocation() }
Function getFunction() { result = func }
}
class TargetClass extends Target, TClass {
Class cls;
TargetClass() { this = TClass(cls) }
override string toString() { result = cls.toString() }
override Location getLocation() { result = cls.getLocation() }
Class getClass() { result = cls }
}
/**
* A call that is (possibly) relevant for analysis quality.
* See `IgnoredFile` for details on what is excluded.
*/
class RelevantCall extends Call {
class RelevantCall extends CallNode {
RelevantCall() { not this.getLocation().getFile() instanceof IgnoredFile }
}
@@ -18,12 +57,16 @@ class RelevantCall extends Call {
module PointsToBasedCallGraph {
/** A call that can be resolved by points-to. */
class ResolvableCall extends RelevantCall {
Value callee;
Value targetValue;
ResolvableCall() { callee.getACall() = this.getAFlowNode() }
ResolvableCall() { targetValue.getACall() = this }
/** Gets a resolved callee of this call. */
Value getCallee() { result = callee }
/** Gets a resolved target of this call. */
Target getTarget() {
result.(TargetFunction).getFunction() = targetValue.(CallableValue).getScope()
or
result.(TargetClass).getClass() = targetValue.(ClassValue).getScope()
}
}
/** A call that cannot be resolved by points-to. */
@@ -32,34 +75,79 @@ module PointsToBasedCallGraph {
}
/**
* A call that can be resolved by points-to, where the resolved callee is relevant.
* Relevant callees include:
* - builtins
* - standard library
* A call that can be resolved by points-to, where the resolved target is relevant.
* Relevant targets include:
* - source code of the project
*/
class ResolvableCallRelevantCallee extends ResolvableCall {
ResolvableCallRelevantCallee() {
callee.isBuiltin()
or
exists(File file |
file = callee.(CallableValue).getScope().getLocation().getFile()
or
file = callee.(ClassValue).getScope().getLocation().getFile()
|
file.inStdlib()
or
// part of the source code of the project
exists(file.getRelativePath())
class ResolvableCallRelevantTarget extends ResolvableCall {
ResolvableCallRelevantTarget() {
exists(Target target | target = this.getTarget() |
exists(target.getLocation().getFile().getRelativePath())
)
}
}
/**
* A call that can be resolved by points-to, where the resolved callee is not considered relevant.
* See `ResolvableCallRelevantCallee` for the definition of relevance.
* A call that can be resolved by points-to, where the resolved target is not considered relevant.
* See `ResolvableCallRelevantTarget` for the definition of relevance.
*/
class ResolvableCallIrrelevantCallee extends ResolvableCall {
ResolvableCallIrrelevantCallee() { not this instanceof ResolvableCallRelevantCallee }
class ResolvableCallIrrelevantTarget extends ResolvableCall {
ResolvableCallIrrelevantTarget() { not this instanceof ResolvableCallRelevantTarget }
}
}
/** Provides classes for call-graph resolution by using type-tracking. */
module TypeTrackingBasedCallGraph {
private import semmle.python.dataflow.new.internal.DataFlowDispatch as TT
/** A call that can be resolved by type-tracking. */
class ResolvableCall extends RelevantCall {
ResolvableCall() {
exists(TT::TNormalCall(this, _, _))
or
TT::resolveClassCall(this, _)
}
/** Gets a resolved target of this call. */
Target getTarget() {
exists(TT::DataFlowCall call, TT::CallType ct, Function targetFunc |
call = TT::TNormalCall(this, targetFunc, ct) and
not ct instanceof TT::CallTypeClass and
targetFunc = result.(TargetFunction).getFunction()
)
or
// a TT::TNormalCall only exists when the call can be resolved to a function.
// Since points-to just says the call goes directly to the class itself, and
// type-tracking based wants to resolve this to the constructor, which might not
// exist. So to do a proper comparison, we don't require the call to be resolve to
// a specific function.
TT::resolveClassCall(this, result.(TargetClass).getClass())
}
}
/** A call that cannot be resolved by type-tracking. */
class UnresolvableCall extends RelevantCall {
UnresolvableCall() { not this instanceof ResolvableCall }
}
/**
* A call that can be resolved by type-tracking, where the resolved callee is relevant.
* Relevant targets include:
* - source code of the project
*/
class ResolvableCallRelevantTarget extends ResolvableCall {
ResolvableCallRelevantTarget() {
exists(Target target | target = this.getTarget() |
exists(target.getLocation().getFile().getRelativePath())
)
}
}
/**
* A call that can be resolved by type-tracking, where the resolved target is not considered relevant.
* See `ResolvableCallRelevantTarget` for the definition of relevance.
*/
class ResolvableCallIrrelevantTarget extends ResolvableCall {
ResolvableCallIrrelevantTarget() { not this instanceof ResolvableCallRelevantTarget }
}
}

View File

@@ -11,4 +11,4 @@
import python
import CallGraphQuality
select projectRoot(), count(PointsToBasedCallGraph::ResolvableCallRelevantCallee call)
select projectRoot(), count(PointsToBasedCallGraph::ResolvableCallRelevantTarget call)

View File

@@ -0,0 +1,17 @@
/**
* @name New call graph edge from using type-tracking instead of points-to
* @kind problem
* @problem.severity recommendation
* @id py/meta/type-tracking-call-graph
* @tags meta
* @precision very-low
*/
import python
import CallGraphQuality
from CallNode call, Target target
where
target.isRelevant() and
call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
select call, "$@ to $@", call, "Call", target, target.toString()

View File

@@ -0,0 +1,18 @@
/**
* @name Missing call graph edge from using type-tracking instead of points-to
* @kind problem
* @problem.severity recommendation
* @id py/meta/call-graph-missing
* @tags meta
* @precision very-low
*/
import python
import CallGraphQuality
from CallNode call, Target target
where
target.isRelevant() and
call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
not call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
select call, "MISSING: $@ to $@", call, "Call", target, target.toString()

View File

@@ -0,0 +1,18 @@
/**
* @name New call graph edge from using type-tracking instead of points-to
* @kind problem
* @problem.severity recommendation
* @id py/meta/call-graph-new
* @tags meta
* @precision very-low
*/
import python
import CallGraphQuality
from CallNode call, Target target
where
target.isRelevant() and
not call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
select call, "NEW: $@ to $@", call, "Call", target, target.toString()

View File

@@ -0,0 +1,19 @@
/**
* @name New call graph edge from using type-tracking instead of points-to, that is ambiguous
* @kind problem
* @problem.severity recommendation
* @id py/meta/call-graph-new-ambiguous
* @tags meta
* @precision very-low
*/
import python
import CallGraphQuality
from CallNode call, Target target
where
target.isRelevant() and
not call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target and
1 < count(call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget())
select call, "NEW: $@ to $@", call, "Call", target, target.toString()

View File

@@ -0,0 +1,35 @@
/**
* @name Call graph edge overview from using type-tracking instead of points-to
* @id py/meta/call-graph-overview
* @precision very-low
*/
import python
import CallGraphQuality
from string tag, int c
where
tag = "SHARED" and
c =
count(CallNode call, Target target |
target.isRelevant() and
call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
)
or
tag = "NEW" and
c =
count(CallNode call, Target target |
target.isRelevant() and
not call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
)
or
tag = "MISSING" and
c =
count(CallNode call, Target target |
target.isRelevant() and
call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
not call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
)
select tag, c

View File

@@ -0,0 +1,18 @@
/**
* @name Shared call graph edge from using type-tracking instead of points-to
* @kind problem
* @problem.severity recommendation
* @id py/meta/call-graph-shared
* @tags meta
* @precision very-low
*/
import python
import CallGraphQuality
from CallNode call, Target target
where
target.isRelevant() and
call.(PointsToBasedCallGraph::ResolvableCall).getTarget() = target and
call.(TypeTrackingBasedCallGraph::ResolvableCall).getTarget() = target
select call, "SHARED: $@ to $@", call, "Call", target, target.toString()

View File

@@ -1,12 +1,11 @@
name: codeql/python-queries
version: 0.6.3-dev
version: 0.7.1-dev
groups:
- python
- queries
dependencies:
codeql/python-all: ${workspace}
codeql/suite-helpers: ${workspace}
codeql/util: ${workspace}
suites: codeql-suites
extractor: python
defaultSuiteFile: codeql-suites/python-code-scanning.qls

View File

@@ -1 +0,0 @@
<queries language="python"/>