mirror of
https://github.com/github/codeql.git
synced 2026-05-03 04:39:29 +02:00
Modified the QL ref file and add TarSlip examples
This commit is contained in:
@@ -1 +1 @@
|
||||
Security/CWE-022bis/TarSlipImprov.ql
|
||||
experimental/Security/CWE-022bis/TarSlipImprov.ql
|
||||
|
||||
@@ -0,0 +1,316 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import sys
|
||||
import tarfile
|
||||
import tempfile
|
||||
import os
|
||||
from tarfile import TarFile
|
||||
from contextlib import closing, contextmanager
|
||||
import subprocess
|
||||
import os.path
|
||||
|
||||
unsafe_filename_tar = sys.argv[2]
|
||||
safe_filename_tar = "safe_path.tar"
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
result = []
|
||||
for member in tar:
|
||||
if ".." in member.name:
|
||||
raise ValueError("Path in member name !!!")
|
||||
result.append(member)
|
||||
path = unsafe_filename_tar
|
||||
tar.extractall(path=path, members=result)
|
||||
tar.close()
|
||||
|
||||
|
||||
def members_filter1(tarfile):
|
||||
result = []
|
||||
for member in tarfile:
|
||||
if '../' in member.name:
|
||||
print('Member name container directory traversal sequence')
|
||||
continue
|
||||
elif member.issym() or member.islnk():
|
||||
print('Symlink to external resource')
|
||||
continue
|
||||
result.append(member)
|
||||
return result
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter1(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
with tarfile.open(unsafe_filename_tar) as tar:
|
||||
for entry in tar:
|
||||
if ".." in entry.name:
|
||||
raise ValueError("Illegal tar archive entry")
|
||||
tar.extract(entry, "/tmp/unpack/")
|
||||
|
||||
|
||||
def _validate_archive_name(name, target):
|
||||
if not os.path.abspath(os.path.join(target, name)).startswith(target + os.path.sep):
|
||||
raise ValueError(f"Provided language pack contains invalid name {name}")
|
||||
|
||||
with tarfile.open(unsafe_filename_tar) as tar:
|
||||
target = "/tmp/unpack"
|
||||
for entry in tar:
|
||||
_validate_archive_name(entry.name, target)
|
||||
tar.extract(entry, target)
|
||||
|
||||
|
||||
def members_filter2(tarfile):
|
||||
result = []
|
||||
for member in tarfile.getmembers():
|
||||
if '../' in member.name:
|
||||
print('Member name container directory traversal sequence')
|
||||
continue
|
||||
elif (member.issym() or member.islnk()) and ('../' in member.linkname):
|
||||
print('Symlink to external resource')
|
||||
continue
|
||||
result.append(member)
|
||||
return result
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter2(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
def _validate_tar_info(info, target):
|
||||
_validate_archive_name(info.name, target)
|
||||
if not (info.isfile() or info.isdir()):
|
||||
raise ValueError("Provided language pack contains invalid file type")
|
||||
|
||||
def _validate_archive_name(name, target):
|
||||
if not os.path.abspath(os.path.join(target, name)).startswith(target + os.path.sep):
|
||||
raise ValueError(f"Provided language pack contains invalid name {name}")
|
||||
|
||||
target = "/tmp/unpack"
|
||||
with tarfile.open(unsafe_filename_tar, "r") as tar:
|
||||
for info in tar.getmembers():
|
||||
_validate_tar_info(info, target)
|
||||
tar.extractall(target)
|
||||
|
||||
|
||||
def members_filter3(tarfile):
|
||||
result = []
|
||||
for member in tarfile.getmembers():
|
||||
if '../' in member.name:
|
||||
print('Member name container directory traversal sequence')
|
||||
continue
|
||||
elif member.issym() or member.islnk():
|
||||
print('Symlink to external resource')
|
||||
continue
|
||||
result.append(member)
|
||||
return result
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter3(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tarf = tar.getmembers()
|
||||
for f in tarf:
|
||||
if not f.issym():
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=[f])
|
||||
tar.close()
|
||||
|
||||
|
||||
class MKTar(TarFile):
|
||||
pass
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with MKTar.open(name=tarball) as tar:
|
||||
for entry in tar:
|
||||
tar._extract_member(entry, entry.name)
|
||||
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with tarfile.open(tarball) as tar:
|
||||
tar.extractall()
|
||||
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=None)
|
||||
|
||||
|
||||
class MKTar(tarfile.TarFile):
|
||||
pass
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with MKTar.open(name=tarball) as tar:
|
||||
for entry in tar:
|
||||
tar._extract_member(entry, entry.name)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def py2_tarxz(filename):
|
||||
with tempfile.TemporaryFile() as tmp:
|
||||
subprocess.check_call(["xz", "-dc", filename], stdout=tmp.fileno())
|
||||
tmp.seek(0)
|
||||
with closing(tarfile.TarFile(fileobj=tmp)) as tf:
|
||||
yield tf
|
||||
|
||||
def unpack_tarball(tar_filename, dest):
|
||||
if sys.version_info[0] < 3 and tar_filename.endswith('.xz'):
|
||||
# Py 2.7 lacks lzma support
|
||||
tar_cm = py2_tarxz(tar_filename)
|
||||
else:
|
||||
tar_cm = closing(tarfile.open(tar_filename))
|
||||
|
||||
base_dir = None
|
||||
with tar_cm as tarc:
|
||||
for member in tarc:
|
||||
base_name = member.name.split('/')[0]
|
||||
if base_dir is None:
|
||||
base_dir = base_name
|
||||
elif base_dir != base_name:
|
||||
print('Unexpected path in %s: %s' % (tar_filename, base_name))
|
||||
tarc.extractall(dest)
|
||||
return os.path.join(dest, base_dir)
|
||||
|
||||
unpack_tarball(unsafe_filename_tar, "/tmp/unpack")
|
||||
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with tarfile.open(name=tarball) as tar:
|
||||
for entry in tar:
|
||||
tar._extract_member(entry, entry.name)
|
||||
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with tarfile.open(name=tarball) as tar:
|
||||
for entry in tar:
|
||||
tar.extract(entry, "/tmp/unpack/")
|
||||
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
tar = tarfile.open(tarball)
|
||||
tar.extractall("/tmp/unpack/")
|
||||
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with tarfile.open(tarball, "r") as tar:
|
||||
tar.extractall(path="/tmp/unpack/", members=tar)
|
||||
|
||||
|
||||
def members_filter4(tarfile):
|
||||
result = []
|
||||
for member in tarfile.getmembers():
|
||||
if member.issym() or member.islnk():
|
||||
print('Symlink to external resource')
|
||||
continue
|
||||
result.append(member)
|
||||
return result
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter4(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
with tarfile.open(unsafe_filename_tar, "r") as tar:
|
||||
tar.extractall(path="/tmp/unpack")
|
||||
|
||||
|
||||
def members_filter5(tarfile):
|
||||
result = []
|
||||
for member in tarfile.getmembers():
|
||||
if member.issym():
|
||||
print('Symlink to external resource')
|
||||
continue
|
||||
result.append(member)
|
||||
return result
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter5(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
filename = unsafe_filename_tar
|
||||
tmp_dir = "/tmp/"
|
||||
|
||||
read_type = "r:gz" if filename.endswith("tgz") else "r"
|
||||
with tarfile.open(filename, read_type) as corpus_tar:
|
||||
members = []
|
||||
for f in corpus_tar:
|
||||
if not os.path.isfile(os.path.join(tmp_dir, f.name)):
|
||||
members.append(f)
|
||||
corpus_tar.extractall(tmp_dir, members=members)
|
||||
|
||||
|
||||
def members_filter6(tarfile):
|
||||
result = []
|
||||
for member in tarfile.getmembers():
|
||||
if not member.isreg():
|
||||
print('Symlink to external resource')
|
||||
continue
|
||||
result.append(member)
|
||||
return result
|
||||
tar = tarfile.open(filename)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter6(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
archive_path = unsafe_filename_tar
|
||||
target_dir = "/tmp/unpack"
|
||||
tarfile.open(archive_path, "r").extractall(path=target_dir)
|
||||
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with tarfile.open(tarball) as tar:
|
||||
for entry in tar:
|
||||
if entry.isfile():
|
||||
tar.extract(entry, "/tmp/unpack/")
|
||||
|
||||
|
||||
with tarfile.open(unsafe_filename_tar) as tar:
|
||||
for entry in tar:
|
||||
if entry.name.startswith("/"):
|
||||
raise ValueError("Illegal tar archive entry")
|
||||
tar.extract(entry, "/tmp/unpack/")
|
||||
|
||||
tarball = unsafe_filename_tar
|
||||
with tarfile.TarFile(tarball, mode="r") as tar:
|
||||
for entry in tar:
|
||||
if entry.isfile():
|
||||
tar.extract(entry, "/tmp/unpack/")
|
||||
|
||||
with tarfile.open(unsafe_filename_tar) as tar:
|
||||
for entry in tar:
|
||||
if os.path.isabs(entry.name):
|
||||
raise ValueError("Illegal tar archive entry")
|
||||
tar.extract(entry, "/tmp/unpack/")
|
||||
|
||||
|
||||
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar:
|
||||
tar.extractall(path="/tmp/unpack")
|
||||
|
||||
|
||||
tar = tarfile.open(filename)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=tar.getmembers())
|
||||
tar.close()
|
||||
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=None)
|
||||
|
||||
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=members_filter4(tar))
|
||||
tar.close()
|
||||
|
||||
|
||||
with tarfile.TarFile(unsafe_filename_tar, mode="r") as tar:
|
||||
tar.extractall(path="/tmp/unpack/", members=tar)
|
||||
|
||||
|
||||
tar = tarfile.open(unsafe_filename_tar)
|
||||
result = []
|
||||
for member in tar:
|
||||
if member.issym():
|
||||
raise ValueError("But it is a symlink")
|
||||
result.append(member)
|
||||
tar.extractall(path=tempfile.mkdtemp(), members=result)
|
||||
tar.close()
|
||||
|
||||
|
||||
archive_path = unsafe_filename_tar
|
||||
target_dir = "/tmp/unpack"
|
||||
tarfile.TarFile(unsafe_filename_tar, mode="r").extractall(path=target_dir)
|
||||
Reference in New Issue
Block a user