Crypto: To allow for graph generation to have properties informed by assessments, altering a few queries weak/vuln/bad crypto to have qll files that can be accessed for other purposes, like graph generation. Also altering weak symmetric cipher to look for non-aes algorithms to be more comprehensive.

This commit is contained in:
REDMOND\brodes
2025-10-20 15:51:07 -04:00
parent cc436e897d
commit eff94ef91f
7 changed files with 228 additions and 196 deletions

View File

@@ -0,0 +1,171 @@
import java
import experimental.quantum.Language
import codeql.util.Option
/**
* Holds when the src node is the output artifact of a decrypt operation
* that flows to the input artifact of a mac operation.
*/
predicate isDecryptToMacFlow(ArtifactFlow::PathNode src, ArtifactFlow::PathNode sink) {
ArtifactFlow::flowPath(src, sink) and
exists(Crypto::CipherOperationNode cipherOp |
cipherOp.getKeyOperationSubtype() = Crypto::TDecryptMode() and
cipherOp.getAnOutputArtifact().asElement() = src.getNode().asExpr()
) and
exists(Crypto::MacOperationNode macOp |
macOp.getAnInputArtifact().asElement() = sink.getNode().asExpr()
)
}
/**
* Holds when the src node is used as plaintext input to both
* an encryption operation and a mac operation, via the
* argument represented by InterimArg.
*/
predicate isPlaintextInEncryptionAndMac(
PlaintextUseAsMacAndCipherInputFlow::PathNode src,
PlaintextUseAsMacAndCipherInputFlow::PathNode sink, InterimArg arg
) {
PlaintextUseAsMacAndCipherInputFlow::flowPath(src, sink) and
arg = sink.getState().asSome()
}
module ArgToSinkConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) { exists(Call c | c.getAnArgument() = source.asExpr()) }
predicate isSink(DataFlow::Node sink) { targetSinks(sink) }
// Don't go in to a known out node, this will prevent the plaintext
// from tracing out of cipher operations for example, we just want to trace
// the plaintext to uses.
// NOTE: we are not using a barrier out on input nodes, because
// that would remove 'use-use' flows, which we need
predicate isBarrierIn(DataFlow::Node node) {
node = any(Crypto::FlowAwareElement element).getOutputNode()
}
predicate isAdditionalFlowStep(DataFlow::Node node1, DataFlow::Node node2) {
node1.(AdditionalFlowInputStep).getOutput() = node2
or
exists(MethodCall m |
m.getMethod().hasQualifiedName("java.lang", "String", "getBytes") and
node1.asExpr() = m.getQualifier() and
node2.asExpr() = m
)
}
}
module ArgToSinkFlow = TaintTracking::Global<ArgToSinkConfig>;
/**
* Target sinks for this query are either encryption operations or mac operation message inputs
*/
predicate targetSinks(DataFlow::Node n) {
exists(Crypto::CipherOperationNode cipherOp |
cipherOp.getKeyOperationSubtype() = Crypto::TEncryptMode() and
cipherOp.getAnInputArtifact().asElement() = n.asExpr()
)
or
exists(Crypto::MacOperationNode macOp | macOp.getAnInputArtifact().asElement() = n.asExpr())
}
/**
* An argument of a target sink or a parent call whose parameter flows to a target sink
*/
class InterimArg extends DataFlow::Node {
DataFlow::Node targetSink;
InterimArg() {
targetSinks(targetSink) and
(
this = targetSink
or
ArgToSinkFlow::flow(this, targetSink) and
this.getEnclosingCallable().calls+(targetSink.getEnclosingCallable())
)
}
DataFlow::Node getTargetSink() { result = targetSink }
}
/**
* A wrapper class to represent a target argument dataflow node.
*/
class TargetArg extends DataFlow::Node {
TargetArg() { targetSinks(this) }
predicate isCipher() {
exists(Crypto::CipherOperationNode cipherOp |
cipherOp.getKeyOperationSubtype() = Crypto::TEncryptMode() and
cipherOp.getAnInputArtifact().asElement() = this.asExpr()
)
}
predicate isMac() {
exists(Crypto::MacOperationNode macOp | macOp.getAnInputArtifact().asElement() = this.asExpr())
}
}
module PlaintextUseAsMacAndCipherInputConfig implements DataFlow::StateConfigSig {
class FlowState = Option<TargetArg>::Option;
// TODO: can we approximate a message source better?
predicate isSource(DataFlow::Node source, FlowState state) {
// TODO: can we find the 'closest' parameter to the sinks?
// i.e., use a generic source if we have it, but also isolate the
// lowest level in the flow to the closest parameter node in the call graph?
exists(Crypto::GenericSourceNode other |
other.asElement() = CryptoInput::dfn_to_element(source)
) and
state.isNone()
}
predicate isSink(DataFlow::Node sink, FlowState state) {
sink instanceof TargetArg and
(
sink.(TargetArg).isMac() and state.asSome().isCipher()
or
sink.(TargetArg).isCipher() and state.asSome().isMac()
)
}
predicate isBarrierOut(DataFlow::Node node, FlowState state) {
// Stop at the first sink for now
isSink(node, state)
}
// Don't go in to a known out node, this will prevent the plaintext
// from tracing out of cipher operations for example, we just want to trace
// the plaintext to uses.
// NOTE: we are not using a barrier out on input nodes, because
// that would remove 'use-use' flows, which we need
predicate isBarrierIn(DataFlow::Node node) {
node = any(Crypto::FlowAwareElement element).getOutputNode()
}
predicate isAdditionalFlowStep(DataFlow::Node node1, DataFlow::Node node2) {
node1.(AdditionalFlowInputStep).getOutput() = node2
or
exists(MethodCall m |
m.getMethod().hasQualifiedName("java.lang", "String", "getBytes") and
node1.asExpr() = m.getQualifier() and
node2.asExpr() = m
)
}
predicate isAdditionalFlowStep(
DataFlow::Node node1, FlowState state1, DataFlow::Node node2, FlowState state2
) {
(exists(state1.asSome()) or state1.isNone()) and
targetSinks(node1) and
node1 instanceof TargetArg and
//use-use flow, either flow directly from the node1 use
//or find a parent call in the call in the call stack
//and continue flow from that parameter
node2.(InterimArg).getTargetSink() = node1 and
state2.asSome() = node1
}
}
module PlaintextUseAsMacAndCipherInputFlow =
TaintTracking::GlobalWithState<PlaintextUseAsMacAndCipherInputConfig>;

View File

@@ -11,16 +11,9 @@
import java
import experimental.quantum.Language
import ArtifactFlow::PathGraph
import BadMacOrder
from ArtifactFlow::PathNode src, ArtifactFlow::PathNode sink
where
ArtifactFlow::flowPath(src, sink) and
exists(Crypto::CipherOperationNode cipherOp |
cipherOp.getKeyOperationSubtype() = Crypto::TDecryptMode() and
cipherOp.getAnOutputArtifact().asElement() = src.getNode().asExpr()
) and
exists(Crypto::MacOperationNode macOp |
macOp.getAnInputArtifact().asElement() = sink.getNode().asExpr()
)
where isDecryptToMacFlow(src, sink)
select sink, src, sink,
"MAC order potentially wrong: observed a potential decrypt operation output to MAC implying the MAC is on plaintext, and not a cipher."

View File

@@ -10,156 +10,13 @@
import java
import experimental.quantum.Language
import codeql.util.Option
module ArgToSinkConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) { exists(Call c | c.getAnArgument() = source.asExpr()) }
predicate isSink(DataFlow::Node sink) { targetSinks(sink) }
// Don't go in to a known out node, this will prevent the plaintext
// from tracing out of cipher operations for example, we just want to trace
// the plaintext to uses.
// NOTE: we are not using a barrier out on input nodes, because
// that would remove 'use-use' flows, which we need
predicate isBarrierIn(DataFlow::Node node) {
node = any(Crypto::FlowAwareElement element).getOutputNode()
}
predicate isAdditionalFlowStep(DataFlow::Node node1, DataFlow::Node node2) {
node1.(AdditionalFlowInputStep).getOutput() = node2
or
exists(MethodCall m |
m.getMethod().hasQualifiedName("java.lang", "String", "getBytes") and
node1.asExpr() = m.getQualifier() and
node2.asExpr() = m
)
}
}
module ArgToSinkFlow = TaintTracking::Global<ArgToSinkConfig>;
/**
* Target sinks for this query are either encryption operations or mac operation message inputs
*/
predicate targetSinks(DataFlow::Node n) {
exists(Crypto::CipherOperationNode cipherOp |
cipherOp.getKeyOperationSubtype() = Crypto::TEncryptMode() and
cipherOp.getAnInputArtifact().asElement() = n.asExpr()
)
or
exists(Crypto::MacOperationNode macOp | macOp.getAnInputArtifact().asElement() = n.asExpr())
}
/**
* An argument of a target sink or a parent call whose parameter flows to a target sink
*/
class InterimArg extends DataFlow::Node {
DataFlow::Node targetSink;
InterimArg() {
targetSinks(targetSink) and
(
this = targetSink
or
ArgToSinkFlow::flow(this, targetSink) and
this.getEnclosingCallable().calls+(targetSink.getEnclosingCallable())
)
}
DataFlow::Node getTargetSink() { result = targetSink }
}
/**
* A wrapper class to represent a target argument dataflow node.
*/
class TargetArg extends DataFlow::Node {
TargetArg() { targetSinks(this) }
predicate isCipher() {
exists(Crypto::CipherOperationNode cipherOp |
cipherOp.getKeyOperationSubtype() = Crypto::TEncryptMode() and
cipherOp.getAnInputArtifact().asElement() = this.asExpr()
)
}
predicate isMac() {
exists(Crypto::MacOperationNode macOp | macOp.getAnInputArtifact().asElement() = this.asExpr())
}
}
module PlaintextUseAsMacAndCipherInputConfig implements DataFlow::StateConfigSig {
class FlowState = Option<TargetArg>::Option;
// TODO: can we approximate a message source better?
predicate isSource(DataFlow::Node source, FlowState state) {
// TODO: can we find the 'closest' parameter to the sinks?
// i.e., use a generic source if we have it, but also isolate the
// lowest level in the flow to the closest parameter node in the call graph?
exists(Crypto::GenericSourceNode other |
other.asElement() = CryptoInput::dfn_to_element(source)
) and
state.isNone()
}
predicate isSink(DataFlow::Node sink, FlowState state) {
sink instanceof TargetArg and
(
sink.(TargetArg).isMac() and state.asSome().isCipher()
or
sink.(TargetArg).isCipher() and state.asSome().isMac()
)
}
predicate isBarrierOut(DataFlow::Node node, FlowState state) {
// Stop at the first sink for now
isSink(node, state)
}
// Don't go in to a known out node, this will prevent the plaintext
// from tracing out of cipher operations for example, we just want to trace
// the plaintext to uses.
// NOTE: we are not using a barrier out on input nodes, because
// that would remove 'use-use' flows, which we need
predicate isBarrierIn(DataFlow::Node node) {
node = any(Crypto::FlowAwareElement element).getOutputNode()
}
predicate isAdditionalFlowStep(DataFlow::Node node1, DataFlow::Node node2) {
node1.(AdditionalFlowInputStep).getOutput() = node2
or
exists(MethodCall m |
m.getMethod().hasQualifiedName("java.lang", "String", "getBytes") and
node1.asExpr() = m.getQualifier() and
node2.asExpr() = m
)
}
predicate isAdditionalFlowStep(
DataFlow::Node node1, FlowState state1, DataFlow::Node node2, FlowState state2
) {
(exists(state1.asSome()) or state1.isNone()) and
targetSinks(node1) and
node1 instanceof TargetArg and
//use-use flow, either flow directly from the node1 use
//or find a parent call in the call in the call stack
//and continue flow from that parameter
node2.(InterimArg).getTargetSink() = node1 and
state2.asSome() = node1
}
}
module PlaintextUseAsMacAndCipherInputFlow =
TaintTracking::GlobalWithState<PlaintextUseAsMacAndCipherInputConfig>;
import BadMacOrder
import PlaintextUseAsMacAndCipherInputFlow::PathGraph
from
PlaintextUseAsMacAndCipherInputFlow::PathNode src,
PlaintextUseAsMacAndCipherInputFlow::PathNode sink, InterimArg arg
where
PlaintextUseAsMacAndCipherInputFlow::flowPath(src, sink) and
arg = sink.getState().asSome()
where isPlaintextInEncryptionAndMac(src, sink, arg)
select sink, src, sink,
"Source is used as plaintext to MAC and encryption operation. Indicates possible misuse of MAC. Path shows plaintext to final use through intermediate mac or encryption operation here $@",
arg.asExpr(), arg.asExpr().toString()

View File

@@ -9,28 +9,8 @@
* experimental
*/
import java
import experimental.quantum.Language
import WeakHash
from Crypto::HashAlgorithmNode alg, Crypto::HashType htype, string msg
where
htype = alg.getHashType() and
(
(htype != Crypto::SHA2() and htype != Crypto::SHA3()) and
msg = "Use of unapproved hash algorithm or API: " + htype.toString() + "."
or
(htype = Crypto::SHA2() or htype = Crypto::SHA3()) and
not exists(alg.getDigestLength()) and
msg =
"Use of approved hash algorithm or API type " + htype.toString() + " but unknown digest size."
or
exists(int digestLength |
digestLength = alg.getDigestLength() and
(htype = Crypto::SHA2() or htype = Crypto::SHA3()) and
digestLength < 256 and
msg =
"Use of approved hash algorithm or API type " + htype.toString() + " but weak digest size ("
+ digestLength + ")."
)
)
where isUnapprovedHash(alg, htype, msg)
select alg, msg

View File

@@ -0,0 +1,23 @@
import experimental.quantum.Language
predicate isUnapprovedHash(Crypto::HashAlgorithmNode alg, Crypto::HashType htype, string msg) {
htype = alg.getHashType() and
(
(htype != Crypto::SHA2() and htype != Crypto::SHA3()) and
msg = "Use of unapproved hash algorithm or API: " + htype.toString() + "."
or
(htype = Crypto::SHA2() or htype = Crypto::SHA3()) and
not exists(alg.getDigestLength()) and
msg =
"Use of approved hash algorithm or API type " + htype.toString() + " but unknown digest size."
or
exists(int digestLength |
digestLength = alg.getDigestLength() and
(htype = Crypto::SHA2() or htype = Crypto::SHA3()) and
digestLength < 256 and
msg =
"Use of approved hash algorithm or API type " + htype.toString() + " but weak digest size ("
+ digestLength + ")."
)
)
}

View File

@@ -9,24 +9,8 @@
* experimental
*/
import java
import experimental.quantum.Language
import Crypto::KeyOpAlg as KeyOpAlg
import WeakSymmetricCipher
from Crypto::KeyOperationAlgorithmNode alg, KeyOpAlg::AlgorithmType algType
where
algType = alg.getAlgorithmType() and
// NOTE: an org may disallow all but AES we could similarly look for
// algType != KeyOpAlg::TSymmetricCipher(KeyOpAlg::AES())
// This is a more comprehensive check than looking for all weak ciphers
(
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::DES()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::TRIPLE_DES()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::DOUBLE_DES()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::RC2()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::RC4()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::IDEA()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::BLOWFISH()) or
algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::SKIPJACK())
)
select alg, "Use of unapproved symmetric cipher algorithm or API: " + algType.toString() + "."
from Crypto::KeyOperationAlgorithmNode alg, string msg
where isUnapprovedSymmetricCipher(alg, msg)
select alg, msg

View File

@@ -0,0 +1,24 @@
import experimental.quantum.Language
import Crypto::KeyOpAlg as KeyOpAlg
/**
* Holds when the given symmetric cipher algorithm is unapproved or weak.
*/
predicate isUnapprovedSymmetricCipher(Crypto::KeyOperationAlgorithmNode alg, string msg) {
exists(KeyOpAlg::AlgorithmType algType |
algType = alg.getAlgorithmType() and
msg = "Use of unapproved symmetric cipher algorithm or API: " + algType.toString() + "." and
algType != KeyOpAlg::TSymmetricCipher(KeyOpAlg::AES())
)
// NOTE: an org could decide to disallow very specific algorithms as well, shown below
// (
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::DES()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::TRIPLE_DES()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::DOUBLE_DES()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::RC2()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::RC4()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::IDEA()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::BLOWFISH()) or
// algType = KeyOpAlg::TSymmetricCipher(KeyOpAlg::SKIPJACK())
// )
}