Python: Replace type tracking with global data-flow

This takes care of most of the false negatives from the preceding
commit.

Additionally, we add models for some known wrappers of `socket.socket`
from the `gevent` and `eventlet` packages.
This commit is contained in:
Taus
2026-03-26 14:53:18 +00:00
parent 1ecd9e83b8
commit c439fc5d45
7 changed files with 44 additions and 43 deletions

View File

@@ -0,0 +1,9 @@
extensions:
- addsTo:
pack: codeql/python-all
extensible: typeModel
data:
# See https://eventlet.readthedocs.io/en/latest/patching.html
- ['socket.socket', 'eventlet', 'Member[green].Member[socket].Member[socket].ReturnValue']
# eventlet also re-exports as eventlet.socket for convenience
- ['socket.socket', 'eventlet', 'Member[socket].Member[socket].ReturnValue']

View File

@@ -0,0 +1,7 @@
extensions:
- addsTo:
pack: codeql/python-all
extensible: typeModel
data:
# See https://www.gevent.org/api/gevent.socket.html
- ['socket.socket', 'gevent', 'Member[socket].Member[socket].ReturnValue']

View File

@@ -27,6 +27,8 @@ extensions:
extensible: sinkModel
data:
- ["zipfile.ZipFile","Member[extractall].Argument[0,path:]", "path-injection"]
# See https://docs.python.org/3/library/socket.html#socket.socket.bind
- ["socket.socket", "Member[bind].Argument[0,address:]", "bind-socket-all-interfaces"]
- addsTo:
pack: codeql/python-all
@@ -184,6 +186,8 @@ extensions:
pack: codeql/python-all
extensible: typeModel
data:
# See https://docs.python.org/3/library/socket.html#socket.socket
- ['socket.socket', 'socket', 'Member[socket].ReturnValue']
# See https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urlparse
- ["urllib.parse.ParseResult~Subclass", 'urllib', 'Member[parse].Member[urlparse]']

View File

@@ -14,7 +14,8 @@
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.TaintTracking
private import semmle.python.frameworks.data.ModelsAsData
/** Gets a hostname that can be used to bind to all interfaces. */
private string vulnerableHostname() {
@@ -26,45 +27,21 @@ private string vulnerableHostname() {
]
}
/** Gets a reference to a hostname that can be used to bind to all interfaces. */
private DataFlow::TypeTrackingNode vulnerableHostnameRef(DataFlow::TypeTracker t, string hostname) {
t.start() and
exists(StringLiteral allInterfacesStringLiteral | hostname = vulnerableHostname() |
allInterfacesStringLiteral.getText() = hostname and
result.asExpr() = allInterfacesStringLiteral
)
or
exists(DataFlow::TypeTracker t2 | result = vulnerableHostnameRef(t2, hostname).track(t2, t))
private module BindToAllInterfacesConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source.asExpr().(StringLiteral).getText() = vulnerableHostname()
}
predicate isSink(DataFlow::Node sink) {
ModelOutput::sinkNode(sink, "bind-socket-all-interfaces")
}
}
/** Gets a reference to a hostname that can be used to bind to all interfaces. */
DataFlow::Node vulnerableHostnameRef(string hostname) {
vulnerableHostnameRef(DataFlow::TypeTracker::end(), hostname).flowsTo(result)
}
private module BindToAllInterfacesFlow = TaintTracking::Global<BindToAllInterfacesConfig>;
/** Gets a reference to a tuple for which the first element is a hostname that can be used to bind to all interfaces. */
private DataFlow::TypeTrackingNode vulnerableAddressTuple(DataFlow::TypeTracker t, string hostname) {
t.start() and
result.asExpr() = any(Tuple tup | tup.getElt(0) = vulnerableHostnameRef(hostname).asExpr())
or
exists(DataFlow::TypeTracker t2 | result = vulnerableAddressTuple(t2, hostname).track(t2, t))
}
/** Gets a reference to a tuple for which the first element is a hostname that can be used to bind to all interfaces. */
DataFlow::Node vulnerableAddressTuple(string hostname) {
vulnerableAddressTuple(DataFlow::TypeTracker::end(), hostname).flowsTo(result)
}
/**
* Gets an instance of `socket.socket` using _some_ address family.
*
* See https://docs.python.org/3/library/socket.html
*/
API::Node socketInstance() { result = API::moduleImport("socket").getMember("socket").getReturn() }
from DataFlow::CallCfgNode bindCall, DataFlow::Node addressArg, string hostname
from DataFlow::Node source, DataFlow::Node sink, DataFlow::CallCfgNode bindCall, string hostname
where
bindCall = socketInstance().getMember("bind").getACall() and
addressArg = bindCall.getArg(0) and
addressArg = vulnerableAddressTuple(hostname)
BindToAllInterfacesFlow::flow(source, sink) and
bindCall.getArg(0) = sink and
hostname = source.asExpr().(StringLiteral).getText()
select bindCall.asExpr(), "'" + hostname + "' binds a socket to all interfaces."

View File

@@ -3,3 +3,7 @@
| BindToAllInterfaces_test.py:17:1:17:26 | Attribute() | '0.0.0.0' binds a socket to all interfaces. |
| BindToAllInterfaces_test.py:21:1:21:11 | Attribute() | '0.0.0.0' binds a socket to all interfaces. |
| BindToAllInterfaces_test.py:26:1:26:20 | Attribute() | '::' binds a socket to all interfaces. |
| BindToAllInterfaces_test.py:39:9:39:43 | Attribute() | '0.0.0.0' binds a socket to all interfaces. |
| BindToAllInterfaces_test.py:48:1:48:20 | Attribute() | '0.0.0.0' binds a socket to all interfaces. |
| BindToAllInterfaces_test.py:53:1:53:27 | Attribute() | '0.0.0.0' binds a socket to all interfaces. |
| BindToAllInterfaces_test.py:58:1:58:27 | Attribute() | '0.0.0.0' binds a socket to all interfaces. |

View File

@@ -36,7 +36,7 @@ class Server:
def start(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((self.bind_addr, self.port)) # $ MISSING: Alert[py/bind-socket-all-network-interfaces]
s.bind((self.bind_addr, self.port)) # $ Alert[py/bind-socket-all-network-interfaces]
server = Server()
server.start()
@@ -45,14 +45,14 @@ server.start()
import os
host = os.environ.get('APP_HOST', '0.0.0.0')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, 8080)) # $ MISSING: Alert[py/bind-socket-all-network-interfaces]
s.bind((host, 8080)) # $ Alert[py/bind-socket-all-network-interfaces]
# gevent.socket (alternative socket module)
from gevent import socket as gsocket
gs = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
gs.bind(('0.0.0.0', 31137)) # $ MISSING: Alert[py/bind-socket-all-network-interfaces]
gs.bind(('0.0.0.0', 31137)) # $ Alert[py/bind-socket-all-network-interfaces]
# eventlet.green.socket (another alternative socket module)
from eventlet.green import socket as esocket
es = esocket.socket(esocket.AF_INET, esocket.SOCK_STREAM)
es.bind(('0.0.0.0', 31137)) # $ MISSING: Alert[py/bind-socket-all-network-interfaces]
es.bind(('0.0.0.0', 31137)) # $ Alert[py/bind-socket-all-network-interfaces]