Add PEP249 connection tracking through class attribute wrappers

Introduce two new Connection::InstanceSource subclasses in PEP249.qll:

- ConnectionGetterAttributeRead: recognises self._conn reads inside
  getter methods of classes whose __init__ stores a connect() call in
  that attribute. The AttrRead node coincides with the return node, so
  the existing TypeTracker returnStep propagates the connection type to
  all call sites automatically.

- ConnectionConstructorAttributeRead: recognises ClassName()._conn
  direct attribute reads on constructor-call results.

Both classes share the classStoresConnectionInInit helper predicate
that checks for the self.attr = dbapi.connect() store pattern in __init__.

Also adds test cases for the new patterns in the hdbcli test suite
and a change note.
This commit is contained in:
copilot-swe-agent[bot]
2026-05-21 23:22:29 +00:00
committed by GitHub
parent 0ef59dffb4
commit 0ea1b8596e
3 changed files with 119 additions and 0 deletions

View File

@@ -8,6 +8,7 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
private import semmle.python.dataflow.new.internal.DataFlowDispatch as DataFlowDispatch
/**
* Provides classes modeling database interfaces following PEP 249.
@@ -212,6 +213,74 @@ module PEP249 {
ConnectCall() { this.getFunction() = connect() }
}
/**
* Holds if class `cls` stores a PEP 249 database connection to `self.<attrName>`
* in its `__init__` method, via a direct call to a `connect` function.
*/
private predicate classStoresConnectionInInit(Class cls, string attrName) {
exists(Function init, DataFlow::AttrWrite store |
cls.getAMethod() = init and
init.getName() = "__init__" and
store.getAttributeName() = attrName and
store.getObject().asCfgNode().getNode().(Name).getVariable() =
init.getArg(0).asName().getVariable() and
store.getValue() instanceof ConnectCall
)
}
/**
* A read of a connection-holding attribute within a method of a class whose
* `__init__` stores a PEP 249 connection in that attribute.
*
* This recognises patterns such as:
* ```python
* class Wrapper:
* def __init__(self):
* self._conn = dbapi.connect(...)
* def get_connection(self):
* return self._conn # <-- recognised as a connection source
* ```
* Because the `AttrRead` node for `self._conn` inside `get_connection` is
* also the `ExtractedReturnNode` for that statement, the existing TypeTracker
* `returnStep` automatically propagates the connection type to all call sites
* of `get_connection`.
*/
private class ConnectionGetterAttributeRead extends InstanceSource, DataFlow::AttrRead {
ConnectionGetterAttributeRead() {
exists(Class cls, Function method, string attrName |
classStoresConnectionInInit(cls, attrName) and
cls.getAMethod() = method and
method.getName() != "__init__" and
this.getAttributeName() = attrName and
this.getObject().asCfgNode().getNode().(Name).getVariable() =
method.getArg(0).asName().getVariable()
)
}
}
/**
* An attribute access on a constructor-call result that directly reads the
* connection-holding attribute.
*
* This recognises patterns such as:
* ```python
* class Wrapper:
* def __init__(self):
* self._conn = dbapi.connect(...)
*
* conn = Wrapper()._conn # <-- recognised as a connection source
* ```
*/
private class ConnectionConstructorAttributeRead extends InstanceSource, DataFlow::AttrRead {
ConnectionConstructorAttributeRead() {
exists(Class cls, string attrName |
classStoresConnectionInInit(cls, attrName) and
this.getAttributeName() = attrName and
DataFlowDispatch::resolveClassCall(this.getObject().asCfgNode().(CallNode), cls)
)
}
}
/** Gets a reference to a database connection (following PEP 249). */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and

View File

@@ -0,0 +1,4 @@
---
category: minorAnalysis
---
* Improved detection of SQL injection and other PEP 249 database-related vulnerabilities when a database connection is stored in a class instance attribute and accessed through a getter method or direct attribute read. For example, patterns like `self._conn = dbapi.connect(...)` in `__init__` followed by `return self._conn` in a getter method, or `MyClass()._conn`, are now correctly recognised as PEP 249 connection sources.

View File

@@ -7,3 +7,49 @@ cursor.execute("some sql", (42,)) # $ getSql="some sql"
cursor.executemany("some sql", (42,)) # $ getSql="some sql"
cursor.close()
# ---------------------------------------------------------------------------
# Connection stored in a class attribute and accessed via various patterns
# ---------------------------------------------------------------------------
class WrapperA:
def __init__(self):
self._conn = dbapi.connect(address="hostname", port=300, user="username", pass_arg="testpass")
def get_connection(self):
return self._conn
# Getter called on a fresh constructor result
conn_a1 = WrapperA().get_connection()
cursor_a1 = conn_a1.cursor()
cursor_a1.execute("some sql", (42,)) # $ getSql="some sql"
# Getter called via a stored wrapper instance
wrapper_instance = WrapperA()
conn_a2 = wrapper_instance.get_connection()
cursor_a2 = conn_a2.cursor()
cursor_a2.execute("some sql", (42,)) # $ getSql="some sql"
# Direct attribute access on a fresh constructor result
conn_b = WrapperA()._conn
cursor_b = conn_b.cursor()
cursor_b.execute("some sql", (42,)) # $ getSql="some sql"
class WrapperB:
"""Stores the connection under a different attribute name."""
def __init__(self):
self._hana = dbapi.connect(address="hostname", port=300, user="username", pass_arg="testpass")
def cursor(self):
return self._hana.cursor()
# Direct attribute access on a stored instance (mirrors hdb_con3 in the issue)
conn_c = WrapperB()._hana
cursor_c = conn_c.cursor()
cursor_c.execute("some sql", (42,)) # $ getSql="some sql"