modularize

This commit is contained in:
amammad
2023-10-09 23:07:52 +02:00
parent 3175db226e
commit 1318afdb27
3 changed files with 461 additions and 394 deletions

View File

@@ -0,0 +1,430 @@
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.dataflow.new.internal.DataFlowPublic
module DecompressionBomb {
/**
* The Sinks of uncontrolled data decompression, use this class in your queries
*/
class Sink extends DataFlow::Node {
Sink() { this = any(Range r).sink() }
}
/**
* The additional taint steps that need for creating taint tracking or dataflow.
*/
abstract class AdditionalTaintStep extends string {
AdditionalTaintStep() { this = "AdditionalTaintStep" }
/**
* Holds if there is a additional taint step between pred and succ.
*/
abstract predicate isAdditionalTaintStep(DataFlow::Node pred, DataFlow::Node succ);
}
/**
* A abstract class responsible for extending new decompression sinks
*/
abstract class Range extends API::Node {
/**
* Gets the sink of responsible for decompression node
*
* it can be a path, stream of compressed data,
* or a call to function that use pipe
*/
abstract DataFlow::Node sink();
}
}
module ZipFile {
/**
* A `zipfile` Instance
*
* ```python
* zipfile.ZipFile()
* ```
*/
API::Node zipFileClass() {
result =
[
API::moduleImport("zipfile").getMember("ZipFile"),
API::moduleImport("zipfile").getMember("PyZipFile")
]
}
/**
* The Decompression Sinks of `zipfile` library
*
* ```python
* myzip = zipfile.ZipFile("zipfileName.zip")
* myzip.open('eggs.txt',"r").read()
* myzip.extractall()
* ```
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = zipFileClass() }
/**
* An function call of tarfile for extracting compressed data
*
* `tarfile.open(filepath).extractall()` or `tarfile.open(filepath).extract()`or `tarfile.open(filepath).extractfile()`
* or `tarfile.Tarfile.xzopen()` or `tarfile.Tarfile.gzopen()` or `tarfile.Tarfile.bz2open()`
*/
override DataFlow::Node sink() {
(
result = this.getReturn().getMember(["extractall", "read", "extract", "testzip"]).getACall()
or
exists(API::Node openInstance |
openInstance = this.getReturn().getMember("open") and
(
not exists(
openInstance
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
openInstance
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText() = "r"
) and
(
not exists(
this.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
this.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText() = "r"
) and
not zipFileDecompressionBombSanitizer(this) and
result = openInstance.getACall()
)
)
}
}
/**
* Gets a `zipfile.ZipFile` and checks if there is a size controlled read or not
* ```python
* with zipfile.ZipFile(zipFileName) as myzip:
* with myzip.open(fileinfo.filename, mode="r") as myfile:
* while chunk:
* chunk = myfile.read(buffer_size)
* total_size += buffer_size
* if total_size > SIZE_THRESHOLD:
* ...
* ```
*/
predicate zipFileDecompressionBombSanitizer(API::Node n) {
TaintTracking::localExprTaint(n.getReturn().getMember("read").getParameter(0).asSink().asExpr(),
any(Compare i).getASubExpression*())
}
/**
* The Additional taint steps that are necessary for data flow query
*
* ```python
* nodeFrom = "zipFileName.zip"
* myZip = zipfile.ZipFile(nodeFrom)
* nodeTo = myZip.open('eggs.txt',"r")
* nodeTo = myZip.extractall()
* nodeTo = myZip.read()
* nodeTo = myZip.extract()
* # testzip not a RAM consumer but it uses as much CPU as possible
* nodeTo = myZip.testzip()
* ```
*/
class DecompressionAdditionalTaintStep extends DecompressionBomb::AdditionalTaintStep {
DecompressionAdditionalTaintStep() { this = "AdditionalTaintStep" }
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(DecompressionSink zipFileInstance |
nodeFrom =
[zipFileInstance.getACall().getParameter(0, "file").asSink(), zipFileInstance.getACall()] and
nodeTo =
[
zipFileInstance.sink(),
zipFileInstance
.getACall()
.getReturn()
.getMember(["extractall", "read", "extract", "testzip"])
.getACall()
]
)
}
}
}
/**
* Provides Sinks and additional taint steps related to `tarfile` library
*/
module TarFile {
/**
* The Decompression Sinks of `tarfile` library
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = tarfileExtractMember() }
/**
* An function call of tarfile for extracting compressed data
* `tarfile.open(filepath).extractall()` or `tarfile.open(filepath).extract()`or `tarfile.open(filepath).extractfile()`
* or `tarfile.Tarfile.xzopen()` or `tarfile.Tarfile.gzopen()` or `tarfile.Tarfile.bz2open()`
*/
override DataFlow::Node sink() {
result = this.getReturn().getMember(["extractall", "extract", "extractfile"]).getACall()
}
}
/**
* A tarfile instance for extracting compressed data
*/
API::Node tarfileExtractMember() {
result =
[
API::moduleImport("tarfile").getMember("open"),
API::moduleImport("tarfile")
.getMember("TarFile")
.getMember(["xzopen", "gzopen", "bz2open", "open"])
] and
(
not exists(
result
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
not result
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("r:%")
)
}
/**
* The Additional taint steps that are necessary for data flow query
*/
class DecompressionAdditionalTaintStep extends DecompressionBomb::AdditionalTaintStep {
DecompressionAdditionalTaintStep() { this = "AdditionalTaintStep" }
override predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::Node tarfileInstance | tarfileInstance = tarfileExtractMember() |
nodeFrom = tarfileInstance.getACall().getParameter(0, "name").asSink() and
nodeTo =
tarfileInstance.getReturn().getMember(["extractall", "extract", "extractfile"]).getACall()
)
}
}
}
/**
* Provides Sinks and additional taint steps related to `pandas` library
*/
module Pandas {
/**
* The Decompression Sinks of `pandas` library
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = API::moduleImport("pandas") }
override DataFlow::Node sink() {
exists(API::CallNode calltoPandasMethods |
(
calltoPandasMethods =
this.getMember([
"read_csv", "read_json", "read_sas", "read_stata", "read_table", "read_xml"
]).getACall() and
result = calltoPandasMethods.getArg(0)
or
calltoPandasMethods =
this.getMember(["read_csv", "read_sas", "read_stata", "read_table"]).getACall() and
result = calltoPandasMethods.getArgByName("filepath_or_buffer")
or
calltoPandasMethods = this.getMember("read_json").getACall() and
result = calltoPandasMethods.getArgByName("path_or_buf")
or
calltoPandasMethods = this.getMember("read_xml").getACall() and
result = calltoPandasMethods.getArgByName("path_or_buffer")
) and
(
not exists(calltoPandasMethods.getArgByName("compression"))
or
not calltoPandasMethods
.getKeywordParameter("compression")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText() = "tar"
)
)
}
}
}
/**
* Provides Sinks and additional taint steps related to `shutil` library
*/
module Shutil {
/**
* The Decompression Sinks of `shutil` library
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = API::moduleImport("shutil").getMember("unpack_archive") }
override DataFlow::Node sink() { result = this.getACall().getParameter(0, "filename").asSink() }
}
}
/**
* Provides Sinks and additional taint steps related to `gzip` library
*/
module Gzip {
private API::Node gzipInstance() {
result = API::moduleImport("gzip").getMember(["GzipFile", "open"])
}
/**
* The Decompression Sinks of `gzip` library
*
* `gzip.open(sink)`
* `gzip.GzipFile(sink)`
*
* only read mode is sink
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = gzipInstance() }
override DataFlow::Node sink() {
exists(API::CallNode gzipCall | gzipCall = this.getACall() |
result = gzipCall.getParameter(0, "filename").asSink() and
(
not exists(
gzipCall.getParameter(1, "mode").getAValueReachingSink().asExpr().(StrConst).getText()
) or
gzipCall
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("%r%")
)
)
}
}
}
/**
* Provides Sinks and additional taint steps related to `bz2` library
*/
module Bz2 {
private API::Node bz2Instance() {
result = API::moduleImport("bz2").getMember(["BZ2File", "open"])
}
/**
* The Decompression Sinks of `bz2` library
*
* `bz2.open(sink)`
* `bz2.BZ2File(sink)`
*
* only read mode is sink
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = bz2Instance() }
override DataFlow::Node sink() {
exists(API::CallNode bz2Call | bz2Call = this.getACall() |
result = bz2Call.getParameter(0, "filename").asSink() and
(
not exists(
bz2Call.getParameter(1, "mode").getAValueReachingSink().asExpr().(StrConst).getText()
) or
bz2Call
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("%r%")
)
)
}
}
}
/**
* Provides Sinks and additional taint steps related to `lzma` library
*/
module Lzma {
private API::Node lzmaInstance() {
result = API::moduleImport("lzma").getMember(["LZMAFile", "open"])
}
/**
* The Decompression Sinks of `bz2` library
*
* `lzma.open(sink)`
* `lzma.LZMAFile(sink)`
*
* only read mode is sink
*/
class DecompressionSink extends DecompressionBomb::Range {
override string toString() { result = "DecompressionSink" }
DecompressionSink() { this = lzmaInstance() }
override DataFlow::Node sink() {
exists(API::CallNode lzmaCall | lzmaCall = this.getACall() |
result = lzmaCall.getParameter(0, "filename").asSink() and
(
not exists(
lzmaCall.getParameter(1, "mode").getAValueReachingSink().asExpr().(StrConst).getText()
) or
lzmaCall
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("%r%")
)
)
}
}
}

View File

@@ -17,362 +17,27 @@ import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.dataflow.new.internal.DataFlowPublic
import DecompressionBomb
// /**
// * Same as ZipFile
// * I can made PyZipFile separated from ZipFile
// * as in future this will be more compatible if it has more differences from ZipFile
// * and we can add new changes easier.
// */
// module PyZipFile { }
module Lzma {
private API::Node lzmaClass() {
result = API::moduleImport("lzma").getMember(["LZMAFile", "open"])
}
/**
* `lzma.open(sink)`
* `lzma.LZMAFile(sink)`
* only read mode is sink
*/
DataFlow::Node isSink() {
exists(API::Node lzmaClass | lzmaClass = lzmaClass() |
result = lzmaClass.getACall().getParameter(0, "filename").asSink() and
(
not exists(
lzmaClass
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
lzmaClass
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("%r%")
)
)
}
}
module Bz2 {
private API::Node bz2Class() { result = API::moduleImport("bz2").getMember(["BZ2File", "open"]) }
/**
* `bz2.open(sink)`
* `bz2.BZ2File(sink)`
* only read mode is sink
*/
DataFlow::Node isSink() {
exists(API::Node bz2Class | bz2Class = bz2Class() |
result = bz2Class.getACall().getParameter(0, "filename").asSink() and
(
not exists(
bz2Class
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
bz2Class
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("%r%")
)
)
}
}
module Gzip {
private API::Node gzipClass() {
result = API::moduleImport("gzip").getMember(["GzipFile", "open"])
}
/**
* `gzip.open(sink)`
* `gzip.GzipFile(sink)`
* only read mode is sink
*/
DataFlow::Node isSink() {
exists(API::Node gzipClass | gzipClass = gzipClass() |
result = gzipClass.getACall().getParameter(0, "filename").asSink() and
(
not exists(
gzipClass
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
gzipClass
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("%r%")
)
)
}
}
module ZipFile {
/**
* ```python
* zipfile.ZipFile()
* ```
*/
API::Node zipFileClass() {
result =
[
API::moduleImport("zipfile").getMember("ZipFile"),
API::moduleImport("zipfile").getMember("PyZipFile")
]
}
/**
* ```python
* zipfile.ZipFile("zipfileName.zip")
* # read() or one of ["read", "readline", "readlines", "seek", "tell", "__iter__", "__next__"]
* myzip.open('eggs.txt',"r").read()
* # I decided to choice open method with "r" mode as sink
* # because opening zipfile with "r" mode mostly is for reading content of that file
* # so we have a very few of FP here
* next(myzip.open('eggs.txt'))
* myzip.extractall()
* myzip.read()
* myzip.extract()
* # testzip not a RAM consumer but it uses as much CPU as possible
* myzip.testzip()
*
* ```
*/
private API::Node sink(API::Node zipFileInstance) {
// we can go forward one more step and check whether we call the required methods for read
// or just opening zipfile for reading is enough ( mode = "r")
// result =
// zipfileReturnIOFile()
// .getReturn()
// .getMember(["read", "readline", "readlines", "seek", "tell", "__iter__", "__next__"])
// or
(
result = zipFileInstance.getReturn().getMember(["extractall", "read", "extract", "testzip"])
or
result = zipFileInstance.getReturn().getMember("open") and
(
not exists(
result
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
result
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText() = "r"
) and
(
not exists(
zipFileInstance
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
zipFileInstance
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText() = "r"
) and
zipFileSanitizer(result)
) and
exists(result.getACall().getLocation().getFile().getRelativePath())
}
/**
* a sanitizers which check if there is a managed read
* ```python
* with zipfile.ZipFile(zipFileName) as myzip:
* with myzip.open(fileinfo.filename, mode="r") as myfile:
* while chunk:
* chunk = myfile.read(buffer_size)
* total_size += buffer_size
* if total_size > SIZE_THRESHOLD:
* ...
* ```
*/
predicate zipFileSanitizer(API::Node n) {
not TaintTracking::localExprTaint(n.getReturn()
.getMember("read")
.getParameter(0)
.asSink()
.asExpr(), any(Compare i).getASubExpression*())
}
DataFlow::Node isSink() { result = sink(zipFileClass()).getACall() }
/**
* ```python
* nodeFrom = "zipFileName.zip"
* myZip = zipfile.ZipFile(nodeFrom)
* nodeTo2 = myZip.open('eggs.txt',"r")
*
* nodeTo = myZip.extractall()
* nodeTo = myZip.read()
* nodeTo = myZip.extract()
* # testzip not a RAM consumer but it uses as much CPU as possible
* nodeTo = myZip.testzip()
*
* ```
*/
predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::Node zipFileInstance | zipFileInstance = zipFileClass() |
nodeFrom =
[zipFileInstance.getACall().getParameter(0, "file").asSink(), zipFileInstance.getACall()] and
nodeTo =
[
sink(zipFileInstance).getACall(),
zipFileInstance
.getACall()
.getReturn()
.getMember(["extractall", "read", "extract", "testzip"])
.getACall()
]
) and
exists(nodeTo.getLocation().getFile().getRelativePath())
}
}
module TarFile {
/**
* tarfile.open
*
* tarfile.Tarfile.open/xzopen/gzopen/bz2open
* and not mode="r:" which means no compression accepted
*/
API::Node tarfileInstance() {
result =
[
API::moduleImport("tarfile").getMember("open"),
API::moduleImport("tarfile")
.getMember("TarFile")
.getMember(["xzopen", "gzopen", "bz2open", "open"])
] and
(
not exists(
result
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
) or
not result
.getACall()
.getParameter(1, "mode")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText()
.matches("r:%")
)
}
/**
* a Call of
* `tarfile.open(filepath).extractall()/extract()/extractfile()`
* or
* `tarfile.Tarfile.xzopen()/gzopen()/bz2open()`
*/
DataFlow::Node isSink() {
result =
tarfileInstance().getReturn().getMember(["extractall", "extract", "extractfile"]).getACall()
}
predicate isAdditionalTaintStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::Node tarfileInstance | tarfileInstance = tarfileInstance() |
nodeFrom =
[tarfileInstance.getACall().getParameter(0, "name").asSink(), tarfileInstance.getACall()] and
nodeTo =
tarfileInstance.getReturn().getMember(["extractall", "extract", "extractfile"]).getACall()
)
}
}
module Shutil {
DataFlow::Node isSink() {
result =
API::moduleImport("shutil")
.getMember("unpack_archive")
.getACall()
.getParameter(0, "filename")
.asSink()
}
}
module Pandas {
DataFlow::Node isSink() {
exists(API::CallNode calltoPandasMethods |
(
calltoPandasMethods =
API::moduleImport("pandas")
.getMember([
"read_csv", "read_json", "read_sas", "read_stata", "read_table", "read_xml"
])
.getACall() and
result = calltoPandasMethods.getArg(0)
or
calltoPandasMethods =
API::moduleImport("pandas")
.getMember(["read_csv", "read_sas", "read_stata", "read_table"])
.getACall() and
result = calltoPandasMethods.getArgByName("filepath_or_buffer")
or
calltoPandasMethods = API::moduleImport("pandas").getMember("read_json").getACall() and
result = calltoPandasMethods.getArgByName("path_or_buf")
or
calltoPandasMethods = API::moduleImport("pandas").getMember("read_xml").getACall() and
result = calltoPandasMethods.getArgByName("path_or_buffer")
) and
(
not exists(calltoPandasMethods.getArgByName("compression"))
or
not calltoPandasMethods
.getKeywordParameter("compression")
.getAValueReachingSink()
.asExpr()
.(StrConst)
.getText() = "tar"
)
)
}
/**
* `io.TextIOWrapper(ip, encoding='utf-8')` like following:
* ```python
* with gzip.open(bomb_input, 'rb') as ip:
* with io.TextIOWrapper(ip, encoding='utf-8') as decoder:
* content = decoder.read()
* print(content)
* ```
* I saw this builtin method many places so I added it as a AdditionalTaintStep.
* it would be nice if it is added as a global AdditionalTaintStep
*/
predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::CallNode textIOWrapper |
textIOWrapper = API::moduleImport("io").getMember("TextIOWrapper").getACall()
|
nodeFrom = textIOWrapper.getParameter(0, "input").asSink() and
nodeTo = textIOWrapper
) and
exists(nodeTo.getLocation().getFile().getRelativePath())
}
module FileAndFormRemoteFlowSource {
@@ -429,58 +94,30 @@ module BombsConfig implements DataFlow::ConfigSig {
// or
// source instanceof FileAndFormRemoteFlowSource::FastAPI
exists(source.getLocation().getFile().getRelativePath()) and
not source.getLocation().getFile().getRelativePath().matches("venv")
not source.getLocation().getFile().getRelativePath().matches("%venv%")
}
predicate isSink(DataFlow::Node sink) {
(
sink =
[
ZipFile::isSink(), Gzip::isSink(), Lzma::isSink(), Bz2::isSink(), TarFile::isSink(),
Shutil::isSink(), Pandas::isSink()
] or
any()
) and
sink instanceof DecompressionBomb::Sink and
exists(sink.getLocation().getFile().getRelativePath()) and
not sink.getLocation().getFile().getRelativePath().matches("venv")
not sink.getLocation().getFile().getRelativePath().matches("%venv%")
}
predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
predicate isAdditionalFlowStep(DataFlow::Node pred, DataFlow::Node succ) {
(
isAdditionalTaintStepTextIOWrapper(nodeFrom, nodeTo) or
ZipFile::isAdditionalTaintStep(nodeFrom, nodeTo) or
TarFile::isAdditionalTaintStep(nodeFrom, nodeTo)
any(DecompressionBomb::AdditionalTaintStep a).isAdditionalTaintStep(pred, succ) or
isAdditionalTaintStepTextIOWrapper(pred, succ)
) and
exists(nodeTo.getLocation().getFile().getRelativePath()) and
not nodeTo.getLocation().getFile().getRelativePath().matches("venv")
not succ.getLocation().getFile().inStdlib() and
not succ.getLocation().getFile().getRelativePath().matches("%venv%")
}
}
/**
* `io.TextIOWrapper(ip, encoding='utf-8')` like following:
* ```python
* with gzip.open(bomb_input, 'rb') as ip:
* with io.TextIOWrapper(ip, encoding='utf-8') as decoder:
* content = decoder.read()
* print(content)
* ```
* I saw this builtin method many places so I added it as a AdditionalTaintStep.
* it would be nice if it is added as a global AdditionalTaintStep
*/
predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::CallNode textIOWrapper |
textIOWrapper = API::moduleImport("io").getMember("TextIOWrapper").getACall()
|
nodeFrom = textIOWrapper.getParameter(0, "input").asSink() and
nodeTo = textIOWrapper
) and
exists(nodeTo.getLocation().getFile().getRelativePath())
}
module Bombs = TaintTracking::Global<BombsConfig>;
import Bombs::PathGraph
from Bombs::PathNode source, Bombs::PathNode sink
where Bombs::flowPath(source, sink)
select sink.getNode(), source, sink, "This file extraction is $@.", source.getNode(), "uncontrolled"
select sink.getNode(), source, sink, "This uncontrolled file extraction is $@.", source.getNode(),
"depends on this user controlled data"

View File

@@ -2,4 +2,4 @@ import zipfile
def Bad(zip_path):
zipfile.ZipFile(zip_path, "r").extract("filename", "./tmp/")
zipfile.ZipFile(zip_path, "r").extractall()