upgrade fastAPI remote sources

This commit is contained in:
amammad
2023-10-09 20:51:19 +02:00
parent 6ee5865789
commit 3175db226e

View File

@@ -376,9 +376,9 @@ module Pandas {
}
module FileAndFormRemoteFlowSource {
class FastAPI extends DataFlow::Node {
class FastAPI extends RemoteFlowSource::Range {
FastAPI() {
exists(API::Node fastApiParam |
exists(API::Node fastApiParam, Expr fastApiUploadFile |
fastApiParam =
API::moduleImport("fastapi")
.getMember("FastAPI")
@@ -387,75 +387,72 @@ module FileAndFormRemoteFlowSource {
.getReturn()
.getParameter(0)
.getKeywordParameter(_) and
API::moduleImport("fastapi")
.getMember("UploadFile")
.getASubclass*()
.getAValueReachableFromSource()
.asExpr() =
fastApiParam.asSource().asExpr().(Parameter).getAnnotation().getASubExpression*()
fastApiUploadFile =
API::moduleImport("fastapi")
.getMember("UploadFile")
.getASubclass*()
.getAValueReachableFromSource()
.asExpr()
|
// in the case of List of files
fastApiUploadFile =
fastApiParam.asSource().asExpr().(Parameter).getAnnotation().getASubExpression*() and
// Multiple Uploaded files as list of fastapi.UploadFile
exists(For f, Attribute attr, DataFlow::Node a, DataFlow::Node b |
fastApiParam.getAValueReachableFromSource().asExpr() = f.getIter().getASubExpression*()
|
// file.file in following
// def upload(files: List[UploadFile] = File(...)):
// for file in files:
// **file.file**
// thanks Arthur Baars for helping me in following
TaintTracking::localTaint(a, b) and
a.asExpr() = f.getIter() and
b.asExpr() = attr.getObject() and
TaintTracking::localExprTaint(f.getIter(), attr.getObject()) and
attr.getName() = ["filename", "content_type", "headers", "file", "read"] and
this.asExpr() = attr
)
or
// one Uploaded file as fastapi.UploadFile
this =
[
fastApiParam.asSource(),
fastApiParam.getMember(["filename", "content_type", "headers", "file"]).asSource(),
fastApiParam.getMember("read").getReturn().asSource(),
// file-like object, I'm trying to not do additional work here by using already existing file-like objs if it is possible
// fastApiParam.getMember("file").getAMember().asSource(),
fastApiParam.getMember(["filename", "content_type", "headers"]).asSource(),
fastApiParam
.getMember("file")
.getMember(["readlines", "readline", "read"])
.getReturn()
.asSource(), fastApiParam.getMember("read").getReturn().asSource()
]
)
or
exists(API::Node fastApiParam |
fastApiParam =
API::moduleImport("fastapi")
.getMember("FastAPI")
.getReturn()
.getMember("post")
.getReturn()
.getParameter(0)
.getKeywordParameter(_) and
API::moduleImport("fastapi")
.getMember("File")
.getASubclass*()
.getAValueReachableFromSource()
.asExpr() =
fastApiParam.asSource().asExpr().(Parameter).getAnnotation().getASubExpression*()
|
// in the case of List of files
exists(For f, Attribute attr, DataFlow::Node a, DataFlow::Node b |
fastApiParam.getAValueReachableFromSource().asExpr() = f.getIter().getASubExpression*()
|
// file.file in following
// def upload(files: List[UploadFile] = File(...)):
// for file in files:
// **file.file**
// thanks Arthur Baars for helping me in following
TaintTracking::localTaint(a, b) and
a.asExpr() = f.getIter() and
b.asExpr() = attr.getObject() and
attr.getName() = "file" and
this.asExpr() = attr
)
or
this = fastApiParam.asSource()
) and
exists(this.getLocation().getFile().getRelativePath())
}
override string getSourceType() { result = "HTTP FORM" }
}
}
module BombsConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource and
// or
// source instanceof FileAndFormRemoteFlowSource::FastAPI
exists(source.getLocation().getFile().getRelativePath()) and
not source.getLocation().getFile().getRelativePath().matches("venv")
}
predicate isSink(DataFlow::Node sink) {
(
sink =
[
ZipFile::isSink(), Gzip::isSink(), Lzma::isSink(), Bz2::isSink(), TarFile::isSink(),
Shutil::isSink(), Pandas::isSink()
] or
any()
) and
exists(sink.getLocation().getFile().getRelativePath()) and
not sink.getLocation().getFile().getRelativePath().matches("venv")
}
predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
(
isAdditionalTaintStepTextIOWrapper(nodeFrom, nodeTo) or
ZipFile::isAdditionalTaintStep(nodeFrom, nodeTo) or
TarFile::isAdditionalTaintStep(nodeFrom, nodeTo)
) and
exists(nodeTo.getLocation().getFile().getRelativePath()) and
not nodeTo.getLocation().getFile().getRelativePath().matches("venv")
}
}
@@ -480,50 +477,6 @@ predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::
exists(nodeTo.getLocation().getFile().getRelativePath())
}
module BombsConfig implements DataFlow::ConfigSig {
// borrowed from UnsafeUnpackQuery.qll
predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource
or
exists(MethodCallNode args |
args = source.(AttrRead).getObject().getALocalSource() and
args =
[
API::moduleImport("argparse")
.getMember("ArgumentParser")
.getReturn()
.getMember("parse_args")
.getACall(), API::moduleImport("os").getMember("getenv").getACall(),
API::moduleImport("os").getMember("environ").getMember("get").getACall()
]
)
or
source instanceof FileAndFormRemoteFlowSource::FastAPI
or
source = TarFile::tarfileInstance().getACall()
or
source = ZipFile::zipFileClass().getACall()
}
predicate isSink(DataFlow::Node sink) {
sink =
[
ZipFile::isSink(), Gzip::isSink(), Lzma::isSink(), Bz2::isSink(), TarFile::isSink(),
Shutil::isSink(), Pandas::isSink()
] and
exists(sink.getLocation().getFile().getRelativePath())
}
predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
(
isAdditionalTaintStepTextIOWrapper(nodeFrom, nodeTo) or
ZipFile::isAdditionalTaintStep(nodeFrom, nodeTo) or
TarFile::isAdditionalTaintStep(nodeFrom, nodeTo)
) and
exists(nodeTo.getLocation().getFile().getRelativePath())
}
}
module Bombs = TaintTracking::Global<BombsConfig>;
import Bombs::PathGraph