Files
codeql/python/ql/test/library-tests/frameworks/socketio/taint_test.py
Joe Farebrother 16018e91a2 Minor test fix
2025-11-26 15:47:56 +00:00

69 lines
2.8 KiB
Python

import sys
import socketio
def ensure_tainted(*args):
print("tainted", args)
def ensure_not_tainted(*args):
print("not tainted", args)
sio = socketio.Server()
@sio.event
def connect(sid, environ, auth): # $ requestHandler routedParameter=environ routedParameter=auth
ensure_not_tainted(sid)
ensure_tainted(environ, # $ tainted
auth) # $ tainted
@sio.event
def event1(sid, data): # $ requestHandler routedParameter=data
ensure_not_tainted(sid)
ensure_tainted(data) # $ tainted
res = sio.call("e1", sid=sid)
ensure_tainted(res) # $ tainted
sio.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
sio.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
class MyNamespace(socketio.Namespace):
def on_event2(self, sid, data): # $ requestHandler routedParameter=data
ensure_not_tainted(self, sid)
ensure_tainted(data) # $ tainted
res = self.call("e1", sid=sid)
ensure_tainted(res) # $ tainted
self.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
self.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
sio.register_namespace(MyNamespace("/ns"))
asio = socketio.AsyncServer(async_mode='asgi')
@asio.event
async def event3(sid, data): # $ requestHandler routedParameter=data
ensure_not_tainted(sid)
ensure_tainted(data) # $ tainted
res = await asio.call("e1", sid=sid)
ensure_tainted(res) # $ tainted
await asio.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
await asio.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
class MyAsyncNamespace(socketio.AsyncNamespace):
async def on_event4(self, sid, data): # $ requestHandler routedParameter=data
ensure_not_tainted(self, sid)
ensure_tainted(data) # $ tainted
res = await self.call("e1", sid=sid)
ensure_tainted(res) # $ tainted
await self.emit("e2", "hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
await self.send("hi", to=sid, callback=lambda x: ensure_tainted(x)) # $ tainted $ requestHandler routedParameter=x
asio.register_namespace(MyAsyncNamespace("/ns"))
if __name__ == "__main__":
if "--async" in sys.argv: # $ threatModelSource[commandargs]=sys.argv
import uvicorn
app = socketio.ASGIApp(asio)
uvicorn.run(app, host='127.0.0.1', port=8000)
else:
import eventlet
app = socketio.WSGIApp(sio)
eventlet.wsgi.server(eventlet.listen(('', 8000)), app)