mirror of
https://github.com/github/codeql.git
synced 2025-12-18 01:33:15 +01:00
Merge pull request #11570 from Sim4n6/UnsafeUnpack
Python: Unsafe unpacking using `shutil.unpack_archive()` query and tests
This commit is contained in:
@@ -0,0 +1,56 @@
|
||||
<!DOCTYPE qhelp PUBLIC
|
||||
"-//Semmle//qhelp//EN"
|
||||
"qhelp.dtd">
|
||||
<qhelp>
|
||||
|
||||
<overview>
|
||||
<p>Extracting files from a malicious tarball without validating that the destination file path
|
||||
is within the destination directory using <code>shutil.unpack_archive()</code> can cause files outside the
|
||||
destination directory to be overwritten, due to the possible presence of directory traversal elements
|
||||
(<code>..</code>) in archive path names.</p>
|
||||
|
||||
<p>Tarball contain archive entries representing each file in the archive. These entries
|
||||
include a file path for the entry, but these file paths are not restricted and may contain
|
||||
unexpected special elements such as the directory traversal element (<code>..</code>). If these
|
||||
file paths are used to determine an output file to write the contents of the archive item to, then
|
||||
the file may be written to an unexpected location. This can result in sensitive information being
|
||||
revealed or deleted, or an attacker being able to influence behavior by modifying unexpected
|
||||
files.</p>
|
||||
|
||||
<p>For example, if a tarball contains a file entry <code>../sneaky-file.txt</code>, and the tarball
|
||||
is extracted to the directory <code>/tmp/tmp123</code>, then naively combining the paths would result
|
||||
in an output file path of <code>/tmp/tmp123/../sneaky-file.txt</code>, which would cause the file to be
|
||||
written to <code>/tmp/</code>.</p>
|
||||
|
||||
</overview>
|
||||
<recommendation>
|
||||
|
||||
<p>Ensure that output paths constructed from tarball entries are validated
|
||||
to prevent writing files to unexpected locations.</p>
|
||||
|
||||
<p>Consider using a safer module, such as: <code>zipfile</code></p>
|
||||
|
||||
</recommendation>
|
||||
|
||||
<example>
|
||||
<p>
|
||||
In this example an archive is extracted without validating file paths.
|
||||
</p>
|
||||
|
||||
<sample src="examples/HIT_UnsafeUnpack.py" />
|
||||
|
||||
<p>To fix this vulnerability, we need to call the function <code>tarfile.extract()</code>
|
||||
on each <code>member</code> after verifying that it does not contain either <code>..</code> or startswith <code>/</code>.
|
||||
</p>
|
||||
|
||||
<sample src="examples/NoHIT_UnsafeUnpack.py" />
|
||||
|
||||
</example>
|
||||
<references>
|
||||
|
||||
<li>
|
||||
Shutil official documentation
|
||||
<a href="https://docs.python.org/3/library/shutil.html?highlight=unpack_archive#shutil.unpack_archive">shutil.unpack_archive() warning.</a>
|
||||
</li>
|
||||
</references>
|
||||
</qhelp>
|
||||
@@ -0,0 +1,24 @@
|
||||
/**
|
||||
* @name Arbitrary file write during a tarball extraction from a user controlled source
|
||||
* @description Extracting files from a potentially malicious tarball using `shutil.unpack_archive()` without validating
|
||||
* that the destination file path is within the destination directory can cause files outside
|
||||
* the destination directory to be overwritten. More precisely, if the tarball comes from a user controlled
|
||||
* location either a remote one or cli argument.
|
||||
* @kind path-problem
|
||||
* @id py/unsafe-unpacking
|
||||
* @problem.severity error
|
||||
* @security-severity 7.5
|
||||
* @precision high
|
||||
* @tags security
|
||||
* experimental
|
||||
* external/cwe/cwe-022
|
||||
*/
|
||||
|
||||
import python
|
||||
import experimental.Security.UnsafeUnpackQuery
|
||||
import DataFlow::PathGraph
|
||||
|
||||
from UnsafeUnpackingConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
|
||||
where config.hasFlowPath(source, sink)
|
||||
select sink.getNode(), source, sink,
|
||||
"Unsafe extraction from a malicious tarball retrieved from a remote location."
|
||||
@@ -0,0 +1,12 @@
|
||||
import requests
|
||||
import shutil
|
||||
|
||||
url = "https://www.someremote.location/tarball.tar.gz"
|
||||
response = requests.get(url, stream=True)
|
||||
|
||||
tarpath = "/tmp/tmp456/tarball.tar.gz"
|
||||
with open(tarpath, "wb") as f:
|
||||
f.write(response.raw.read())
|
||||
|
||||
untarredpath = "/tmp/tmp123"
|
||||
shutil.unpack_archive(tarpath, untarredpath)
|
||||
@@ -0,0 +1,17 @@
|
||||
import requests
|
||||
import tarfile
|
||||
|
||||
url = "https://www.someremote.location/tarball.tar.gz"
|
||||
response = requests.get(url, stream=True)
|
||||
|
||||
tarpath = "/tmp/tmp456/tarball.tar.gz"
|
||||
with open(tarpath, "wb") as f:
|
||||
f.write(response.raw.read())
|
||||
|
||||
untarredpath = "/tmp/tmp123"
|
||||
with tarfile.open(tarpath) as tar:
|
||||
for member in tar.getmembers():
|
||||
if member.name.startswith("/") or ".." in member.name:
|
||||
raise Exception("Path traversal identified in tarball")
|
||||
|
||||
tar.extract(untarredpath, member)
|
||||
213
python/ql/src/experimental/Security/UnsafeUnpackQuery.qll
Normal file
213
python/ql/src/experimental/Security/UnsafeUnpackQuery.qll
Normal file
@@ -0,0 +1,213 @@
|
||||
/**
|
||||
* Provides a taint-tracking configuration for detecting "UnsafeUnpacking" vulnerabilities.
|
||||
*/
|
||||
|
||||
import python
|
||||
import semmle.python.Concepts
|
||||
import semmle.python.dataflow.new.internal.DataFlowPublic
|
||||
import semmle.python.ApiGraphs
|
||||
import semmle.python.dataflow.new.TaintTracking
|
||||
import semmle.python.frameworks.Stdlib
|
||||
import semmle.python.dataflow.new.RemoteFlowSources
|
||||
|
||||
/**
|
||||
* Handle those three cases of Tarfile opens:
|
||||
* - `tarfile.open()`
|
||||
* - `tarfile.TarFile()`
|
||||
* - `MKtarfile.Tarfile.open()`
|
||||
*/
|
||||
API::Node tarfileOpen() {
|
||||
result in [
|
||||
API::moduleImport("tarfile").getMember(["open", "TarFile"]),
|
||||
API::moduleImport("tarfile").getMember("TarFile").getASubclass().getMember("open")
|
||||
]
|
||||
}
|
||||
|
||||
/**
|
||||
* A class for handling the previous three cases, plus the use of `closing` in with the previous cases
|
||||
*/
|
||||
class AllTarfileOpens extends API::CallNode {
|
||||
AllTarfileOpens() {
|
||||
this = tarfileOpen().getACall()
|
||||
or
|
||||
exists(API::Node closing, Node arg |
|
||||
closing = API::moduleImport("contextlib").getMember("closing") and
|
||||
this = closing.getACall() and
|
||||
arg = this.getArg(0) and
|
||||
arg = tarfileOpen().getACall()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class UnsafeUnpackingConfig extends TaintTracking::Configuration {
|
||||
UnsafeUnpackingConfig() { this = "UnsafeUnpackingConfig" }
|
||||
|
||||
override predicate isSource(DataFlow::Node source) {
|
||||
// A source coming from a remote location
|
||||
source instanceof RemoteFlowSource
|
||||
or
|
||||
// A source coming from a CLI argparse module
|
||||
// see argparse: https://docs.python.org/3/library/argparse.html
|
||||
exists(MethodCallNode args |
|
||||
args = source.(AttrRead).getObject().getALocalSource() and
|
||||
args =
|
||||
API::moduleImport("argparse")
|
||||
.getMember("ArgumentParser")
|
||||
.getReturn()
|
||||
.getMember("parse_args")
|
||||
.getACall()
|
||||
)
|
||||
or
|
||||
// A source catching an S3 file download
|
||||
// see boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.download_file
|
||||
source =
|
||||
API::moduleImport("boto3")
|
||||
.getMember("client")
|
||||
.getReturn()
|
||||
.getMember(["download_file", "download_fileobj"])
|
||||
.getACall()
|
||||
.getArg(2)
|
||||
or
|
||||
// A source catching an S3 file download
|
||||
// see boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
|
||||
source =
|
||||
API::moduleImport("boto3")
|
||||
.getMember("Session")
|
||||
.getReturn()
|
||||
.getMember("client")
|
||||
.getReturn()
|
||||
.getMember(["download_file", "download_fileobj"])
|
||||
.getACall()
|
||||
.getArg(2)
|
||||
or
|
||||
// A source download a file using wget
|
||||
// see wget: https://pypi.org/project/wget/
|
||||
exists(API::CallNode mcn |
|
||||
mcn = API::moduleImport("wget").getMember("download").getACall() and
|
||||
if exists(mcn.getArg(1)) then source = mcn.getArg(1) else source = mcn.getReturn().asSource()
|
||||
)
|
||||
or
|
||||
// catch the Django uploaded files as a source
|
||||
// see HttpRequest.FILES: https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.HttpRequest.FILES
|
||||
source.(AttrRead).getAttributeName() = "FILES"
|
||||
}
|
||||
|
||||
override predicate isSink(DataFlow::Node sink) {
|
||||
(
|
||||
// A sink capturing method calls to `unpack_archive`.
|
||||
sink = API::moduleImport("shutil").getMember("unpack_archive").getACall().getArg(0)
|
||||
or
|
||||
// A sink capturing method calls to `extractall` without `members` argument.
|
||||
// For a call to `file.extractall` without `members` argument, `file` is considered a sink.
|
||||
exists(MethodCallNode call, AllTarfileOpens atfo |
|
||||
call = atfo.getReturn().getMember("extractall").getACall() and
|
||||
not exists(call.getArgByName("members")) and
|
||||
sink = call.getObject()
|
||||
)
|
||||
or
|
||||
// A sink capturing method calls to `extractall` with `members` argument.
|
||||
// For a call to `file.extractall` with `members` argument, `file` is considered a sink if not
|
||||
// a the `members` argument contains a NameConstant as None, a List or call to the method `getmembers`.
|
||||
// Otherwise, the argument of `members` is considered a sink.
|
||||
exists(MethodCallNode call, Node arg, AllTarfileOpens atfo |
|
||||
call = atfo.getReturn().getMember("extractall").getACall() and
|
||||
arg = call.getArgByName("members") and
|
||||
if
|
||||
arg.asCfgNode() instanceof NameConstantNode or
|
||||
arg.asCfgNode() instanceof ListNode
|
||||
then sink = call.getObject()
|
||||
else
|
||||
if arg.(MethodCallNode).getMethodName() = "getmembers"
|
||||
then sink = arg.(MethodCallNode).getObject()
|
||||
else sink = call.getArgByName("members")
|
||||
)
|
||||
or
|
||||
// An argument to `extract` is considered a sink.
|
||||
exists(AllTarfileOpens atfo |
|
||||
sink = atfo.getReturn().getMember("extract").getACall().getArg(0)
|
||||
)
|
||||
or
|
||||
//An argument to `_extract_member` is considered a sink.
|
||||
exists(MethodCallNode call, AllTarfileOpens atfo |
|
||||
call = atfo.getReturn().getMember("_extract_member").getACall() and
|
||||
call.getArg(1).(AttrRead).accesses(sink, "name")
|
||||
)
|
||||
) and
|
||||
not sink.getScope().getLocation().getFile().inStdlib()
|
||||
}
|
||||
|
||||
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
|
||||
// Reading the response
|
||||
nodeTo.(MethodCallNode).calls(nodeFrom, "read")
|
||||
or
|
||||
// Open a file for access
|
||||
exists(MethodCallNode cn |
|
||||
cn.calls(nodeTo, "open") and
|
||||
cn.flowsTo(nodeFrom)
|
||||
)
|
||||
or
|
||||
// Open a file for access using builtin
|
||||
exists(API::CallNode cn |
|
||||
cn = API::builtin("open").getACall() and
|
||||
nodeTo = cn.getArg(0) and
|
||||
cn.flowsTo(nodeFrom)
|
||||
)
|
||||
or
|
||||
// Write access
|
||||
exists(MethodCallNode cn |
|
||||
cn.calls(nodeTo, "write") and
|
||||
nodeFrom = cn.getArg(0)
|
||||
)
|
||||
or
|
||||
// Retrieve Django uploaded files
|
||||
// see getlist(): https://docs.djangoproject.com/en/4.1/ref/request-response/#django.http.QueryDict.getlist
|
||||
// see chunks(): https://docs.djangoproject.com/en/4.1/ref/files/uploads/#django.core.files.uploadedfile.UploadedFile.chunks
|
||||
nodeTo.(MethodCallNode).calls(nodeFrom, ["getlist", "get", "chunks"])
|
||||
or
|
||||
// Considering the use of "fs"
|
||||
// see fs: https://docs.djangoproject.com/en/4.1/ref/files/storage/#the-filesystemstorage-class
|
||||
nodeTo =
|
||||
API::moduleImport("django")
|
||||
.getMember("core")
|
||||
.getMember("files")
|
||||
.getMember("storage")
|
||||
.getMember("FileSystemStorage")
|
||||
.getReturn()
|
||||
.getMember(["save", "path"])
|
||||
.getACall() and
|
||||
nodeFrom = nodeTo.(MethodCallNode).getArg(0)
|
||||
or
|
||||
// Accessing the name or raw content
|
||||
nodeTo.(AttrRead).accesses(nodeFrom, ["name", "raw"])
|
||||
or
|
||||
// Join the base_dir to the filename
|
||||
nodeTo = API::moduleImport("os").getMember("path").getMember("join").getACall() and
|
||||
nodeFrom = nodeTo.(API::CallNode).getArg(1)
|
||||
or
|
||||
// Go through an Open for a Tarfile
|
||||
nodeTo = tarfileOpen().getACall() and nodeFrom = nodeTo.(MethodCallNode).getArg(0)
|
||||
or
|
||||
// Handle the case where the getmembers is used.
|
||||
nodeTo.(MethodCallNode).calls(nodeFrom, "getmembers") and
|
||||
nodeFrom instanceof AllTarfileOpens
|
||||
or
|
||||
// To handle the case of `with closing(tarfile.open()) as file:`
|
||||
// we add a step from the first argument of `closing` to the call to `closing`,
|
||||
// whenever that first argument is a return of `tarfile.open()`.
|
||||
nodeTo = API::moduleImport("contextlib").getMember("closing").getACall() and
|
||||
nodeFrom = nodeTo.(API::CallNode).getArg(0) and
|
||||
nodeFrom = tarfileOpen().getReturn().getAValueReachableFromSource()
|
||||
or
|
||||
// see Path : https://docs.python.org/3/library/pathlib.html#pathlib.Path
|
||||
nodeTo = API::moduleImport("pathlib").getMember("Path").getACall() and
|
||||
nodeFrom = nodeTo.(API::CallNode).getArg(0)
|
||||
or
|
||||
// Use of absolutepath
|
||||
// see absolute : https://docs.python.org/3/library/pathlib.html#pathlib.Path.absolute
|
||||
exists(API::CallNode mcn |
|
||||
mcn = API::moduleImport("pathlib").getMember("Path").getACall() and
|
||||
nodeTo = mcn.getAMethodCall("absolute") and
|
||||
nodeFrom = mcn.getArg(0)
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user