mirror of
https://github.com/github/codeql.git
synced 2026-04-26 01:05:15 +02:00
added inline tests
This commit is contained in:
@@ -18,57 +18,6 @@ import semmle.python.ApiGraphs
|
||||
import semmle.python.dataflow.new.RemoteFlowSources
|
||||
import semmle.python.dataflow.new.internal.DataFlowPublic
|
||||
import experimental.semmle.python.security.DecompressionBomb
|
||||
import FileAndFormRemoteFlowSource::FileAndFormRemoteFlowSource
|
||||
|
||||
/**
|
||||
* `io.TextIOWrapper(ip, encoding='utf-8')` like following:
|
||||
* ```python
|
||||
* with gzip.open(bomb_input, 'rb') as ip:
|
||||
* with io.TextIOWrapper(ip, encoding='utf-8') as decoder:
|
||||
* content = decoder.read()
|
||||
* print(content)
|
||||
* ```
|
||||
* I saw this builtin method many places so I added it as a AdditionalTaintStep.
|
||||
* it would be nice if it is added as a global AdditionalTaintStep
|
||||
*/
|
||||
predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
|
||||
exists(API::CallNode textIOWrapper |
|
||||
textIOWrapper = API::moduleImport("io").getMember("TextIOWrapper").getACall()
|
||||
|
|
||||
nodeFrom = textIOWrapper.getParameter(0, "input").asSink() and
|
||||
nodeTo = textIOWrapper
|
||||
)
|
||||
}
|
||||
|
||||
module BombsConfig implements DataFlow::ConfigSig {
|
||||
predicate isSource(DataFlow::Node source) {
|
||||
(
|
||||
source instanceof RemoteFlowSource
|
||||
or
|
||||
source instanceof FastAPI
|
||||
) and
|
||||
not source.getLocation().getFile().inStdlib() and
|
||||
not source.getLocation().getFile().getRelativePath().matches("%venv%")
|
||||
}
|
||||
|
||||
predicate isSink(DataFlow::Node sink) {
|
||||
sink instanceof DecompressionBomb::Sink and
|
||||
not sink.getLocation().getFile().inStdlib() and
|
||||
not sink.getLocation().getFile().getRelativePath().matches("%venv%")
|
||||
}
|
||||
|
||||
predicate isAdditionalFlowStep(DataFlow::Node pred, DataFlow::Node succ) {
|
||||
(
|
||||
any(DecompressionBomb::AdditionalTaintStep a).isAdditionalTaintStep(pred, succ) or
|
||||
isAdditionalTaintStepTextIOWrapper(pred, succ)
|
||||
) and
|
||||
not succ.getLocation().getFile().inStdlib() and
|
||||
not succ.getLocation().getFile().getRelativePath().matches("%venv%")
|
||||
}
|
||||
}
|
||||
|
||||
module BombsFlow = TaintTracking::Global<BombsConfig>;
|
||||
|
||||
import BombsFlow::PathGraph
|
||||
|
||||
from BombsFlow::PathNode source, BombsFlow::PathNode sink
|
||||
|
||||
@@ -4,6 +4,7 @@ import semmle.python.dataflow.new.TaintTracking
|
||||
import semmle.python.ApiGraphs
|
||||
import semmle.python.dataflow.new.RemoteFlowSources
|
||||
import semmle.python.dataflow.new.internal.DataFlowPublic
|
||||
import FileAndFormRemoteFlowSource::FileAndFormRemoteFlowSource
|
||||
|
||||
module DecompressionBomb {
|
||||
/**
|
||||
@@ -358,3 +359,42 @@ module Lzma {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* `io.TextIOWrapper(ip, encoding='utf-8')` like following:
|
||||
* ```python
|
||||
* with gzip.open(bomb_input, 'rb') as ip:
|
||||
* with io.TextIOWrapper(ip, encoding='utf-8') as decoder:
|
||||
* content = decoder.read()
|
||||
* print(content)
|
||||
* ```
|
||||
* I saw this builtin method many places so I added it as a AdditionalTaintStep.
|
||||
* it would be nice if it is added as a global AdditionalTaintStep
|
||||
*/
|
||||
predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
|
||||
exists(API::CallNode textIOWrapper |
|
||||
textIOWrapper = API::moduleImport("io").getMember("TextIOWrapper").getACall()
|
||||
|
|
||||
nodeFrom = textIOWrapper.getParameter(0, "input").asSink() and
|
||||
nodeTo = textIOWrapper
|
||||
)
|
||||
}
|
||||
|
||||
module BombsConfig implements DataFlow::ConfigSig {
|
||||
predicate isSource(DataFlow::Node source) {
|
||||
source instanceof RemoteFlowSource
|
||||
or
|
||||
source instanceof FastAPI
|
||||
}
|
||||
|
||||
predicate isSink(DataFlow::Node sink) { sink instanceof DecompressionBomb::Sink }
|
||||
|
||||
predicate isAdditionalFlowStep(DataFlow::Node pred, DataFlow::Node succ) {
|
||||
(
|
||||
any(DecompressionBomb::AdditionalTaintStep a).isAdditionalTaintStep(pred, succ) or
|
||||
isAdditionalTaintStepTextIOWrapper(pred, succ)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
module BombsFlow = TaintTracking::Global<BombsConfig>;
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
import python
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.TaintTracking
|
||||
import semmle.python.ApiGraphs
|
||||
|
||||
/**
|
||||
* Provides user-controllable Remote sources for file(s) upload and Multipart-Form
|
||||
*/
|
||||
module FileAndFormRemoteFlowSource {
|
||||
/**
|
||||
* A
|
||||
*/
|
||||
class FastAPI extends DataFlow::Node {
|
||||
FastAPI() {
|
||||
exists(API::Node fastApiParam, Expr fastApiUploadFile |
|
||||
fastApiParam =
|
||||
API::moduleImport("fastapi")
|
||||
.getMember("FastAPI")
|
||||
.getReturn()
|
||||
.getMember("post")
|
||||
.getReturn()
|
||||
.getParameter(0)
|
||||
.getKeywordParameter(_) and
|
||||
fastApiUploadFile =
|
||||
API::moduleImport("fastapi")
|
||||
.getMember("UploadFile")
|
||||
.getASubclass*()
|
||||
.getAValueReachableFromSource()
|
||||
.asExpr()
|
||||
|
|
||||
// Multiple uploaded files as list of fastapi.UploadFile
|
||||
// @app.post("/")
|
||||
// def upload(files: List[UploadFile] = File(...)):
|
||||
// for file in files:
|
||||
fastApiUploadFile =
|
||||
fastApiParam.asSource().asExpr().(Parameter).getAnnotation().getASubExpression*() and
|
||||
exists(For f, Attribute attr |
|
||||
fastApiParam.getAValueReachableFromSource().asExpr() = f.getIter().getASubExpression*()
|
||||
|
|
||||
TaintTracking::localExprTaint(f.getIter(), attr.getObject()) and
|
||||
attr.getName() = ["filename", "content_type", "headers", "file", "read"] and
|
||||
this.asExpr() = attr
|
||||
)
|
||||
or
|
||||
// One uploaded file as fastapi.UploadFile
|
||||
// @app.post("/zipbomb2")
|
||||
// async def zipbomb2(file: UploadFile):
|
||||
// print(file.filename)
|
||||
this =
|
||||
[
|
||||
fastApiParam.getMember(["filename", "content_type", "headers"]).asSource(),
|
||||
fastApiParam
|
||||
.getMember("file")
|
||||
.getMember(["readlines", "readline", "read"])
|
||||
.getReturn()
|
||||
.asSource(), fastApiParam.getMember("read").getReturn().asSource()
|
||||
]
|
||||
)
|
||||
}
|
||||
|
||||
string getSourceType() { result = "fastapi HTTP FORM files" }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
import python
|
||||
import experimental.dataflow.TestUtil.DataflowQueryTest
|
||||
import experimental.semmle.python.security.DecompressionBomb
|
||||
import FromTaintTrackingConfig<BombsConfig>
|
||||
@@ -7,70 +7,75 @@ app = FastAPI()
|
||||
|
||||
@app.post("/bomb")
|
||||
async def bomb(file_path):
|
||||
zipfile.ZipFile(file_path, "r").extract("file1")
|
||||
zipfile.ZipFile(file_path, "r").extractall()
|
||||
zipfile.ZipFile(file_path, "r").extract("file1") # $ result=BAD
|
||||
zipfile.ZipFile(file_path, "r").extractall() # $ result=BAD
|
||||
|
||||
with zipfile.ZipFile(file_path) as myzip:
|
||||
with myzip.open('ZZ') as myfile:
|
||||
with myzip.open('ZZ') as myfile: # $ result=BAD
|
||||
a = myfile.readline()
|
||||
|
||||
with zipfile.ZipFile(file_path) as myzip:
|
||||
with myzip.open('ZZ', mode="w") as myfile:
|
||||
with myzip.open('ZZ', mode="w") as myfile: # $result=OK
|
||||
myfile.write(b"tmpppp")
|
||||
|
||||
zipfile.ZipFile(file_path).read("aFileNameInTheZipFile")
|
||||
zipfile.ZipFile(file_path).read("aFileNameInTheZipFile") # $ result=BAD
|
||||
|
||||
tarfile.open(file_path).extractfile("file1.txt")
|
||||
tarfile.TarFile.open(file_path).extract("somefile")
|
||||
tarfile.TarFile.xzopen(file_path).extract("somefile")
|
||||
tarfile.TarFile.gzopen(file_path).extractall()
|
||||
tarfile.TarFile.open(file_path).extractfile("file1.txt")
|
||||
tarfile.open(file_path).extractfile("file1.txt") # $ result=BAD
|
||||
tarfile.TarFile.open(file_path).extract("somefile") # $ result=BAD
|
||||
tarfile.TarFile.xzopen(file_path).extract("somefile") # $ result=BAD
|
||||
tarfile.TarFile.gzopen(file_path).extractall() # $ result=BAD
|
||||
tarfile.TarFile.open(file_path).extractfile("file1.txt") # $ result=BAD
|
||||
|
||||
tarfile.open(file_path, mode="w")
|
||||
tarfile.TarFile.gzopen(file_path, mode="w")
|
||||
tarfile.TarFile.open(file_path, mode="r:")
|
||||
tarfile.open(file_path, mode="w") # $result=OK
|
||||
tarfile.TarFile.gzopen(file_path, mode="w") # $result=OK
|
||||
tarfile.TarFile.open(file_path, mode="r:") # $ result=BAD
|
||||
import shutil
|
||||
|
||||
shutil.unpack_archive(file_path)
|
||||
shutil.unpack_archive(file_path) # $ result=BAD
|
||||
|
||||
import lzma
|
||||
|
||||
lzma.open(file_path)
|
||||
lzma.LZMAFile(file_path).read()
|
||||
lzma.open(file_path) # $ result=BAD
|
||||
lzma.LZMAFile(file_path).read() # $ result=BAD
|
||||
|
||||
import bz2
|
||||
|
||||
bz2.open(file_path)
|
||||
bz2.BZ2File(file_path).read()
|
||||
bz2.open(file_path) # $ result=BAD
|
||||
bz2.BZ2File(file_path).read() # $ result=BAD
|
||||
|
||||
import gzip
|
||||
|
||||
gzip.open(file_path)
|
||||
gzip.GzipFile(file_path)
|
||||
gzip.open(file_path) # $ result=BAD
|
||||
gzip.GzipFile(file_path) # $ result=BAD
|
||||
|
||||
import pandas
|
||||
|
||||
pandas.read_csv(filepath_or_buffer=file_path)
|
||||
pandas.read_csv(filepath_or_buffer=file_path) # $ result=BAD
|
||||
|
||||
pandas.read_table(file_path, compression='gzip')
|
||||
pandas.read_xml(file_path, compression='gzip')
|
||||
pandas.read_table(file_path, compression='gzip') # $ result=BAD
|
||||
pandas.read_xml(file_path, compression='gzip') # $ result=BAD
|
||||
|
||||
pandas.read_csv(filepath_or_buffer=file_path, compression='gzip')
|
||||
pandas.read_json(file_path, compression='gzip')
|
||||
pandas.read_sas(file_path, compression='gzip')
|
||||
pandas.read_stata(filepath_or_buffer=file_path, compression='gzip')
|
||||
pandas.read_table(file_path, compression='gzip')
|
||||
pandas.read_xml(path_or_buffer=file_path, compression='gzip')
|
||||
pandas.read_csv(filepath_or_buffer=file_path,
|
||||
compression='gzip') # $ result=BAD
|
||||
pandas.read_json(file_path, compression='gzip') # $ result=BAD
|
||||
pandas.read_sas(file_path, compression='gzip') # $ result=BAD
|
||||
pandas.read_stata(filepath_or_buffer=file_path,
|
||||
compression='gzip') # $ result=BAD
|
||||
pandas.read_table(file_path, compression='gzip') # $ result=BAD
|
||||
pandas.read_xml(path_or_buffer=file_path,
|
||||
compression='gzip') # $ result=BAD
|
||||
|
||||
# no compression no DOS
|
||||
pandas.read_table(file_path, compression='tar')
|
||||
pandas.read_xml(file_path, compression='tar')
|
||||
pandas.read_table(file_path, compression='tar') # $result=OK
|
||||
pandas.read_xml(file_path, compression='tar') # $result=OK
|
||||
|
||||
pandas.read_csv(filepath_or_buffer=file_path, compression='tar')
|
||||
pandas.read_json(file_path, compression='tar')
|
||||
pandas.read_sas(file_path, compression='tar')
|
||||
pandas.read_stata(filepath_or_buffer=file_path, compression='tar')
|
||||
pandas.read_table(file_path, compression='tar')
|
||||
pandas.read_xml(path_or_buffer=file_path, compression='tar')
|
||||
pandas.read_csv(filepath_or_buffer=file_path,
|
||||
compression='tar') # $result=OK
|
||||
pandas.read_json(file_path, compression='tar') # $result=OK
|
||||
pandas.read_sas(file_path, compression='tar') # $result=OK
|
||||
pandas.read_stata(filepath_or_buffer=file_path,
|
||||
compression='tar') # $result=OK
|
||||
pandas.read_table(file_path, compression='tar') # $result=OK
|
||||
pandas.read_xml(path_or_buffer=file_path, compression='tar') # $result=OK
|
||||
|
||||
return {"message": "bomb"}
|
||||
|
||||
Reference in New Issue
Block a user