Add TarSlip Improv query

This commit is contained in:
ALJI Mohamed
2022-10-19 14:01:40 +01:00
parent caaee26ae5
commit d6fa745279
38 changed files with 672 additions and 0 deletions

View File

@@ -0,0 +1,60 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>Extracting files from a malicious tarball without validating that the destination file path
is within the destination directory can cause files outside the destination directory to be
overwritten, due to the possible presence of directory traversal elements (<code>..</code>) in
archive path names.</p>
<p>Tarballs contain archive entries representing each file in the archive. These entries
include a file path for the entry, but these file paths are not restricted and may contain
unexpected special elements such as the directory traversal element (<code>..</code>). If these
file paths are used to determine an output file to write the contents of the archive item to, then
the file may be written to an unexpected location. This can result in sensitive information being
revealed or deleted, or an attacker being able to influence behavior by modifying unexpected
files.</p>
<p>For example, if a tarball contains a file entry <code>../sneaky-file</code>, and the tarball
is extracted to the directory <code>/tmp/tmp123</code>, then naively combining the paths would result
in an output file path of <code>/tmp/tmp123/../sneaky-file</code>, which would cause the file to be
written to <code>/tmp/</code>.</p>
</overview>
<recommendation>
<p>Ensure that output paths constructed from tarball entries are validated
to prevent writing files to unexpected locations.</p>
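<p>The recommended way of writing an output file from a tarball entry is to call <code>extract()</code> or <code>extractall()</code> only after the entry paths have been validated, for example by passing a filtered list of entries through the <code>members</code> argument of <code>extractall()</code>.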
</p>
</recommendation>
<example>
<p>
In this example an archive is extracted without validating file paths.
</p>
<sample src="examples/TarSlip_1.py" />
<p>To fix this vulnerability, we need to validate the archive entries before calling the function <code>extractall()</code>.
</p>
<sample src="examples/NoHIT_TarSlip_1.py" />
</example>
<references>
<li>
Snyk:
<a href="https://snyk.io/research/zip-slip-vulnerability">Zip Slip Vulnerability</a>.
</li>
<li>
Tarfile documentation:
<a href="https://docs.python.org/3/library/tarfile.html#tarfile.TarFile.extractall">extractall() warning</a>.
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,108 @@
/**
* @name Arbitrary file write during tarfile extraction
* @description Extracting files from a malicious tar archive without validating that the
* destination file path is within the destination directory can cause files outside
* the destination directory to be overwritten.
* @kind path-problem
* @id py/tarslip
* @problem.severity error
* @security-severity 7.5
* @precision high
* @tags security
* external/cwe/cwe-022
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import DataFlow::PathGraph
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.internal.Attributes
import semmle.python.dataflow.new.BarrierGuards
import semmle.python.dataflow.new.RemoteFlowSources
/**
 * Gets an API node for one of the three ways a tar archive is opened:
 *  - `tarfile.open()`
 *  - `tarfile.TarFile()`
 *  - `open()` on a subclass of `tarfile.TarFile`
 */
API::Node tarfileOpen() {
  result = API::moduleImport("tarfile").getMember(["open", "TarFile"])
  or
  result = API::moduleImport("tarfile").getMember("TarFile").getASubclass().getMember("open")
}
/**
 * A call that opens a tar archive: either a call to one of the nodes returned by
 * `tarfileOpen()`, or a `contextlib.closing(...)` call whose first argument is such a call.
 */
class AllTarfileOpens extends API::CallNode {
  AllTarfileOpens() {
    this = tarfileOpen().getACall()
    or
    (
      this = API::moduleImport("contextlib").getMember("closing").getACall() and
      this.getArg(0) = tarfileOpen().getACall()
    )
  }
}
/**
 * A taint-tracking configuration for detecting more "TarSlip" vulnerabilities.
 *
 * Sources are the archive-opening calls matched by `AllTarfileOpens`. Sinks are the
 * receiver or arguments of `extractall`, `extract` and `_extract_member` calls made on
 * the value returned by such a call.
 */
class Configuration extends TaintTracking::Configuration {
  Configuration() { this = "TarSlip" }

  /** Holds if `source` is one of the archive-opening calls. */
  override predicate isSource(DataFlow::Node source) { source instanceof AllTarfileOpens }

  /** Holds if `sink` is reached by an extraction call on an opened archive. */
  override predicate isSink(DataFlow::Node sink) {
    // `file.extractall(...)` without a `members` keyword argument: `file` is the sink.
    // NOTE(review): only the keyword form is looked up (`getArgByName`), so a `members`
    // value passed positionally appears to fall into this branch as well - confirm.
    exists(MethodCallNode call , AllTarfileOpens atfo|
      call = atfo.getReturn().getMember("extractall").getACall() and
      not exists(Node arg | arg = call.getArgByName("members")) and
      sink = call.getObject()
    )
    or
    // `file.extractall(..., members=m)`:
    //  - `m` is a `NameConstantNode` (e.g. `None`) or a `ListNode`: `file` is the sink;
    //  - `m` is a method call named `getmembers`: the object of that call is the sink;
    //  - otherwise the `members` argument itself is the sink.
    exists(MethodCallNode call, Node arg, AllTarfileOpens atfo|
      call = atfo.getReturn().getMember("extractall").getACall() and
      arg = call.getArgByName("members") and
      if
        arg.asCfgNode() instanceof NameConstantNode or
        arg.asCfgNode() instanceof ListNode
      then sink = call.getObject()
      else
        if arg.(MethodCallNode).getMethodName() = "getmembers"
        then sink = arg.(MethodCallNode).getObject()
        else sink = call.getArgByName("members")
    )
    or
    // The first positional argument of `extract` is a sink.
    exists(AllTarfileOpens atfo | sink = atfo.getReturn().getMember("extract").getACall().getArg(0))
    or
    // For `_extract_member(a, x.name)`, the object `x` whose `name` attribute is read as
    // the second positional argument is the sink.
    exists(MethodCallNode call, AllTarfileOpens atfo |
      call = atfo.getReturn().getMember("_extract_member").getACall() and
      call.getArg(1).(AttrRead).accesses(sink, "name")
    )
  }

  /**
   * Propagates taint from an object on which the attribute `getmembers` is read to a
   * method call made on that same object.
   */
  override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
    // NOTE(review): `attr` and `call` are related only through `nodeFrom`; the method
    // name of `call` is not constrained here - confirm this is intended.
    exists(AttrRead attr, MethodCallNode call |
      attr.accesses(nodeFrom, "getmembers") and
      nodeFrom = call.getObject() and
      nodeTo = call
    )
  }
}
// Report every flow path found by the configuration. The two `$@` placeholders in the
// message are bound to the source node and the sink node, each labelled with its
// `toString()` value.
from Configuration config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink, source, sink, "Extraction of tarfile from $@ to a potentially untrusted source $@.",
source.getNode(), source.getNode().toString(), sink.getNode(), sink.getNode().toString()

View File

@@ -0,0 +1,19 @@
import sys
import tarfile
def managed_members_archive_handler(filename):
    """Extract the archive `filename` into the directory named by `sys.argv[2]`.

    All members are collected first; a member whose name contains ".." raises
    ValueError before anything is extracted.
    """
    tar = tarfile.open(filename)
    result = []
    for member in tar:
        if ".." in member.name:
            raise ValueError("Path in member name !!!")
        result.append(member)
    # The destination directory is read from the second command-line argument.
    path = sys.argv[2]
    #print("files are extracted to: ", path)
    tar.extractall(path=path, members=result)
    # Not reached when the ValueError above is raised.
    tar.close()
# Script entry point. The handler reads the archive path from argv[1] and the
# destination directory from argv[2], so both arguments must be present; the
# previous `> 1` check let a single-argument call through to an IndexError.
if __name__ == "__main__":
    if len(sys.argv) > 2:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,27 @@
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile` that pass the name and link checks.

    Iterates the archive object directly. Members whose name contains '../' and
    members that are symbolic or hard links are reported and dropped.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile:
        if '../' in member.name:
            print('Member name container directory traversal sequence')
            continue
        elif member.issym() or member.islnk():
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,8 @@
import tarfile
import sys
# Extract the archive given in argv[1] entry by entry into /tmp/unpack/, aborting
# with ValueError on the first entry whose name contains "..".
with tarfile.open(sys.argv[1]) as tar:
    for entry in tar:
        if ".." in entry.name:
            raise ValueError("Illegal tar archive entry")
        tar.extract(entry, "/tmp/unpack/")

View File

@@ -0,0 +1,13 @@
import tarfile
import sys
import os
def _validate_archive_name(name, target):
if not os.path.abspath(os.path.join(target, name)).startswith(target + os.path.sep):
raise ValueError(f"Provided language pack contains invalid name {name}")
# Extract the archive given in argv[1] into /tmp/unpack, validating each entry
# name against the target directory immediately before that entry is extracted.
with tarfile.open(sys.argv[1]) as tar:
    target = "/tmp/unpack"
    for entry in tar:
        _validate_archive_name(entry.name, target)
        tar.extract(entry, target)

View File

@@ -0,0 +1,29 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile.getmembers()` that pass the traversal checks.

    Members whose name contains '../' are dropped. Symbolic or hard links are
    dropped only when their link target (`linkname`) contains '../'.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile.getmembers():
        if '../' in member.name:
            print('Member name container directory traversal sequence')
            continue
        elif (member.issym() or member.islnk()) and ('../' in member.linkname):
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,26 @@
# https://github.com/OctoPrint/OctoPrint/
import sys
import tarfile
import os
def _validate_tar_info(info, target):
    """Raise ValueError unless `info` has a valid name and is a file or directory.

    The name is checked first through `_validate_archive_name`; the entry type
    is checked afterwards.
    """
    _validate_archive_name(info.name, target)
    is_file_or_dir = info.isfile() or info.isdir()
    if not is_file_or_dir:
        raise ValueError("Provided language pack contains invalid file type")
def _validate_archive_name(name, target):
if not os.path.abspath(os.path.join(target, name)).startswith(target + os.path.sep):
raise ValueError(f"Provided language pack contains invalid name {name}")
# Validate every member of the archive given in argv[1] first, then extract the
# whole archive into `target` with a single extractall() call.
target = "/tmp/unpack"
with tarfile.open(sys.argv[1], "r") as tar:
    # sanity check
    for info in tar.getmembers():
        _validate_tar_info(info, target)
    # unpack everything
    tar.extractall(target)

View File

@@ -0,0 +1,29 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile.getmembers()` that pass the name and link checks.

    Members whose name contains '../' and members that are symbolic or hard
    links are reported and dropped.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile.getmembers():
        if '../' in member.name:
            print('Member name container directory traversal sequence')
            continue
        elif member.issym() or member.islnk():
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,18 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract every member of `filename` that is not a symbolic link.

    Each kept member is extracted by its own extractall() call into a separate,
    newly created temporary directory. Member names are not inspected.
    """
    tar = tarfile.open(filename)
    tarf = tar.getmembers()
    for f in tarf:
        if not f.issym():
            # mkdtemp() runs once per member, so every member gets its own directory.
            tar.extractall(path=tempfile.mkdtemp(), members=[f])
    tar.close()
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,6 @@
import tarfile
import sys
# Extract the archive given in argv[1] into /tmp/unpack/. The open TarFile object
# itself is passed as `members`; no entry is inspected or filtered.
tarball = sys.argv[1]
with tarfile.open(tarball, "r") as tar:
    tar.extractall(path="/tmp/unpack/", members=tar)

View File

@@ -0,0 +1,26 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile.getmembers()` that are not links.

    Symbolic and hard links are reported and dropped. Member names are not
    inspected.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile.getmembers():
        if member.issym() or member.islnk():
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,5 @@
import tarfile
import sys
# Extract the whole archive given in argv[1] into /tmp/unpack; no entry is inspected.
with tarfile.open(sys.argv[1], "r") as tar:
    tar.extractall(path="/tmp/unpack")

View File

@@ -0,0 +1,26 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile.getmembers()` that are not symbolic links.

    Only `issym()` is tested; hard links and member names are not checked.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile.getmembers():
        if member.issym():
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,19 @@
# https://github.com/tensorflow/tensor2tensor
import tarfile
import os.path
import sys
# provide the malicious archive
filename = sys.argv[1]
tmp_dir = "/tmp/"
# Names ending in "tgz" are opened as gzip-compressed; everything else uses mode "r".
read_type = "r:gz" if filename.endswith("tgz") else "r"
with tarfile.open(filename, read_type) as corpus_tar:
    # Create a subset of files that don't already exist.
    # tarfile.extractall errors when encountering an existing file
    # and tarfile.extract is extremely slow
    members = []
    for f in corpus_tar:
        # The member name is joined onto tmp_dir as-is; it is only tested for existence.
        if not os.path.isfile(os.path.join(tmp_dir, f.name)):
            members.append(f)
    corpus_tar.extractall(tmp_dir, members=members)

View File

@@ -0,0 +1,26 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile.getmembers()` for which `isreg()` is true.

    Member names are not inspected.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile.getmembers():
        # Every non-regular member is dropped here, although the message printed
        # below only mentions symlinks.
        if not member.isreg():
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,8 @@
from fileinput import filename
import tarfile
import os.path
import sys
# Open the archive given in argv[1] and extract it into target_dir in one chained
# expression; the TarFile object is never bound to a name or explicitly closed.
archive_path = sys.argv[1]
target_dir = "/tmp/unpack"
tarfile.open(archive_path, "r").extractall(path=target_dir)

View File

@@ -0,0 +1,8 @@
import tarfile
import sys
# Extract only the entries for which isfile() is true into /tmp/unpack/.
# Entry names are not inspected.
tarball = sys.argv[1]
with tarfile.open(tarball) as tar:
    for entry in tar:
        if entry.isfile():
            tar.extract(entry, "/tmp/unpack/")

View File

@@ -0,0 +1,8 @@
import tarfile
import sys
# Extract the archive given in argv[1] entry by entry into /tmp/unpack/, aborting
# with ValueError on the first entry whose name starts with "/". Names containing
# ".." are not tested.
with tarfile.open(sys.argv[1]) as tar:
    for entry in tar:
        if entry.name.startswith("/"):
            raise ValueError("Illegal tar archive entry")
        tar.extract(entry, "/tmp/unpack/")

View File

@@ -0,0 +1,8 @@
import tarfile
import sys
# Same loop as with tarfile.open(), but the archive is opened through the
# tarfile.TarFile constructor. Only entries for which isfile() is true are
# extracted; entry names are not inspected.
tarball = sys.argv[1]
with tarfile.TarFile(tarball, mode="r") as tar:
    for entry in tar:
        if entry.isfile():
            tar.extract(entry, "/tmp/unpack/")

View File

@@ -0,0 +1,10 @@
from tarfile import TarFile
import sys
class MKTar(TarFile):
    """Empty subclass of `TarFile`; adds no behaviour of its own."""
    pass
# Open the archive through the subclass and call the private `_extract_member`
# method for every entry, passing the entry's own name as the second argument.
tarball = sys.argv[1]
with MKTar.open(name=tarball) as tar:
    for entry in tar:
        tar._extract_member(entry, entry.name)

View File

@@ -0,0 +1,9 @@
import tarfile
import sys
import os
# Extract the archive given in argv[1] entry by entry into /tmp/unpack/, aborting
# with ValueError on the first entry whose name is an absolute path. Names
# containing ".." are not tested.
with tarfile.open(sys.argv[1]) as tar:
    for entry in tar:
        if os.path.isabs(entry.name):
            raise ValueError("Illegal tar archive entry")
        tar.extract(entry, "/tmp/unpack/")

View File

@@ -0,0 +1,5 @@
import tarfile
import sys
# Open the archive through the tarfile.TarFile constructor and extract all of it
# into /tmp/unpack; no entry is inspected.
with tarfile.TarFile(sys.argv[1], mode="r") as tar:
    tar.extractall(path="/tmp/unpack")

View File

@@ -0,0 +1,13 @@
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract `filename` into a new temp dir, passing `tar.getmembers()` as `members`.

    The member list is handed over unchanged; no entry is inspected.
    """
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=tar.getmembers())
    tar.close()
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,6 @@
import tarfile
import sys
import tempfile
# Extract the archive given in argv[1] into a new temp dir with an explicit
# `members=None`; the TarFile object is not closed by this script.
tar = tarfile.open(sys.argv[1])
tar.extractall(path=tempfile.mkdtemp(), members=None)

View File

@@ -0,0 +1,26 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract the members of `filename` kept by `members_filter` into a new temp dir."""
    tar = tarfile.open(filename)
    tar.extractall(path=tempfile.mkdtemp(), members=members_filter(tar))
    tar.close()
def members_filter(tarfile):
    """Return the members of `tarfile` that are not symbolic or hard links.

    Iterates the archive object directly. Member names are not inspected.
    """
    # The parameter name shadows the imported `tarfile` module inside this function.
    result = []
    for member in tarfile:
        if member.issym() or member.islnk():
            print('Symlink to external resource')
            continue
        result.append(member)
    return result
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,5 @@
import tarfile
import sys
# Open the archive through the tarfile.TarFile constructor and extract it into
# /tmp/unpack/, passing the TarFile object itself as `members`; nothing is filtered.
with tarfile.TarFile(sys.argv[1], mode="r") as tar:
    tar.extractall(path="/tmp/unpack/", members=tar)

View File

@@ -0,0 +1,20 @@
# https://github.com/PyCQA/bandit
import sys
import tarfile
import tempfile
def managed_members_archive_handler(filename):
    """Extract `filename` into a new temp dir unless it contains a symbolic link.

    All members are collected first; the first symbolic link raises ValueError
    before anything is extracted. Member names are not inspected.
    """
    tar = tarfile.open(filename)
    result = []
    for member in tar:
        if member.issym():
            raise ValueError("But it is a symlink")
        result.append(member)
    tar.extractall(path=tempfile.mkdtemp(), members=result)
    # Not reached when the ValueError above is raised.
    tar.close()
# Script entry point: passes the first command-line argument on as the archive path.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        filename = sys.argv[1]
        managed_members_archive_handler(filename)

View File

@@ -0,0 +1,8 @@
from fileinput import filename
import tarfile
import os.path
import sys
# Open the archive with the tarfile.TarFile constructor and extract it into
# target_dir in one chained expression.
# `archive_path` is assigned but the constructor call reads sys.argv[1] directly.
archive_path = sys.argv[1]
target_dir = "/tmp/unpack"
tarfile.TarFile(sys.argv[1], mode="r").extractall(path=target_dir)

View File

@@ -0,0 +1,6 @@
import tarfile
import sys
# Extract the archive given in argv[1] with extractall() and no arguments at all:
# neither a destination path nor a member list is supplied.
tarball = sys.argv[1]
with tarfile.open(tarball) as tar:
    tar.extractall()

View File

@@ -0,0 +1,6 @@
import tarfile
import sys
import tempfile
# Extract the archive given in argv[1] into a new temp dir with an explicit
# `members=None`; the TarFile object is not closed by this script.
tar = tarfile.open(sys.argv[1])
tar.extractall(path=tempfile.mkdtemp(), members=None)

View File

@@ -0,0 +1,10 @@
import tarfile
import sys
class MKTar(tarfile.TarFile):
    """Empty subclass of `tarfile.TarFile`; adds no behaviour of its own."""
    pass
# Open the archive through the subclass and call the private `_extract_member`
# method for every entry, passing the entry's own name as the second argument.
tarball = sys.argv[1]
with MKTar.open(name=tarball) as tar:
    for entry in tar:
        tar._extract_member(entry, entry.name)

View File

@@ -0,0 +1,36 @@
import tarfile
import sys
from contextlib import closing, contextmanager
import subprocess
import os
@contextmanager
def py2_tarxz(filename):
    """Context manager yielding a TarFile read from the `xz -dc` output of `filename`.

    The decompressed stream is written to a temporary file, which is rewound and
    handed to `tarfile.TarFile` as `fileobj`. Both the TarFile and the temporary
    file are released when the context exits.
    """
    import tempfile
    with tempfile.TemporaryFile() as tmp:
        # xz writes straight to the temp file's descriptor.
        subprocess.check_call(["xz", "-dc", filename], stdout=tmp.fileno())
        tmp.seek(0)
        with closing(tarfile.TarFile(fileobj=tmp)) as tf:
            yield tf
def unpack_tarball(tar_filename, dest):
    """Extract `tar_filename` into `dest` and return `dest` joined with the first
    path component of the archive's first member.

    Members whose first path component differs from that of the first member are
    only reported with print(); the whole archive is extracted regardless.
    """
    # print('Unpacking %s into %s' % (os.path.basename(tar_filename), dest))
    # if sys.version_info[0] < 3 and tar_filename.endswith('.xz'):
    #     # Py 2.7 lacks lzma support
    #     tar_cm = py2_tarxz(tar_filename)
    # else:
    #
    tar_cm = closing(tarfile.open(tar_filename))
    base_dir = None
    with tar_cm as tarc:
        for member in tarc:
            base_name = member.name.split('/')[0]
            if base_dir is None:
                base_dir = base_name
            elif base_dir != base_name:
                print('Unexpected path in %s: %s' % (tar_filename, base_name))
        tarc.extractall(dest)
    # NOTE(review): base_dir is still None when the archive has no members, and
    # os.path.join(dest, None) raises TypeError.
    return os.path.join(dest, base_dir)
# Run against the archive given in argv[1].
unpack_tarball(sys.argv[1], "/tmp/unpack")

View File

@@ -0,0 +1,7 @@
import tarfile
import sys
# Call the private `_extract_member` method for every entry of the archive given
# in argv[1], passing the entry's own name as the second argument.
tarball = sys.argv[1]
with tarfile.open(name=tarball) as tar:
    for entry in tar:
        tar._extract_member(entry, entry.name)

View File

@@ -0,0 +1,7 @@
import tarfile
import sys
# Extract every entry of the archive given in argv[1] into /tmp/unpack/ with
# extract(); no entry is inspected.
tarball = sys.argv[1]
with tarfile.open(name=tarball) as tar:
    for entry in tar:
        tar.extract(entry, "/tmp/unpack/")

View File

@@ -0,0 +1,6 @@
import tarfile
import sys
# Extract the archive given in argv[1] into /tmp/unpack/ with a positional path
# argument; the TarFile object is not closed by this script.
tarball = sys.argv[1]
tar = tarfile.open(tarball)
tar.extractall("/tmp/unpack/")

View File

@@ -0,0 +1,15 @@
#!/bin/bash
# Runs every TarSlip_*.py script in the current directory against
# archive_malign.tar and prints OK when the run created $FILE, Not OK otherwise.
mkdir -p /tmp/Sim4n6/
FILE=/tmp/Sim4n6/sim4n6.txt
# Iterate over the glob itself: `$(ls ...)` word-splits names containing whitespace.
for f in TarSlip_*.py; do
    # With no match the pattern stays literal; skip it, as the empty `ls` list did.
    [ -e "$f" ] || continue
    echo "$f";
    # Remove any marker left by an earlier run so it cannot produce a false OK.
    rm -f "$FILE";
    python3 "$f" archive_malign.tar;
    if test -f "$FILE"; then
        echo -e "\e[32mOK\e[0m"
        rm "$FILE";
    else
        echo -e "\e[31mNot OK\e[0m";
    fi
done;