Python: Add query to handle SQLAlchemy TextClause Injection

instead of doing this via taint-steps. See description in code/tests.
This commit is contained in:
Rasmus Wriedt Larsen
2021-09-02 10:13:12 +02:00
parent 81dbe36e99
commit c34d6d1162
15 changed files with 391 additions and 127 deletions

View File

@@ -1,3 +1,2 @@
import python
import experimental.meta.ConceptsTest
import experimental.semmle.python.frameworks.SqlAlchemy

View File

@@ -1,2 +1 @@
import experimental.meta.InlineTaintTest
import experimental.semmle.python.frameworks.SqlAlchemy

View File

@@ -47,10 +47,10 @@ with engine.begin() as connection:
# Injection requiring the text() taint-step
t = text("some sql")
session.query(User).filter(t) # $ getSql=t
session.query(User).group_by(User.id).having(t) # $ getSql=User.id MISSING: getSql=t
session.query(User).group_by(t).first() # $ getSql=t
session.query(User).order_by(t).first() # $ getSql=t
session.query(User).filter(t)
session.query(User).group_by(User.id).having(t)
session.query(User).group_by(t).first()
session.query(User).order_by(t).first()
query = select(User).where(User.name == t) # $ MISSING: getSql=t
with engine.connect() as conn:

View File

@@ -189,57 +189,41 @@ text_foo = sqlalchemy.text("'FOO'")
#
# which is then called without any arguments
assert session.query(For14).filter_by(description="'FOO'").all() == []
query = session.query(For14).filter_by(description=text_foo) # $ MISSING: getSql=text_foo
query = session.query(For14).filter_by(description=text_foo)
assert query.all() == []
# Initially I wanted to add lots of additional taint steps such that the normal SQL
# injection query would find these cases where an ORM query includes a TextClause that
# includes user-input directly... But that presented 2 problems:
#
# - which part of the query construction above should be marked as SQL to fit our
# `SqlExecution` concept. Nothing really fits this well, since all the SQL execution
# happens under the hood.
# - This would require a LOT of modeling for these additional taint steps, since there
# are many many constructs we would need to have models for. (see the 2 examples below)
#
# So instead we flag user-input to a TextClause with its' own query. And so we don't
# highlight any parts of an ORM constructed query such as these as containing SQL.
# `filter` provides more general filtering
# see https://docs.sqlalchemy.org/en/14/orm/tutorial.html#common-filter-operators
# and https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.filter
assert session.query(For14).filter(For14.description == "'FOO'").all() == []
query = session.query(For14).filter(For14.description == text_foo) # $ MISSING: getSql=text_foo
query = session.query(For14).filter(For14.description == text_foo)
assert query.all() == []
assert session.query(For14).filter(For14.description.like("'FOO'")).all() == []
query = session.query(For14).filter(For14.description.like(text_foo)) # $ MISSING: getSql=text_foo
query = session.query(For14).filter(For14.description.like(text_foo))
assert query.all() == []
# `where` is alias for `filter`
assert session.query(For14).where(For14.description == "'FOO'").all() == []
query = session.query(For14).where(For14.description == text_foo) # $ MISSING: getSql=text_foo
assert query.all() == []
# not possible to do SQL injection on `.get`
try:
session.query(For14).get(text_foo)
except sqlalchemy.exc.InterfaceError:
pass
# group_by
assert session.query(For14).group_by(For14.description).all() != []
query = session.query(For14).group_by(text_foo) # $ MISSING: getSql=text_foo
assert query.all() != []
# having (only used in connection with group_by)
assert session.query(For14).group_by(For14.description).having(
sqlalchemy.func.count(For14.id) > 2
).all() == []
query = session.query(For14).group_by(For14.description).having(text_foo) # $ MISSING: getSql=text_foo
assert query.all() == []
# order_by
assert session.query(For14).order_by(For14.description).all() != []
query = session.query(For14).order_by(text_foo) # $ MISSING: getSql=text_foo
assert query.all() != []
# TODO: likewise, it should be possible to target the criterion for:
# - `join` https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.join
# - `outerjoin` https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.outerjoin
# specifying session later on
# see https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.with_session
query = sqlalchemy.orm.Query(For14).filter(For14.description == text_foo) # $ MISSING: getSql=text_foo
assert query.with_session(session).all() == []
# There are many other possibilities for ending up with SQL injection, including the
# following (not an exhaustive list):
# - `where` (alias for `filter`)
# - `group_by`
# - `having`
# - `order_by`
# - `join`
# - `outerjoin`
# ==============================================================================
# v2.0
@@ -374,6 +358,8 @@ scalar_result = session.scalar(statement=text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
# Querying (2.0)
# uses a slightly different style than 1.4 -- see note about not modeling
# ORM query construction as SQL execution at the 1.4 query tests.
class For20(Base):
__tablename__ = "for20"
@@ -397,6 +383,6 @@ statement = sqlalchemy.select(For20)
result = session.execute(statement) # $ getSql=statement
assert result.scalars().all()[0].id == 20
statement = sqlalchemy.select(For20).where(For20.description == text_foo) # $ MISSING: getSql=text_foo
statement = sqlalchemy.select(For20).where(For20.description == text_foo)
result = session.execute(statement) # $ getSql=statement
assert result.scalars().all() == []

View File

@@ -1,12 +1,28 @@
import sqlalchemy
ensure_tainted = ensure_not_tainted = print
TAINTED_STRING = "TAINTED_STRING"
def test_taint():
ts = TAINTED_STRING
ensure_tainted(
ts, # $ tainted
sqlalchemy.text(ts), # $ tainted
sqlalchemy.sql.text(ts),# $ tainted
sqlalchemy.sql.expression.text(ts),# $ tainted
sqlalchemy.sql.expression.TextClause(ts),# $ tainted
)
ensure_tainted(ts) # $ tainted
t1 = sqlalchemy.text(ts)
t2 = sqlalchemy.text(text=ts)
t3 = sqlalchemy.sql.text(ts)
t4 = sqlalchemy.sql.text(text=ts)
t5 = sqlalchemy.sql.expression.text(ts)
t6 = sqlalchemy.sql.expression.text(text=ts)
t7 = sqlalchemy.sql.expression.TextClause(ts)
t8 = sqlalchemy.sql.expression.TextClause(text=ts)
# Since we flag user-input to a TextClause with its' own query, we don't want to
# have a taint-step for it as that would lead to us also giving an alert for normal
# SQL-injection... and double alerting like this does not seem desireable.
ensure_not_tainted(t1, t2, t3, t4, t5, t6, t7, t8)
for text in [t1, t2, t3, t4, t5, t6, t7, t8]:
assert isinstance(text, sqlalchemy.sql.expression.TextClause)
test_taint()

View File

@@ -0,0 +1,34 @@
edges
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:22:28:22:87 | ControlFlowNode for Attribute() |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:26:50:26:72 | ControlFlowNode for Attribute() |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:36:26:36:33 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:37:31:37:38 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:38:30:38:37 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:39:35:39:42 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:40:41:40:48 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:41:46:41:53 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:42:47:42:54 | ControlFlowNode for username |
| test.py:18:15:18:22 | ControlFlowNode for username | test.py:43:52:43:59 | ControlFlowNode for username |
nodes
| test.py:18:15:18:22 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:22:28:22:87 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:26:50:26:72 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:36:26:36:33 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:37:31:37:38 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:38:30:38:37 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:39:35:39:42 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:40:41:40:48 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:41:46:41:53 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:42:47:42:54 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:43:52:43:59 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
#select
| test.py:22:28:22:87 | ControlFlowNode for Attribute() | test.py:18:15:18:22 | ControlFlowNode for username | test.py:22:28:22:87 | ControlFlowNode for Attribute() | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:26:50:26:72 | ControlFlowNode for Attribute() | test.py:18:15:18:22 | ControlFlowNode for username | test.py:26:50:26:72 | ControlFlowNode for Attribute() | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:36:26:36:33 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:36:26:36:33 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:37:31:37:38 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:37:31:37:38 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:38:30:38:37 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:38:30:38:37 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:39:35:39:42 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:39:35:39:42 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:40:41:40:48 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:40:41:40:48 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:41:46:41:53 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:41:46:41:53 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:42:47:42:54 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:42:47:42:54 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |
| test.py:43:52:43:59 | ControlFlowNode for username | test.py:18:15:18:22 | ControlFlowNode for username | test.py:43:52:43:59 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:18:15:18:22 | ControlFlowNode for username | a user-provided value |

View File

@@ -0,0 +1 @@
Security/CWE-089/SQLAlchemyTextClauseInjection.ql

View File

@@ -0,0 +1,43 @@
from flask import Flask, request
import sqlalchemy
import sqlalchemy.orm
app = Flask(__name__)
engine = sqlalchemy.create_engine(...)
Base = sqlalchemy.orm.declarative_base()
class User(Base):
__tablename__ = "users"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
username = sqlalchemy.Column(sqlalchemy.String)
@app.route("/users/<username>")
def show_user(username):
session = sqlalchemy.orm.Session(engine)
# BAD, normal SQL injection
stmt = sqlalchemy.text("SELECT * FROM users WHERE username = '{}'".format(username))
results = session.execute(stmt).fetchall()
# BAD, allows SQL injection
username_formatted_for_sql = sqlalchemy.text("'{}'".format(username))
stmt = sqlalchemy.select(User).where(User.username == username_formatted_for_sql)
results = session.execute(stmt).scalars().all()
# GOOD, does not allow for SQL injection
stmt = sqlalchemy.select(User).where(User.username == username)
results = session.execute(stmt).scalars().all()
# All of these should be flagged by query
t1 = sqlalchemy.text(username)
t2 = sqlalchemy.text(text=username)
t3 = sqlalchemy.sql.text(username)
t4 = sqlalchemy.sql.text(text=username)
t5 = sqlalchemy.sql.expression.text(username)
t6 = sqlalchemy.sql.expression.text(text=username)
t7 = sqlalchemy.sql.expression.TextClause(username)
t8 = sqlalchemy.sql.expression.TextClause(text=username)