Considering the use of contextlib.closing() method

This commit is contained in:
ALJI Mohamed
2022-12-08 12:26:59 +01:00
parent 2801b8495a
commit 9336f4f1a2
3 changed files with 59 additions and 4 deletions

View File

@@ -34,8 +34,8 @@ class UnsafeUnpackingConfig extends TaintTracking::Configuration {
}
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// Writing the response data to the archive
(
// Writing the response data to the archive
exists(Stdlib::FileLikeObject::InstanceSource is, Node f, MethodCallNode mc |
is.flowsTo(f) and
mc.getMethodName() = "write" and
@@ -48,11 +48,18 @@ class UnsafeUnpackingConfig extends TaintTracking::Configuration {
exists(MethodCallNode mc |
nodeFrom = mc.getObject() and
mc.getMethodName() = "read" and
nodeTo = mc
mc.flowsTo(nodeTo)
)
or
// Accessing the name
exists(AttrRead ar | ar.accesses(nodeFrom, "name") and nodeTo = ar)
or
// Considering closing use
exists(API::Node closing |
closing = API::moduleImport("contextlib").getMember("closing") and
closing.getACall().flowsTo(nodeTo) and
nodeFrom = closing.getACall().getArg(0)
)
)
}
}

View File

@@ -1,10 +1,14 @@
edges
| UnsafeUnpack.py:5:12:5:41 | ControlFlowNode for Attribute() | UnsafeUnpack.py:9:15:9:26 | ControlFlowNode for Attribute |
| UnsafeUnpack.py:9:15:9:26 | ControlFlowNode for Attribute | UnsafeUnpack.py:12:23:12:29 | ControlFlowNode for tarpath |
| UnsafeUnpack.py:36:24:36:43 | ControlFlowNode for Attribute() | UnsafeUnpack.py:55:31:55:37 | ControlFlowNode for to_path |
nodes
| UnsafeUnpack.py:5:12:5:41 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| UnsafeUnpack.py:9:15:9:26 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute |
| UnsafeUnpack.py:12:23:12:29 | ControlFlowNode for tarpath | semmle.label | ControlFlowNode for tarpath |
| UnsafeUnpack.py:36:24:36:43 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| UnsafeUnpack.py:55:31:55:37 | ControlFlowNode for to_path | semmle.label | ControlFlowNode for to_path |
subpaths
#select
| UnsafeUnpack.py:5:12:5:41 | ControlFlowNode for Attribute() | UnsafeUnpack.py:5:12:5:41 | ControlFlowNode for Attribute() | UnsafeUnpack.py:12:23:12:29 | ControlFlowNode for tarpath | Unsafe extraction from a malicious tarball, is used in a $@ | PathNode | during archive unpacking. |
| UnsafeUnpack.py:12:23:12:29 | ControlFlowNode for tarpath | UnsafeUnpack.py:5:12:5:41 | ControlFlowNode for Attribute() | UnsafeUnpack.py:12:23:12:29 | ControlFlowNode for tarpath | Unsafe extraction from a malicious tarball retrieved from a remote location. |
| UnsafeUnpack.py:55:31:55:37 | ControlFlowNode for to_path | UnsafeUnpack.py:36:24:36:43 | ControlFlowNode for Attribute() | UnsafeUnpack.py:55:31:55:37 | ControlFlowNode for to_path | Unsafe extraction from a malicious tarball retrieved from a remote location. |

View File

@@ -9,4 +9,48 @@ with open(tarpath, "wb") as f:
f.write(response.raw.read())
untarredpath = "/tmp/tmp123"
shutil.unpack_archive(tarpath, untarredpath)
shutil.unpack_archive(tarpath, untarredpath)
import tempfile
import os
from urllib import request
import contextlib
import shutil
unpack = True
to_path = "/tmp/tmp123"
uri = "https://www.goog.com/zzz.tar.gz"
scheme = "https"
with tempfile.TemporaryDirectory() as temp_dir:
if unpack and (str(uri).endswith("zip") or str(uri).endswith("tar.gz")):
unpack_path = to_path
to_path = temp_dir
else:
unpack_path = None
if scheme in ["http", "https", "ftp"]:
if os.path.isdir(to_path):
to_path = os.path.join(to_path, os.path.basename(uri))
url = uri
url_response = request.urlopen(url)
with contextlib.closing(url_response) as fp:
with open(to_path, "wb") as out_file:
block_size = DEFAULT_BUFFER_SIZE * 8
while True:
block = fp.read(block_size)
if not block:
break
out_file.write(block)
else:
if scheme == "oci" and not storage_options:
storage_options = default_signer()
fs = fsspec.filesystem(scheme, **storage_options)
if os.path.isdir(to_path):
to_path = os.path.join(
to_path, os.path.basename(str(uri).rstrip("/"))
)
fs.get(uri, to_path, recursive=True)
if unpack_path:
shutil.unpack_archive(to_path, unpack_path)
to_path = unpack_path