mirror of
https://github.com/github/codeql.git
synced 2026-04-30 11:15:13 +02:00
Python: Add more SQLAlchemy tests
This commit is contained in:
@@ -0,0 +1,402 @@
|
||||
import sqlalchemy
|
||||
import sqlalchemy.orm
|
||||
|
||||
# SQLAlchemy is slowly migrating to a 2.0 version, and as part of 1.4 release have a 2.0
|
||||
# style (forwards compatible) API that _can_ be adopted. So these tests are marked with
|
||||
# either v1.4 or v2.0, such that we cover both.
|
||||
|
||||
raw_sql = "select 'FOO'"
|
||||
text_sql = sqlalchemy.text(raw_sql)
|
||||
|
||||
Base = sqlalchemy.orm.declarative_base()
|
||||
|
||||
# ==============================================================================
|
||||
# v1.4
|
||||
# ==============================================================================
|
||||
|
||||
print("v1.4")
|
||||
|
||||
# Engine see https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine
|
||||
|
||||
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:", echo=True)
|
||||
|
||||
result = engine.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = engine.execute(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = engine.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
scalar_result = engine.scalar(raw_sql) # $ getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = engine.scalar(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
# engine with custom execution options
|
||||
# see https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine.execution_options
|
||||
engine_with_custom_exe_opts = engine.execution_options(foo=42)
|
||||
result = engine_with_custom_exe_opts.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
even_more_opts = engine_with_custom_exe_opts.execution_options(bar=43)
|
||||
result = even_more_opts.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# Connection see https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection
|
||||
conn = engine.connect()
|
||||
conn: sqlalchemy.engine.base.Connection
|
||||
|
||||
result = conn.execute(raw_sql) # $ getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = conn.execute(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
result = conn.execute(text_sql) # $ getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = conn.execute(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# scalar
|
||||
scalar_result = conn.scalar(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = conn.scalar(object_=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
scalar_result = conn.scalar(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = conn.scalar(object_=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
|
||||
# exec_driver_sql
|
||||
result = conn.exec_driver_sql(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# construction by object
|
||||
conn = sqlalchemy.engine.base.Connection(engine)
|
||||
result = conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# branched connection
|
||||
branched_conn = conn.connect()
|
||||
result = branched_conn.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# raw connection
|
||||
raw_conn = conn.connection
|
||||
result = raw_conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
cursor = raw_conn.cursor()
|
||||
cursor.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert cursor.fetchall() == [("FOO",)]
|
||||
cursor.close()
|
||||
|
||||
raw_conn = engine.raw_connection()
|
||||
result = raw_conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# connection with custom execution options
|
||||
conn_with_custom_exe_opts = conn.execution_options(bar=1337)
|
||||
result = conn_with_custom_exe_opts.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# Session -- is what you use to work with the ORM layer
|
||||
# see https://docs.sqlalchemy.org/en/14/orm/session_basics.html
|
||||
# and https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
|
||||
|
||||
session = sqlalchemy.orm.Session(engine)
|
||||
|
||||
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = session.execute(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
result = session.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = session.execute(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# scalar
|
||||
scalar_result = session.scalar(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = session.scalar(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
scalar_result = session.scalar(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = session.scalar(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
# other ways to construct a session
|
||||
with sqlalchemy.orm.Session(engine) as session:
|
||||
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
Session = sqlalchemy.orm.sessionmaker(engine)
|
||||
session = Session()
|
||||
|
||||
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
with Session() as session:
|
||||
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
with Session.begin() as session:
|
||||
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# Querying (1.4)
|
||||
# see https://docs.sqlalchemy.org/en/14/orm/session_basics.html#querying-1-x-style
|
||||
|
||||
# to do so we first need a model
|
||||
|
||||
class For14(Base):
|
||||
__tablename__ = "for14"
|
||||
|
||||
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
|
||||
description = sqlalchemy.Column(sqlalchemy.String)
|
||||
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
# add a test-entry
|
||||
test_entry = For14(id=14, description="test")
|
||||
session = sqlalchemy.orm.Session(engine)
|
||||
session.add(test_entry)
|
||||
session.commit()
|
||||
assert session.query(For14).all()[0].id == 14
|
||||
|
||||
# and now we can do the actual querying
|
||||
|
||||
text_foo = sqlalchemy.text("'FOO'")
|
||||
|
||||
# filter_by is only vulnerable to injection if sqlalchemy.text is used, which is evident
|
||||
# from the logs produced if this file is run
|
||||
# that is, first filter_by results in the SQL
|
||||
#
|
||||
# SELECT for14.id AS for14_id, for14.description AS for14_description
|
||||
# FROM for14
|
||||
# WHERE for14.description = ?
|
||||
#
|
||||
# which is then called with the argument `'FOO'`
|
||||
#
|
||||
# and the second filter_by results in the SQL
|
||||
#
|
||||
# SELECT for14.id AS for14_id, for14.description AS for14_description
|
||||
# FROM for14
|
||||
# WHERE for14.description = 'FOO'
|
||||
#
|
||||
# which is then called without any arguments
|
||||
assert session.query(For14).filter_by(description="'FOO'").all() == []
|
||||
query = session.query(For14).filter_by(description=text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.all() == []
|
||||
|
||||
# `filter` provides more general filtering
|
||||
# see https://docs.sqlalchemy.org/en/14/orm/tutorial.html#common-filter-operators
|
||||
# and https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.filter
|
||||
assert session.query(For14).filter(For14.description == "'FOO'").all() == []
|
||||
query = session.query(For14).filter(For14.description == text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.all() == []
|
||||
|
||||
assert session.query(For14).filter(For14.description.like("'FOO'")).all() == []
|
||||
query = session.query(For14).filter(For14.description.like(text_foo)) # $ MISSING: getSql=text_foo
|
||||
assert query.all() == []
|
||||
|
||||
# `where` is alias for `filter`
|
||||
assert session.query(For14).where(For14.description == "'FOO'").all() == []
|
||||
query = session.query(For14).where(For14.description == text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.all() == []
|
||||
|
||||
|
||||
# not possible to do SQL injection on `.get`
|
||||
try:
|
||||
session.query(For14).get(text_foo)
|
||||
except sqlalchemy.exc.InterfaceError:
|
||||
pass
|
||||
|
||||
# group_by
|
||||
assert session.query(For14).group_by(For14.description).all() != []
|
||||
query = session.query(For14).group_by(text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.all() != []
|
||||
|
||||
# having (only used in connection with group_by)
|
||||
assert session.query(For14).group_by(For14.description).having(
|
||||
sqlalchemy.func.count(For14.id) > 2
|
||||
).all() == []
|
||||
query = session.query(For14).group_by(For14.description).having(text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.all() == []
|
||||
|
||||
# order_by
|
||||
assert session.query(For14).order_by(For14.description).all() != []
|
||||
query = session.query(For14).order_by(text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.all() != []
|
||||
|
||||
# TODO: likewise, it should be possible to target the criterion for:
|
||||
# - `join` https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.join
|
||||
# - `outerjoin` https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.outerjoin
|
||||
|
||||
# specifying session later on
|
||||
# see https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.with_session
|
||||
query = sqlalchemy.orm.Query(For14).filter(For14.description == text_foo) # $ MISSING: getSql=text_foo
|
||||
assert query.with_session(session).all() == []
|
||||
|
||||
# ==============================================================================
|
||||
# v2.0
|
||||
# ==============================================================================
|
||||
import sqlalchemy.future
|
||||
|
||||
print("-"*80)
|
||||
print("v2.0 style")
|
||||
|
||||
# For Engine, see https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Engine
|
||||
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
|
||||
future_engine = sqlalchemy.future.create_engine("sqlite+pysqlite:///:memory:", echo=True)
|
||||
|
||||
# in 2.0 you are not allowed to execute things directly on the engine
|
||||
try:
|
||||
engine.execute(raw_sql)
|
||||
raise Exception("above not allowed in 2.0")
|
||||
except NotImplementedError:
|
||||
pass
|
||||
try:
|
||||
engine.execute(text_sql)
|
||||
raise Exception("above not allowed in 2.0")
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
|
||||
# `connect` returns a new Connection object.
|
||||
# see https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Connection
|
||||
print("v2.0 engine.connect")
|
||||
with engine.connect() as conn:
|
||||
conn: sqlalchemy.future.Connection
|
||||
|
||||
# in 2.0 you are not allowed to use raw strings like this:
|
||||
try:
|
||||
conn.execute(raw_sql) # $ SPURIOUS: getSql=raw_sql
|
||||
raise Exception("above not allowed in 2.0")
|
||||
except sqlalchemy.exc.ObjectNotExecutableError:
|
||||
pass
|
||||
|
||||
result = conn.execute(text_sql) # $ getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = conn.execute(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
result = conn.exec_driver_sql(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
raw_conn = conn.connection
|
||||
result = raw_conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# branching not allowed in 2.0
|
||||
try:
|
||||
branched_conn = conn.connect()
|
||||
raise Exception("above not allowed in 2.0")
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
# connection with custom execution options
|
||||
conn_with_custom_exe_opts = conn.execution_options(bar=1337)
|
||||
result = conn_with_custom_exe_opts.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# `scalar` is shorthand helper
|
||||
try:
|
||||
conn.scalar(raw_sql)
|
||||
except sqlalchemy.exc.ObjectNotExecutableError:
|
||||
pass
|
||||
scalar_result = conn.scalar(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = conn.scalar(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
# This is a contrived example
|
||||
select = sqlalchemy.select(sqlalchemy.text("'BAR'"))
|
||||
result = conn.execute(select) # $ getSql=select
|
||||
assert result.fetchall() == [("BAR",)]
|
||||
|
||||
# This is a contrived example
|
||||
select = sqlalchemy.select(sqlalchemy.literal_column("'BAZ'"))
|
||||
result = conn.execute(select) # $ getSql=select
|
||||
assert result.fetchall() == [("BAZ",)]
|
||||
|
||||
with future_engine.connect() as conn:
|
||||
result = conn.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# `begin` returns a new Connection object with a transaction begun.
|
||||
print("v2.0 engine.begin")
|
||||
with engine.begin() as conn:
|
||||
result = conn.execute(text_sql) # $ getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# construction by object
|
||||
conn = sqlalchemy.future.Connection(engine)
|
||||
result = conn.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# raw_connection
|
||||
|
||||
raw_conn = engine.raw_connection()
|
||||
result = raw_conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
cursor = raw_conn.cursor()
|
||||
cursor.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert cursor.fetchall() == [("FOO",)]
|
||||
cursor.close()
|
||||
|
||||
# Session (2.0)
|
||||
session = sqlalchemy.orm.Session(engine, future=True)
|
||||
|
||||
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = session.execute(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
result = session.execute(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
result = session.execute(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert result.fetchall() == [("FOO",)]
|
||||
|
||||
# scalar
|
||||
scalar_result = session.scalar(raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = session.scalar(statement=raw_sql) # $ MISSING: getSql=raw_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
scalar_result = session.scalar(text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
scalar_result = session.scalar(statement=text_sql) # $ MISSING: getSql=text_sql
|
||||
assert scalar_result == "FOO"
|
||||
|
||||
# Querying (2.0)
|
||||
|
||||
class For20(Base):
|
||||
__tablename__ = "for20"
|
||||
|
||||
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
|
||||
description = sqlalchemy.Column(sqlalchemy.String)
|
||||
|
||||
For20.metadata.create_all(engine)
|
||||
|
||||
# add a test-entry
|
||||
test_entry = For20(id=20, description="test")
|
||||
session = sqlalchemy.orm.Session(engine, future=True)
|
||||
session.add(test_entry)
|
||||
session.commit()
|
||||
assert session.query(For20).all()[0].id == 20
|
||||
|
||||
# and now we can do the actual querying
|
||||
# see https://docs.sqlalchemy.org/en/14/orm/session_basics.html#querying-2-0-style
|
||||
|
||||
statement = sqlalchemy.select(For20)
|
||||
result = session.execute(statement)
|
||||
assert result.scalars().all()[0].id == 20
|
||||
|
||||
statement = sqlalchemy.select(For20).where(For20.description == text_foo) # $ MISSING: getSql=text_foo
|
||||
result = session.execute(statement)
|
||||
assert result.scalars().all() == []
|
||||
Reference in New Issue
Block a user