mirror of
https://github.com/github/codeql.git
synced 2025-12-23 12:16:33 +01:00
After researching SQLAlchemy and its various query methods, I discovered several types of SQL injection possibilities. The SQLExecution.py file contains these examples, which can be divided into two types of injection: those requiring the text() taint-step and those NOT requiring the text() taint-step.
57 lines
1.7 KiB
Python
57 lines
1.7 KiB
Python
import sqlalchemy
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy import select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref, sessionmaker, joinedload
from sqlalchemy.pool import StaticPool
from sqlalchemy.sql import text

# Engine backed by an in-memory SQLite database; `echo=True` asks SQLAlchemy
# to log the statements it emits.
# NOTE(review): StaticPool together with check_same_thread=False presumably
# keeps a single shared connection so the in-memory database survives across
# uses -- confirm against the SQLAlchemy pooling docs.
engine = create_engine(
    'sqlite:///:memory:',
    echo=True,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool
)

# Declarative base class that the mapped model below inherits from.
Base = declarative_base()

class User(Base):
    """Declarative model mapped to the ``users`` table.

    Used only as the query target for the SQL-execution examples further
    down in this file.
    """

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # String column holding the user's name.
    name = Column(String)

# Create the tables for every model registered on Base (here: `users`).
Base.metadata.create_all(engine)

# Session factory bound to the engine, and the one session used below.
Session = sessionmaker(bind=engine)
session = Session()

# Seed two rows so the example queries have data to run against.
ed_user = User(name='ed')
ed_user2 = User(name='george')

session.add(ed_user)
session.add(ed_user2)

session.commit()

# Injection without requiring the text() taint-step
#
# Each statement passes a plain string straight to an execution API. The
# results are deliberately discarded: the calls exist only as sinks, and the
# trailing `# $getSql=...` comments are the expected-result annotations, so
# each one must stay on the same line as its call.
#
# NOTE(review): filter_by() takes keyword values that SQLAlchemy normally
# binds as parameters rather than raw SQL -- confirm this expectation is
# intended.
session.query(User).filter_by(name="some sql") # $getSql="some sql"
session.scalar("some sql") # $getSql="some sql"
engine.scalar("some sql") # $getSql="some sql"
session.execute("some sql") # $getSql="some sql"

# Same sink, reached through a connection obtained with engine.connect().
with engine.connect() as connection:
    connection.execute("some sql") # $getSql="some sql"

# Same sink, reached through a connection obtained with engine.begin().
with engine.begin() as connection:
    connection.execute("some sql") # $getSql="some sql"

# Injection requiring the text() taint-step
#
# Here the string is first wrapped in text() and the resulting clause is
# handed to a query-building method. The `# $getSql=...` comments are the
# expected-result annotations and must stay on the same line as their call.
session.query(User).filter(text("some sql")) # $getSql="some sql"
session.query(User).group_by( User.id ).having(text("some sql")) # $getSql="some sql"
# NOTE(review): the two literals below are "name='some sql'" while the
# annotation says "some sql" -- confirm which value the expectation should carry.
session.query(User).group_by(text("name='some sql'")).first() # $getSql="some sql"
session.query(User).order_by(text("name='some sql'")).first() # $getSql="some sql"

# Build a select() whose WHERE clause embeds a text() fragment, then execute
# it on a fresh connection. `select` must be imported from sqlalchemy at the
# top of the file; without that import this line raises NameError.
query = select(User).where(User.name == text("some sql")) # $getSql="some sql"
with engine.connect() as conn:
    conn.execute(query)