Python: Refactor to allow customizations

Also use new DataFlow API
This commit is contained in:
Rasmus Lerchedahl Petersen
2023-08-23 15:03:37 +02:00
parent db0459739f
commit 087961d179
3 changed files with 90 additions and 45 deletions

View File

@@ -0,0 +1,50 @@
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.Concepts
module NoSqlInjection {
private newtype TFlowState =
TStringInput() or
TDictInput()
abstract class FlowState extends TFlowState {
abstract string toString();
}
class StringInput extends FlowState, TStringInput {
override string toString() { result = "StringInput" }
}
class DictInput extends FlowState, TDictInput {
override string toString() { result = "DictInput" }
}
abstract class StringSource extends DataFlow::Node { }
abstract class DictSource extends DataFlow::Node { }
abstract class StringSink extends DataFlow::Node { }
abstract class DictSink extends DataFlow::Node { }
abstract class StringToDictConversion extends DataFlow::Node {
abstract DataFlow::Node getAnInput();
abstract DataFlow::Node getOutput();
}
class RemoteFlowSourceAsStringSource extends RemoteFlowSource, StringSource { }
class NoSqlQueryAsDictSink extends DictSink {
NoSqlQueryAsDictSink() { this = any(NoSqlQuery noSqlQuery).getQuery() }
}
class JsonDecoding extends Decoding, StringToDictConversion {
JsonDecoding() { this.getFormat() = "JSON" }
override DataFlow::Node getAnInput() { result = Decoding.super.getAnInput() }
override DataFlow::Node getOutput() { result = Decoding.super.getOutput() }
}
}

View File

@@ -1,53 +1,48 @@
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.Concepts
private import NoSQLInjectionCustomizations::NoSqlInjection as C
module NoSqlInjection {
class Configuration extends TaintTracking::Configuration {
Configuration() { this = "NoSQLInjection" }
module Config implements DataFlow::StateConfigSig {
class FlowState = C::FlowState;
override predicate isSource(DataFlow::Node source, DataFlow::FlowState state) {
source instanceof RemoteFlowSource and
state instanceof RemoteInput
}
override predicate isSink(DataFlow::Node sink, DataFlow::FlowState state) {
sink = any(NoSqlQuery noSqlQuery).getQuery() and
state instanceof ConvertedToDict
}
override predicate isSanitizer(DataFlow::Node node, DataFlow::FlowState state) {
// Block `RemoteInput` paths here, since they change state to `ConvertedToDict`
exists(Decoding decoding | decoding.getFormat() = "JSON" and node = decoding.getOutput()) and
state instanceof RemoteInput
}
override predicate isAdditionalTaintStep(
DataFlow::Node nodeFrom, DataFlow::FlowState stateFrom, DataFlow::Node nodeTo,
DataFlow::FlowState stateTo
) {
exists(Decoding decoding | decoding.getFormat() = "JSON" |
nodeFrom = decoding.getAnInput() and
nodeTo = decoding.getOutput()
) and
stateFrom instanceof RemoteInput and
stateTo instanceof ConvertedToDict
}
override predicate isSanitizer(DataFlow::Node sanitizer) {
sanitizer = any(NoSqlSanitizer noSqlSanitizer).getAnInput()
}
predicate isSource(DataFlow::Node source, FlowState state) {
source instanceof C::StringSource and
state instanceof C::StringInput
or
source instanceof C::DictSource and
state instanceof C::DictInput
}
/** A flow state signifying remote input. */
class RemoteInput extends DataFlow::FlowState {
RemoteInput() { this = "RemoteInput" }
predicate isSink(DataFlow::Node source, FlowState state) {
source instanceof C::StringSink and
state instanceof C::StringInput
or
source instanceof C::DictSink and
state instanceof C::DictInput
}
/** A flow state signifying remote input converted to a dictionary. */
class ConvertedToDict extends DataFlow::FlowState {
ConvertedToDict() { this = "ConvertedToDict" }
predicate isBarrier(DataFlow::Node node, FlowState state) {
// Block `StringInput` paths here, since they change state to `DictInput`
exists(C::StringToDictConversion c | node = c.getOutput()) and
state instanceof C::StringInput
}
predicate isAdditionalFlowStep(
DataFlow::Node nodeFrom, FlowState stateFrom, DataFlow::Node nodeTo, FlowState stateTo
) {
exists(C::StringToDictConversion c |
nodeFrom = c.getAnInput() and
nodeTo = c.getOutput()
) and
stateFrom instanceof C::StringInput and
stateTo instanceof C::DictInput
}
predicate isBarrier(DataFlow::Node node) {
node = any(NoSqlSanitizer noSqlSanitizer).getAnInput()
}
}
module Flow = TaintTracking::GlobalWithState<Config>;

View File

@@ -12,9 +12,9 @@
import python
import semmle.python.security.dataflow.NoSQLInjectionQuery
import DataFlow::PathGraph
import Flow::PathGraph
from NoSqlInjection::Configuration config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink, source, sink, "This NoSQL query contains an unsanitized $@.", source,
from Flow::PathNode source, Flow::PathNode sink
where Flow::flowPath(source, sink)
select sink.getNode(), source, sink, "This NoSQL query contains an unsanitized $@.", source,
"user-provided value"