Merge pull request #4354 from RasmusWL/python-command-execution-modeling

Python: Better command execution modeling
This commit is contained in:
Taus
2020-10-02 16:14:34 +02:00
committed by GitHub
10 changed files with 521 additions and 56 deletions

View File

@@ -27,7 +27,34 @@ class CommandInjectionConfiguration extends TaintTracking::Configuration {
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) {
sink = any(SystemCommandExecution e).getCommand()
sink = any(SystemCommandExecution e).getCommand() and
// Since the implementation of standard library functions such `os.popen` looks like
// ```py
// def popen(cmd, mode="r", buffering=-1):
// ...
// proc = subprocess.Popen(cmd, ...)
// ```
// any time we would report flow to the `os.popen` sink, we can ALSO report the flow
// from the `cmd` parameter to the `subprocess.Popen` sink -- obviously we don't
// want that.
//
// However, simply removing taint edges out of a sink is not a good enough solution,
// since we would only flag one of the `os.system` calls in the following example
// due to use-use flow
// ```py
// os.system(cmd)
// os.system(cmd)
// ```
//
// Best solution I could come up with is to exclude all sinks inside the `os` and
// `subprocess` modules. This does have a downside: If we have overlooked a function
// in any of these, that internally runs a command, we no longer give an alert :|
//
// This does not only affect `os.popen`, but also the helper functions in
// `subprocess`. See:
// https://github.com/python/cpython/blob/fa7ce080175f65d678a7d5756c94f82887fc9803/Lib/os.py#L974
// https://github.com/python/cpython/blob/fa7ce080175f65d678a7d5756c94f82887fc9803/Lib/subprocess.py#L341
not sink.getScope().getEnclosingModule().getName() in ["os", "subprocess"]
}
}

View File

@@ -11,8 +11,11 @@ private import experimental.semmle.python.Concepts
/** Provides models for the Python standard library. */
private module Stdlib {
// ---------------------------------------------------------------------------
// os
// ---------------------------------------------------------------------------
/** Gets a reference to the `os` module. */
DataFlow::Node os(DataFlow::TypeTracker t) {
private DataFlow::Node os(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importModule("os")
or
@@ -22,53 +25,60 @@ private module Stdlib {
/** Gets a reference to the `os` module. */
DataFlow::Node os() { result = os(DataFlow::TypeTracker::end()) }
/**
* Gets a reference to the attribute `attr_name` of the `os` module.
* WARNING: Only holds for a few predefined attributes.
*
* For example, using `attr_name = "system"` will get all uses of `os.system`.
*/
private DataFlow::Node os_attr(DataFlow::TypeTracker t, string attr_name) {
attr_name in ["system", "popen",
// exec
"execl", "execle", "execlp", "execlpe", "execv", "execve", "execvp", "execvpe",
// spawn
"spawnl", "spawnle", "spawnlp", "spawnlpe", "spawnv", "spawnve", "spawnvp", "spawnvpe",
"posix_spawn", "posix_spawnp",
// modules
"path"] and
(
t.start() and
result = DataFlow::importMember("os", attr_name)
or
t.startInAttr(attr_name) and
result = DataFlow::importModule("os")
)
or
// Due to bad performance when using normal setup with `os_attr(t2, attr_name).track(t2, t)`
// we have inlined that code and forced a join
exists(DataFlow::TypeTracker t2 |
exists(DataFlow::StepSummary summary |
os_attr_first_join(t2, attr_name, result, summary) and
t = t2.append(summary)
)
)
}
pragma[nomagic]
private predicate os_attr_first_join(
DataFlow::TypeTracker t2, string attr_name, DataFlow::Node res, DataFlow::StepSummary summary
) {
DataFlow::StepSummary::step(os_attr(t2, attr_name), res, summary)
}
/**
* Gets a reference to the attribute `attr_name` of the `os` module.
* WARNING: Only holds for a few predefined attributes.
*
* For example, using `"system"` will get all uses of `os.system`.
*/
private DataFlow::Node os_attr(string attr_name) {
result = os_attr(DataFlow::TypeTracker::end(), attr_name)
}
/** Provides models for the `os` module. */
module os {
/** Gets a reference to the `os.system` function. */
DataFlow::Node system(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importMember("os", "system")
or
t.startInAttr("system") and
result = os()
or
exists(DataFlow::TypeTracker t2 | result = os::system(t2).track(t2, t))
}
/** Gets a reference to the `os.system` function. */
DataFlow::Node system() { result = os::system(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `os.popen` function. */
DataFlow::Node popen(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importMember("os", "popen")
or
t.startInAttr("popen") and
result = os()
or
exists(DataFlow::TypeTracker t2 | result = os::popen(t2).track(t2, t))
}
/** Gets a reference to the `os.popen` function. */
DataFlow::Node popen() { result = os::popen(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `os.path` module. */
private DataFlow::Node path(DataFlow::TypeTracker t) {
t.start() and
(
result = DataFlow::importMember("os", "path")
or
result = DataFlow::importModule("os.path")
)
or
t.startInAttr("path") and
result = os()
or
exists(DataFlow::TypeTracker t2 | result = path(t2).track(t2, t))
}
/** Gets a reference to the `os.path` module. */
DataFlow::Node path() { result = path(DataFlow::TypeTracker::end()) }
DataFlow::Node path() { result = os_attr("path") }
/** Provides models for the `os.path` module */
module path {
@@ -83,7 +93,7 @@ private module Stdlib {
exists(DataFlow::TypeTracker t2 | result = join(t2).track(t2, t))
}
/** Gets a reference to the `os.join` module. */
/** Gets a reference to the `os.path.join` function. */
DataFlow::Node join() { result = join(DataFlow::TypeTracker::end()) }
}
}
@@ -93,7 +103,7 @@ private module Stdlib {
* See https://docs.python.org/3/library/os.html#os.system
*/
private class OsSystemCall extends SystemCommandExecution::Range {
OsSystemCall() { this.asCfgNode().(CallNode).getFunction() = os::system().asCfgNode() }
OsSystemCall() { this.asCfgNode().(CallNode).getFunction() = os_attr("system").asCfgNode() }
override DataFlow::Node getCommand() {
result.asCfgNode() = this.asCfgNode().(CallNode).getArg(0)
@@ -105,7 +115,57 @@ private module Stdlib {
* See https://docs.python.org/3/library/os.html#os.popen
*/
private class OsPopenCall extends SystemCommandExecution::Range {
OsPopenCall() { this.asCfgNode().(CallNode).getFunction() = os::popen().asCfgNode() }
OsPopenCall() { this.asCfgNode().(CallNode).getFunction() = os_attr("popen").asCfgNode() }
override DataFlow::Node getCommand() {
result.asCfgNode() = this.asCfgNode().(CallNode).getArg(0)
}
}
/**
* A call to any of the `os.exec*` functions
* See https://docs.python.org/3.8/library/os.html#os.execl
*/
private class OsExecCall extends SystemCommandExecution::Range {
OsExecCall() {
exists(string name |
name in ["execl", "execle", "execlp", "execlpe", "execv", "execve", "execvp", "execvpe"] and
this.asCfgNode().(CallNode).getFunction() = os_attr(name).asCfgNode()
)
}
override DataFlow::Node getCommand() {
result.asCfgNode() = this.asCfgNode().(CallNode).getArg(0)
}
}
/**
* A call to any of the `os.spawn*` functions
* See https://docs.python.org/3.8/library/os.html#os.spawnl
*/
private class OsSpawnCall extends SystemCommandExecution::Range {
OsSpawnCall() {
exists(string name |
name in ["spawnl", "spawnle", "spawnlp", "spawnlpe", "spawnv", "spawnve", "spawnvp",
"spawnvpe"] and
this.asCfgNode().(CallNode).getFunction() = os_attr(name).asCfgNode()
)
}
override DataFlow::Node getCommand() {
result.asCfgNode() = this.asCfgNode().(CallNode).getArg(1)
}
}
/**
* A call to any of the `os.posix_spawn*` functions
* See https://docs.python.org/3.8/library/os.html#os.posix_spawn
*/
private class OsPosixSpawnCall extends SystemCommandExecution::Range {
OsPosixSpawnCall() {
this.asCfgNode().(CallNode).getFunction() =
os_attr(["posix_spawn", "posix_spawnp"]).asCfgNode()
}
override DataFlow::Node getCommand() {
result.asCfgNode() = this.asCfgNode().(CallNode).getArg(0)
@@ -123,4 +183,148 @@ private module Stdlib {
// TODO: Handle pathlib (like we do for os.path.join)
}
}
// ---------------------------------------------------------------------------
// subprocess
// ---------------------------------------------------------------------------
/** Gets a reference to the `subprocess` module. */
private DataFlow::Node subprocess(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importModule("subprocess")
or
exists(DataFlow::TypeTracker t2 | result = subprocess(t2).track(t2, t))
}
/** Gets a reference to the `subprocess` module. */
DataFlow::Node subprocess() { result = subprocess(DataFlow::TypeTracker::end()) }
/**
* Gets a reference to the attribute `attr_name` of the `subprocess` module.
* WARNING: Only holds for a few predefined attributes.
*
* For example, using `attr_name = "Popen"` will get all uses of `subprocess.Popen`.
*/
private DataFlow::Node subprocess_attr(DataFlow::TypeTracker t, string attr_name) {
attr_name in ["Popen", "call", "check_call", "check_output", "run"] and
(
t.start() and
result = DataFlow::importMember("subprocess", attr_name)
or
t.startInAttr(attr_name) and
result = DataFlow::importModule("subprocess")
)
or
// Due to bad performance when using normal setup with `subprocess_attr(t2, attr_name).track(t2, t)`
// we have inlined that code and forced a join
exists(DataFlow::TypeTracker t2 |
exists(DataFlow::StepSummary summary |
subprocess_attr_first_join(t2, attr_name, result, summary) and
t = t2.append(summary)
)
)
}
pragma[nomagic]
private predicate subprocess_attr_first_join(
DataFlow::TypeTracker t2, string attr_name, DataFlow::Node res, DataFlow::StepSummary summary
) {
DataFlow::StepSummary::step(subprocess_attr(t2, attr_name), res, summary)
}
/**
* Gets a reference to the attribute `attr_name` of the `subprocess` module.
* WARNING: Only holds for a few predefined attributes.
*
* For example, using `attr_name = "Popen"` will get all uses of `subprocess.Popen`.
*/
private DataFlow::Node subprocess_attr(string attr_name) {
result = subprocess_attr(DataFlow::TypeTracker::end(), attr_name)
}
/**
* A call to `subprocess.Popen` or helper functions (call, check_call, check_output, run)
* See https://docs.python.org/3.8/library/subprocess.html#subprocess.Popen
*/
private class SubprocessPopenCall extends SystemCommandExecution::Range {
CallNode call;
SubprocessPopenCall() {
call = this.asCfgNode() and
exists(string name |
name in ["Popen", "call", "check_call", "check_output", "run"] and
call.getFunction() = subprocess_attr(name).asCfgNode()
)
}
/** Gets the ControlFlowNode for the `args` argument, if any. */
private ControlFlowNode get_args_arg() {
result = call.getArg(0)
or
result = call.getArgByName("args")
}
/** Gets the ControlFlowNode for the `shell` argument, if any. */
private ControlFlowNode get_shell_arg() {
result = call.getArg(8)
or
result = call.getArgByName("shell")
}
private boolean get_shell_arg_value() {
not exists(this.get_shell_arg()) and
result = false
or
exists(ControlFlowNode shell_arg | shell_arg = this.get_shell_arg() |
result = shell_arg.getNode().(ImmutableLiteral).booleanValue()
or
// TODO: Track the "shell" argument to determine possible values
not shell_arg.getNode() instanceof ImmutableLiteral and
(
result = true
or
result = false
)
)
}
/** Gets the ControlFlowNode for the `executable` argument, if any. */
private ControlFlowNode get_executable_arg() {
result = call.getArg(2)
or
result = call.getArgByName("executable")
}
override DataFlow::Node getCommand() {
// TODO: Track arguments ("args" and "shell")
// TODO: Handle using `args=["sh", "-c", <user-input>]`
result.asCfgNode() = this.get_executable_arg()
or
exists(ControlFlowNode arg_args, boolean shell |
arg_args = get_args_arg() and
shell = get_shell_arg_value()
|
// When "executable" argument is set, and "shell" argument is `False`, the
// "args" argument will only be used to set the program name and arguments to
// the program, so we should not consider any of them as command execution.
not (
exists(this.get_executable_arg()) and
shell = false
) and
(
// When the "args" argument is an iterable, first element is the command to
// run, so if we're able to, we only mark the first element as the command
// (and not the arguments to the command).
//
result.asCfgNode() = arg_args.(SequenceNode).getElement(0)
or
// Either the "args" argument is not a sequence (which is valid) or we where
// just not able to figure it out. Simply mark the "args" argument as the
// command.
//
not arg_args instanceof SequenceNode and
result.asCfgNode() = arg_args
)
)
}
}
}