diff --git a/python/ql/lib/semmle/python/frameworks/Socketio.qll b/python/ql/lib/semmle/python/frameworks/Socketio.qll index ca873d8ecf2..3df1901a1dd 100644 --- a/python/ql/lib/semmle/python/frameworks/Socketio.qll +++ b/python/ql/lib/semmle/python/frameworks/Socketio.qll @@ -88,7 +88,11 @@ module SocketIO { } /** Gets a reference to an instance of a subclass of `socketio.Namespace` or `socketio.AsyncNamespace`. */ - API::Node instance() { result = subclassRef().getAnInstance() } + API::Node instance() { + result = subclassRef().getAnInstance() + or + result = subclassRef().getAMember().getSelfParameter() + } /** A socketio Namespace class. */ class NamespaceClass extends Class { diff --git a/python/ql/test/library-tests/frameworks/socketio/taint_test.py b/python/ql/test/library-tests/frameworks/socketio/taint_test.py index 0073e36ff60..343c093feeb 100644 --- a/python/ql/test/library-tests/frameworks/socketio/taint_test.py +++ b/python/ql/test/library-tests/frameworks/socketio/taint_test.py @@ -22,34 +22,45 @@ def event1(sid, data): # $ requestHandler routedParameter=data ensure_tainted(data) # $ tainted res = sio.call("e1", sid=sid) ensure_tainted(res) # $ tainted - sio.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted - sio.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted + sio.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x + sio.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x class MyNamespace(socketio.Namespace): def on_event2(self, sid, data): # $ requestHandler routedParameter=data ensure_not_tainted(self, sid) - ensure_tainted(data) + ensure_tainted(data) # $ tainted res = self.call("e1", sid=sid) ensure_tainted(res) # $ tainted - self.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted - self.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted + self.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x + self.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x sio.register_namespace(MyNamespace("/ns")) asio = socketio.AsyncServer(async_mode='asgi') @asio.event -async def event3(sid, data): # $ requestHandler routedParameter=sid routedParameter=data +async def event3(sid, data): # $ requestHandler routedParameter=data ensure_not_tainted(sid) ensure_tainted(data) # $ tainted res = await asio.call("e1", sid=sid) ensure_tainted(res) # $ tainted - await asio.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted - await asio.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted + await asio.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x + await asio.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x + +class MyAsyncNamespace(socketio.AsyncNamespace): + async def on_event4(self, sid, data): # $ requestHandler routedParameter=data + ensure_not_tainted(self, sid) + ensure_tainted(data) # $ tainted + res = await self.call("e1", sid=sid) + ensure_tainted(res) # $ tainted + await self.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x + await self.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x + +asio.register_namespace(MyAsyncNamespace("/ns")) if __name__ == "__main__": - if "--async" in sys.argv: + if "--async" in sys.argv: # $ threatModelSource[commandargs]=sys.argv import uvicorn app = socketio.ASGIApp(asio) uvicorn.run(app, host='127.0.0.1', port=8000)