Files
codeql/python/ql/test/experimental/import-resolution/importflow.ql
Copilot db1e5035b4 Python: deprecate AstNode.getAFlowNode() and rewrite internal callers
Preparatory refactor for the shared-CFG dataflow migration.

Deprecates the AstNode.getAFlowNode() cached predicate on the public
Python QL API and rewrites all ~140 internal callers across lib/, src/,
test/, and tools/ from `expr.getAFlowNode() = cfgNode` to
`cfgNode.getNode() = expr`, using ControlFlowNode.getNode() which
already exists in Flow.qll.

The predicate itself is preserved (with a deprecation note pointing at
the new pattern) so external users do not experience churn — they can
migrate at their own pace and the AST/CFG hierarchies still get the
intended untangling once the deprecation eventually elapses.

Semantic noop verified by:
- All 361 lib/ + src/ queries compile clean.
- All 122 ControlFlow + PointsTo library-tests pass.
- All 64 dataflow library-tests pass.
- All 113 Variables/Exceptions/Expressions/Statements/Functions/Imports/
  Security/CWE-798/ModificationOfParameterWithDefault query-tests pass.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-05 07:57:42 +00:00

133 lines
4.1 KiB
Plaintext

import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
import utils.test.InlineExpectationsTest
import semmle.python.dataflow.new.internal.ImportResolution
/** A string that appears on the right hand side of an assignment. */
private class SourceString extends DataFlow::Node {
string contents;
SourceString() {
this.asExpr().(StringLiteral).getText() = contents and
this.asExpr().getParent() instanceof Assign
or
this.asExpr().(ClassExpr).getInnerScope().getName() = "SOURCE" and
contents = "SOURCE"
}
string getContents() { result = contents }
}
/** An argument that is checked using the `check` function. */
private class CheckArgument extends DataFlow::Node {
CheckArgument() { this = API::moduleImport("trace").getMember("check").getACall().getArg(1) }
}
/** A data-flow node that is a reference to a module. */
private class ModuleRef extends DataFlow::Node {
Module mod;
ModuleRef() {
this = ImportResolution::getModuleReference(mod) and
not mod.getName() in ["__future__", "trace"]
}
string getName() { result = mod.getName() }
}
/**
* A data-flow node that is guarded by a version check. Only supports checks of the form `if
*sys.version_info[0] == ...` where the right hand side is either `2` or `3`.
*/
private class VersionGuardedNode extends DataFlow::Node {
int version;
VersionGuardedNode() {
version in [2, 3] and
exists(If parent, CompareNode c, ControlFlowNode litCfg |
parent.getBody().contains(this.asExpr()) and
litCfg.getNode() = any(IntegerLiteral lit | lit.getValue() = version)
|
c.operands(API::moduleImport("sys")
.getMember("version_info")
.getASubscript()
.asSource()
.asCfgNode(), any(Eq eq), litCfg)
)
}
int getVersion() { result = version }
}
module ImportConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) { source instanceof SourceString }
predicate isSink(DataFlow::Node sink) {
sink = API::moduleImport("trace").getMember("check").getACall().getArg(1)
}
predicate isBarrier(DataFlow::Node node) {
exists(DataFlow::MethodCallNode call | call.calls(node, "block_flow"))
}
}
module ImportFlow = DataFlow::Global<ImportConfig>;
module ResolutionTest implements TestSig {
string getARelevantTag() { result = "prints" }
predicate hasActualResult(Location location, string element, string tag, string value) {
(
exists(ImportFlow::PathNode source, ImportFlow::PathNode sink |
ImportFlow::flowPath(source, sink) and
not sink.getNode() instanceof VersionGuardedNode and
tag = "prints" and
location = sink.getNode().getLocation() and
value = source.getNode().(SourceString).getContents() and
element = sink.getNode().toString()
)
or
exists(ModuleRef ref |
not ref instanceof VersionGuardedNode and
ref instanceof CheckArgument and
tag = "prints" and
location = ref.getLocation() and
value = "\"<module " + ref.getName() + ">\"" and
element = ref.toString()
)
)
}
}
private string getTagForVersion(int version) {
result = "prints" + version and
version = major_version()
}
module VersionSpecificResolutionTest implements TestSig {
string getARelevantTag() { result = getTagForVersion(_) }
predicate hasActualResult(Location location, string element, string tag, string value) {
(
exists(ImportFlow::PathNode source, ImportFlow::PathNode sink |
ImportFlow::flowPath(source, sink) and
tag = getTagForVersion(sink.getNode().(VersionGuardedNode).getVersion()) and
location = sink.getNode().getLocation() and
value = source.getNode().(SourceString).getContents() and
element = sink.getNode().toString()
)
or
exists(ModuleRef ref |
ref instanceof CheckArgument and
tag = getTagForVersion(ref.(VersionGuardedNode).getVersion()) and
location = ref.getLocation() and
value = "\"<module " + ref.getName() + ">\"" and
element = ref.toString()
)
)
}
}
import MakeTest<MergeTests<ResolutionTest, VersionSpecificResolutionTest>>