Python: Rewrite azure query to more idiomatic ql

This commit is contained in:
Rasmus Wriedt Larsen
2023-03-27 17:37:09 +02:00
parent 691ffcd3a4
commit 7a17cd2a9e

View File

@@ -12,38 +12,32 @@
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
predicate isUnsafeClientSideAzureStorageEncryptionViaAttributes(Call call, AttrNode node) {
exists(
API::Node n, ControlFlowNode startingNode, Attribute attr, ControlFlowNode ctrlFlowNode,
Attribute attrUploadBlob, ControlFlowNode ctrlFlowNodeUploadBlob, string s1, string s2,
string s3
API::Node client, DataFlow::AttrWrite keyAttrWrite, DataFlow::MethodCallNode uploadBlobCall
|
call.getAChildNode() = attrUploadBlob and
node = ctrlFlowNode
call = uploadBlobCall.asExpr() and node = keyAttrWrite.asCfgNode()
|
s1 in ["key_encryption_key", "key_resolver_function"] and
s2 in ["ContainerClient", "BlobClient", "BlobServiceClient"] and
s3 = "upload_blob" and
n = API::moduleImport("azure").getMember("storage").getMember("blob").getMember(s2).getAMember() and
startingNode = n.getACall().getReturn().getAValueReachableFromSource().asExpr().getAFlowNode() and
startingNode.strictlyReaches(ctrlFlowNode) and
attr.getAFlowNode() = ctrlFlowNode and
attr.getName() = s1 and
ctrlFlowNode.strictlyReaches(ctrlFlowNodeUploadBlob) and
attrUploadBlob.getAFlowNode() = ctrlFlowNodeUploadBlob and
attrUploadBlob.getName() = s3 and
not exists(
Attribute attrBarrier, ControlFlowNode ctrlFlowNodeBarrier, AssignStmt astmt2, StrConst uc
|
startingNode.strictlyReaches(ctrlFlowNodeBarrier) and
attrBarrier.getAFlowNode() = ctrlFlowNodeBarrier and
attrBarrier.getName() = "encryption_version" and
uc = astmt2.getValue() and
uc.getText() in ["'2.0'", "2.0"] and
astmt2.getATarget().getAChildNode*() = attrBarrier and
ctrlFlowNodeBarrier.strictlyReaches(ctrlFlowNodeUploadBlob)
client =
API::moduleImport("azure")
.getMember("storage")
.getMember("blob")
.getMember(["ContainerClient", "BlobClient", "BlobServiceClient"])
.getAMember()
.getReturn() and
keyAttrWrite
.accesses(client.getAValueReachableFromSource(),
["key_encryption_key", "key_resolver_function"]) and
uploadBlobCall.calls(client.getAValueReachableFromSource(), "upload_blob") and
DataFlow::localFlow(keyAttrWrite.getObject(), uploadBlobCall.getObject()) and
not exists(DataFlow::AttrWrite encryptionVersionWrite |
encryptionVersionWrite.accesses(client.getAValueReachableFromSource(), "encryption_version") and
encryptionVersionWrite.getValue().asExpr().(StrConst).getText() in ["'2.0'", "2.0"] and
DataFlow::localFlow(keyAttrWrite.getObject(), encryptionVersionWrite.getObject()) and
DataFlow::localFlow(encryptionVersionWrite.getObject(), uploadBlobCall.getObject())
)
)
}