Files
codeql/python/ql/test/experimental/import-resolution/importflow.ql
yoff de7744950f Python: migrate remaining tests off getAFlowNode() and fix star-import SSA step
Sweep the last few uses of legacy AstNode.getAFlowNode() in tests over to
explicit ControlFlowNode joins after the shared-CFG migration. importflow.ql
needs the new Cfg::ControlFlowNode/CompareNode types because
DataFlow::Node.asCfgNode() now returns the shared-CFG node.

Also extend ImportResolution::allowedEssaImportStep to walk back through
uncertain-write SSA inputs, so that a later 'from X import *' does not hide
the preceding explicit (re)assignment from module-export resolution. Without
this, a reassigned name that survives a wildcard import was no longer
recognised as the module export. Rebless ModuleExport.expected to drop the
legacy 'ControlFlowNode for' toString prefix and pick up the two correct rows
exposed by the fix.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 21:09:49 +00:00

134 lines
4.2 KiB
Plaintext

import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
import utils.test.InlineExpectationsTest
import semmle.python.dataflow.new.internal.ImportResolution
private import semmle.python.controlflow.internal.Cfg as Cfg
/**
 * A data-flow node used as a flow source by this test: either a string literal
 * whose parent is an assignment, or a class definition named `SOURCE`.
 */
private class SourceString extends DataFlow::Node {
  string contents;

  SourceString() {
    exists(Expr e | e = this.asExpr() |
      // A string literal directly under an assignment; its text is the contents.
      contents = e.(StringLiteral).getText() and
      e.getParent() instanceof Assign
      or
      // A class called `SOURCE` is reported under the fixed contents "SOURCE".
      contents = "SOURCE" and
      e.(ClassExpr).getInnerScope().getName() = "SOURCE"
    )
  }

  /** Gets the contents associated with this source. */
  string getContents() { result = contents }
}
/** The argument at index 1 of a call to `trace.check`, i.e. the value being checked. */
private class CheckArgument extends DataFlow::Node {
  CheckArgument() {
    exists(DataFlow::CallCfgNode checkCall |
      checkCall = API::moduleImport("trace").getMember("check").getACall()
    |
      this = checkCall.getArg(1)
    )
  }
}
/**
 * A data-flow node that `ImportResolution::getModuleReference` reports as a
 * reference to a module, excluding the `__future__` and `trace` modules.
 */
private class ModuleRef extends DataFlow::Node {
  Module mod;

  ModuleRef() {
    not mod.getName() = ["__future__", "trace"] and
    this = ImportResolution::getModuleReference(mod)
  }

  /** Gets the name of the module this node refers to. */
  string getName() { result = mod.getName() }
}
/**
 * A data-flow node that is guarded by a version check.
 *
 * Only checks of the form `if sys.version_info[...] == N` are supported, where
 * the left hand side is a subscript of `sys.version_info` and `N` is the integer
 * literal `2` or `3`. The node must be located in the body of that `if` (not in
 * its `else` branch).
 */
private class VersionGuardedNode extends DataFlow::Node {
  int version;

  VersionGuardedNode() {
    version in [2, 3] and
    exists(If parent, Cfg::CompareNode c, Cfg::ControlFlowNode litCfg |
      parent.getBody().contains(this.asExpr()) and
      // The comparison must be the condition of the enclosing `if`. Without this
      // link, any node inside any `if` body would count as guarded as soon as a
      // matching comparison existed anywhere in the database.
      c.getNode() = parent.getTest() and
      litCfg.getNode() = any(IntegerLiteral lit | lit.getValue() = version)
    |
      c.operands(API::moduleImport("sys")
            .getMember("version_info")
            .getASubscript()
            .asSource()
            .asCfgNode(), any(Eq eq), litCfg)
    )
  }

  /** Gets the major version (2 or 3) that the guarding check compares against. */
  int getVersion() { result = version }
}
/**
 * Data-flow configuration for the import-resolution test: flow from source
 * strings to the checked argument of `trace.check`, stopping at `block_flow`.
 */
module ImportConfig implements DataFlow::ConfigSig {
  /** Holds if `source` is a `SourceString`. */
  predicate isSource(DataFlow::Node source) { source instanceof SourceString }

  /**
   * Holds if `sink` is the checked argument of a `trace.check` call.
   *
   * Uses `CheckArgument` rather than repeating the API-graph expression, so the
   * definition of "checked argument" lives in one place.
   */
  predicate isSink(DataFlow::Node sink) { sink instanceof CheckArgument }

  /** Holds if `node` is the receiver of a method call to `block_flow`. */
  predicate isBarrier(DataFlow::Node node) {
    exists(DataFlow::MethodCallNode call | call.calls(node, "block_flow"))
  }
}
/** Global data flow instantiated with `ImportConfig`. */
module ImportFlow = DataFlow::Global<ImportConfig>;
/**
 * Inline-expectations test for the `prints` tag: results at nodes that are not
 * guarded by a version check.
 */
module ResolutionTest implements TestSig {
  string getARelevantTag() { result = "prints" }

  /**
   * Holds if a non-version-guarded node at `location` yields `value`: either the
   * contents of a source string flowing to it, or `"<module NAME>"` when the node
   * is a checked argument that refers to module `NAME`.
   */
  predicate hasActualResult(Location location, string element, string tag, string value) {
    tag = "prints" and
    exists(DataFlow::Node reported |
      // Constraints shared by both kinds of result.
      not reported instanceof VersionGuardedNode and
      location = reported.getLocation() and
      element = reported.toString()
    |
      // A source string reaches the checked argument.
      exists(ImportFlow::PathNode origin, ImportFlow::PathNode target |
        ImportFlow::flowPath(origin, target) and
        reported = target.getNode() and
        value = origin.getNode().(SourceString).getContents()
      )
      or
      // The checked argument is itself a module reference.
      reported instanceof CheckArgument and
      value = "\"<module " + reported.(ModuleRef).getName() + ">\""
    )
  }
}
/**
 * Gets the tag `prints<version>` for `version`, provided that `version` equals
 * `major_version()`. There is no result for any other version.
 */
private string getTagForVersion(int version) {
  version = major_version() and
  result = "prints" + version.toString()
}
/**
 * Inline-expectations test for the version-specific tags produced by
 * `getTagForVersion`: results at nodes that are guarded by a version check.
 */
module VersionSpecificResolutionTest implements TestSig {
  string getARelevantTag() { result = getTagForVersion(_) }

  /**
   * Holds if a version-guarded node at `location` yields `value` under the tag for
   * its guarded version: either the contents of a source string flowing to it, or
   * `"<module NAME>"` when the node is a checked argument that refers to module `NAME`.
   */
  predicate hasActualResult(Location location, string element, string tag, string value) {
    exists(VersionGuardedNode guarded |
      // Constraints shared by both kinds of result.
      tag = getTagForVersion(guarded.getVersion()) and
      location = guarded.getLocation() and
      element = guarded.toString()
    |
      // A source string reaches the guarded checked argument.
      exists(ImportFlow::PathNode origin, ImportFlow::PathNode target |
        ImportFlow::flowPath(origin, target) and
        guarded = target.getNode() and
        value = origin.getNode().(SourceString).getContents()
      )
      or
      // The guarded checked argument is itself a module reference.
      guarded instanceof CheckArgument and
      value = "\"<module " + guarded.(ModuleRef).getName() + ">\""
    )
  }
}
import MakeTest<MergeTests<ResolutionTest, VersionSpecificResolutionTest>>