Merge pull request #6589 from RasmusWL/promote-sqlalchemy

Python: Promote modeling of SQLAlchemy
This commit is contained in:
yoff
2021-09-21 11:08:41 +02:00
committed by GitHub
31 changed files with 1186 additions and 170 deletions

View File

@@ -13,6 +13,7 @@ private import semmle.python.frameworks.Dill
private import semmle.python.frameworks.Django
private import semmle.python.frameworks.Fabric
private import semmle.python.frameworks.Flask
private import semmle.python.frameworks.FlaskSqlAlchemy
private import semmle.python.frameworks.Idna
private import semmle.python.frameworks.Invoke
private import semmle.python.frameworks.Jmespath
@@ -20,13 +21,14 @@ private import semmle.python.frameworks.MarkupSafe
private import semmle.python.frameworks.Multidict
private import semmle.python.frameworks.Mysql
private import semmle.python.frameworks.MySQLdb
private import semmle.python.frameworks.Peewee
private import semmle.python.frameworks.Psycopg2
private import semmle.python.frameworks.PyMySQL
private import semmle.python.frameworks.Rsa
private import semmle.python.frameworks.Simplejson
private import semmle.python.frameworks.SqlAlchemy
private import semmle.python.frameworks.Stdlib
private import semmle.python.frameworks.Tornado
private import semmle.python.frameworks.Peewee
private import semmle.python.frameworks.Twisted
private import semmle.python.frameworks.Ujson
private import semmle.python.frameworks.Yaml

View File

@@ -0,0 +1,56 @@
/**
* Provides classes modeling security-relevant aspects of the `Flask-SQLAlchemy` PyPI package
* (imported by `flask_sqlalchemy`).
* See
* - https://pypi.org/project/Flask-SQLAlchemy/
* - https://flask-sqlalchemy.palletsprojects.com/en/2.x/
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.ApiGraphs
private import semmle.python.Concepts
private import semmle.python.frameworks.SqlAlchemy
/**
* INTERNAL: Do not use.
*
* Provides models for the `Flask-SQLAlchemy` PyPI package (imported by `flask_sqlalchemy`).
* See
* - https://pypi.org/project/Flask-SQLAlchemy/
* - https://flask-sqlalchemy.palletsprojects.com/en/2.x/
*/
private module FlaskSqlAlchemy {
/** Gets an instance of `flask_sqlalchemy.SQLAlchemy` */
private API::Node dbInstance() {
result = API::moduleImport("flask_sqlalchemy").getMember("SQLAlchemy").getReturn()
}
/** A call to the `text` method on a DB. */
private class DbTextCall extends SqlAlchemy::TextClause::TextClauseConstruction {
DbTextCall() { this = dbInstance().getMember("text").getACall() }
}
/** Access on a DB resulting in an Engine */
private class DbEngine extends SqlAlchemy::Engine::InstanceSource {
DbEngine() {
this = dbInstance().getMember("engine").getAUse()
or
this = dbInstance().getMember("get_engine").getACall()
}
}
/** Access on a DB resulting in a Session */
private class DbSession extends SqlAlchemy::Session::InstanceSource {
DbSession() {
this = dbInstance().getMember("session").getAUse()
or
this = dbInstance().getMember("create_session").getReturn().getACall()
or
this = dbInstance().getMember("create_session").getReturn().getMember("begin").getACall()
or
this = dbInstance().getMember("create_scoped_session").getACall()
}
}
}

View File

@@ -0,0 +1,345 @@
/**
* Provides classes modeling security-relevant aspects of the `SQLAlchemy` PyPI package.
* See
* - https://pypi.org/project/SQLAlchemy/
* - https://docs.sqlalchemy.org/en/14/index.html
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.ApiGraphs
private import semmle.python.Concepts
// This import is done like this to avoid importing the deprecated top-level things that
// would pollute the namespace
private import semmle.python.frameworks.PEP249::PEP249 as PEP249
/**
* INTERNAL: Do not use.
*
* Provides models for the `SQLAlchemy` PyPI package.
* See
* - https://pypi.org/project/SQLAlchemy/
* - https://docs.sqlalchemy.org/en/14/index.html
*/
module SqlAlchemy {
/**
* Provides models for the `sqlalchemy.engine.Engine` and `sqlalchemy.future.Engine` classes.
*
* These are so similar that we model both in the same way.
*
* See
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine
* - https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Engine
*/
module Engine {
/** Gets a reference to a SQLAlchemy Engine class. */
private API::Node classRef() {
result = API::moduleImport("sqlalchemy").getMember("engine").getMember("Engine")
or
result = API::moduleImport("sqlalchemy").getMember("future").getMember("Engine")
}
/**
* A source of instances of a SQLAlchemy Engine, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `Engine::instance()` to get references to instances of a SQLAlchemy Engine.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
private class EngineConstruction extends InstanceSource, DataFlow::CallCfgNode {
EngineConstruction() {
this = classRef().getACall()
or
this = API::moduleImport("sqlalchemy").getMember("create_engine").getACall()
or
this =
API::moduleImport("sqlalchemy").getMember("future").getMember("create_engine").getACall()
or
this.(DataFlow::MethodCallNode).calls(instance(), "execution_options")
}
}
/** Gets a reference to an instance of a SQLAlchemy Engine. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of a SQLAlchemy Engine. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
}
/**
* Provides models for the `sqlalchemy.engine.base.Connection` and `sqlalchemy.future.Connection` classes.
*
* These are so similar that we model both in the same way.
*
* See
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection
* - https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Connection
*/
module Connection {
/** Gets a reference to a SQLAlchemy Connection class. */
private API::Node classRef() {
result =
API::moduleImport("sqlalchemy")
.getMember("engine")
.getMember("base")
.getMember("Connection")
or
result = API::moduleImport("sqlalchemy").getMember("future").getMember("Connection")
}
/**
* A source of instances of a SQLAlchemy Connection, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `Connection::instance()` to get references to instances of a SQLAlchemy Connection.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
private class ConnectionConstruction extends InstanceSource, DataFlow::CallCfgNode {
ConnectionConstruction() {
this = classRef().getACall()
or
this.(DataFlow::MethodCallNode).calls(Engine::instance(), ["begin", "connect"])
or
this.(DataFlow::MethodCallNode).calls(instance(), "connect")
or
this.(DataFlow::MethodCallNode).calls(instance(), "execution_options")
}
}
/** Gets a reference to an instance of a SQLAlchemy Connection. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of a SQLAlchemy Connection. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
}
/**
* Provides models for the underlying DB-API Connection of a SQLAlchemy Connection.
*
* See https://docs.sqlalchemy.org/en/14/core/connections.html#dbapi-connections.
*/
module DBAPIConnection {
/**
* A source of instances of DB-API Connections, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `DBAPIConnection::instance()` to get references to instances of DB-API Connections.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
private class DBAPIConnectionSources extends InstanceSource, PEP249::Connection::InstanceSource {
DBAPIConnectionSources() {
this.(DataFlow::MethodCallNode).calls(Engine::instance(), "raw_connection")
or
this.(DataFlow::AttrRead).accesses(Connection::instance(), "connection")
}
}
/** Gets a reference to an instance of DB-API Connections. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of DB-API Connections. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
}
/**
* Provides models for the `sqlalchemy.orm.Session` class
*
* See
* - https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
* - https://docs.sqlalchemy.org/en/14/orm/session_basics.html
*/
module Session {
/** Gets a reference to the `sqlalchemy.orm.Session` class. */
private API::Node classRef() {
result = API::moduleImport("sqlalchemy").getMember("orm").getMember("Session")
}
/**
* A source of instances of `sqlalchemy.orm.Session`, extend this class to model new instances.
*
* This can include instantiations of the class, return values from function
* calls, or a special parameter that will be set when functions are called by an external
* library.
*
* Use the predicate `Session::instance()` to get references to instances of `sqlalchemy.orm.Session`.
*/
abstract class InstanceSource extends DataFlow::LocalSourceNode { }
private class SessionConstruction extends InstanceSource, DataFlow::CallCfgNode {
SessionConstruction() {
this = classRef().getACall()
or
this =
API::moduleImport("sqlalchemy")
.getMember("orm")
.getMember("sessionmaker")
.getReturn()
.getACall()
or
this =
API::moduleImport("sqlalchemy")
.getMember("orm")
.getMember("sessionmaker")
.getReturn()
.getMember("begin")
.getACall()
}
}
/** Gets a reference to an instance of `sqlalchemy.orm.Session`. */
private DataFlow::TypeTrackingNode instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of `sqlalchemy.orm.Session`. */
DataFlow::Node instance() { instance(DataFlow::TypeTracker::end()).flowsTo(result) }
}
/**
* A call to `execute` on a SQLAlchemy Engine, Connection, or Session.
* See
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine.execute
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.execute
* - https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Connection.execute
* - https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session.execute
*/
private class SqlAlchemyExecuteCall extends DataFlow::MethodCallNode, SqlExecution::Range {
SqlAlchemyExecuteCall() {
this.calls(Engine::instance(), "execute")
or
this.calls(Connection::instance(), "execute")
or
this.calls(Session::instance(), "execute")
}
override DataFlow::Node getSql() { result in [this.getArg(0), this.getArgByName("statement")] }
}
/**
* A call to `exec_driver_sql` on a SQLAlchemy Connection.
* See
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.exec_driver_sql
* - https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Connection.exec_driver_sql
*/
private class SqlAlchemyExecDriverSqlCall extends DataFlow::MethodCallNode, SqlExecution::Range {
SqlAlchemyExecDriverSqlCall() { this.calls(Connection::instance(), "exec_driver_sql") }
override DataFlow::Node getSql() { result in [this.getArg(0), this.getArgByName("statement")] }
}
/**
* A call to `scalar` on a SQLAlchemy Engine, Connection, or Session.
* See
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine.scalar
* - https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection.scalar
* - https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Connection.scalar
* - https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session.scalar
*/
private class SqlAlchemyScalarCall extends DataFlow::MethodCallNode, SqlExecution::Range {
SqlAlchemyScalarCall() {
this.calls(Engine::instance(), "scalar")
or
this.calls(Connection::instance(), "scalar")
or
this.calls(Session::instance(), "scalar")
}
override DataFlow::Node getSql() {
result in [this.getArg(0), this.getArgByName("statement"), this.getArgByName("object_")]
}
}
/**
* Provides models for the `sqlalchemy.sql.expression.TextClause` class,
* which represents a textual SQL string directly.
*
* ```py
* session.query(For14).filter_by(description=sqlalchemy.text(f"'{user_input}'")).all()
* ```
*
* Initially I wanted to add lots of additional taint steps for such that the normal
* SQL injection query would be able to find cases as the one above where an ORM query
* includes a TextClause that includes user-input directly... But that presented 2
* problems:
*
* - which part of the query construction above should be marked as SQL to fit our
* `SqlExecution` concept. Nothing really fits this well, since all the SQL
* execution happens under the hood.
* - This would require a LOT of modeling for these additional taint steps, since
* there are many many constructs we would need to have models for. (see the 2
* examples below)
*
* So instead we flag user-input to a TextClause with its' own query
* (`py/sqlalchemy-textclause-injection`). And so we don't highlight any parts of an
* ORM constructed query such as these as containing SQL, and don't need the additional
* taint steps either.
*
* See
* - https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.TextClause.
* - https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.text
*/
module TextClause {
/**
* A construction of a `sqlalchemy.sql.expression.TextClause`, which represents a
* textual SQL string directly.
*/
abstract class TextClauseConstruction extends DataFlow::CallCfgNode {
/** Gets the argument that specifies the SQL text. */
DataFlow::Node getTextArg() { result in [this.getArg(0), this.getArgByName("text")] }
}
/** `TextClause` constructions from the `sqlalchemy` package. */
private class DefaultTextClauseConstruction extends TextClauseConstruction {
DefaultTextClauseConstruction() {
this = API::moduleImport("sqlalchemy").getMember("text").getACall()
or
this = API::moduleImport("sqlalchemy").getMember("sql").getMember("text").getACall()
or
this =
API::moduleImport("sqlalchemy")
.getMember("sql")
.getMember("expression")
.getMember("text")
.getACall()
or
this =
API::moduleImport("sqlalchemy")
.getMember("sql")
.getMember("expression")
.getMember("TextClause")
.getACall()
}
}
}
}

View File

@@ -0,0 +1,35 @@
/**
* Provides a taint-tracking configuration for detecting "SQLAlchemy TextClause injection" vulnerabilities.
*
* Note, for performance reasons: only import this file if
* `SQLAlchemyTextClause::Configuration` is needed, otherwise
* `SQLAlchemyTextClauseCustomizations` should be imported instead.
*/
private import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
/**
* Provides a taint-tracking configuration for detecting "SQLAlchemy TextClause injection" vulnerabilities.
*/
module SQLAlchemyTextClause {
import SQLAlchemyTextClauseCustomizations::SQLAlchemyTextClause
/**
* A taint-tracking configuration for detecting "SQLAlchemy TextClause injection" vulnerabilities.
*/
class Configuration extends TaintTracking::Configuration {
Configuration() { this = "SQLAlchemyTextClause" }
override predicate isSource(DataFlow::Node source) { source instanceof Source }
override predicate isSink(DataFlow::Node sink) { sink instanceof Sink }
override predicate isSanitizer(DataFlow::Node node) { node instanceof Sanitizer }
override predicate isSanitizerGuard(DataFlow::BarrierGuard guard) {
guard instanceof SanitizerGuard
}
}
}

View File

@@ -0,0 +1,56 @@
/**
* Provides default sources, sinks and sanitizers for detecting
* "SQLAlchemy TextClause injection"
* vulnerabilities, as well as extension points for adding your own.
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.Concepts
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.dataflow.new.BarrierGuards
private import semmle.python.frameworks.SqlAlchemy
/**
* Provides default sources, sinks and sanitizers for detecting
* "SQLAlchemy TextClause injection"
* vulnerabilities, as well as extension points for adding your own.
*/
module SQLAlchemyTextClause {
/**
* A data flow source for "SQLAlchemy TextClause injection" vulnerabilities.
*/
abstract class Source extends DataFlow::Node { }
/**
* A data flow sink for "SQLAlchemy TextClause injection" vulnerabilities.
*/
abstract class Sink extends DataFlow::Node { }
/**
* A sanitizer for "SQLAlchemy TextClause injection" vulnerabilities.
*/
abstract class Sanitizer extends DataFlow::Node { }
/**
* A sanitizer guard for "SQLAlchemy TextClause injection" vulnerabilities.
*/
abstract class SanitizerGuard extends DataFlow::BarrierGuard { }
/**
* A source of remote user input, considered as a flow source.
*/
class RemoteFlowSourceAsSource extends Source, RemoteFlowSource { }
/**
* The text argument of a SQLAlchemy TextClause construction, considered as a flow sink.
*/
class TextArgAsSink extends Sink {
TextArgAsSink() { this = any(SqlAlchemy::TextClause::TextClauseConstruction tcc).getTextArg() }
}
/**
* A comparison with a constant string, considered as a sanitizer-guard.
*/
class StringConstCompareAsSanitizerGuard extends SanitizerGuard, StringConstCompare { }
}

View File

@@ -0,0 +1,53 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>
The <code>TextClause</code> class in the <code>SQLAlchemy</code> PyPI package represents
a textual SQL string directly. If user-input is added to it without sufficient
sanitization, a user may be able to run malicious database queries, since the
<code>TextClause</code> is inserted directly into the final SQL.
</p>
</overview>
<recommendation>
<p>
Don't allow user-input to be added to a <code>TextClause</code>, instead construct your
full query with constructs from the ORM, or use query parameters for user-input.
</p>
</recommendation>
<example>
<p>
In the following snippet, a user is fetched from the database using three
different queries.
</p>
<p>
In the first case, the final query string is built by directly including a user-supplied
input. The parameter may include quote characters, so this code is vulnerable to a SQL
injection attack.
</p>
<p>
In the second case, the query is built using ORM models, but part of it is using a
<code>TextClause</code> directly including a user-supplied input. Since the
<code>TextClause</code> is inserted directly into the final SQL, this code is vulnerable
to a SQL injection attack.
</p>
<p>
In the third case, the query is built fully using the ORM models, so in the end, the
user-supplied input will be passed to the database using query parameters. The
database connector library will take care of escaping and inserting quotes as needed.
</p>
<sample src="examples/sqlalchemy_textclause_injection.py" />
</example>
<references>
<li><a href="https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.text.params.text">Official documentation of the text parameter</a>.</li>
</references>
</qhelp>

View File

@@ -0,0 +1,23 @@
/**
* @name SQLAlchemy TextClause built from user-controlled sources
* @description Building a TextClause query from user-controlled sources is vulnerable to insertion of
* malicious SQL code by the user.
* @kind path-problem
* @problem.severity error
* @security-severity 8.8
* @precision high
* @id py/sqlalchemy-textclause-injection
* @tags security
* external/cwe/cwe-089
* external/owasp/owasp-a1
*/
import python
import semmle.python.security.dataflow.SQLAlchemyTextClause
import DataFlow::PathGraph
from SQLAlchemyTextClause::Configuration config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink,
"This SQLAlchemy TextClause depends on $@, which could lead to SQL injection.", source.getNode(),
"a user-provided value"

View File

@@ -0,0 +1,34 @@
from flask import Flask, request
import sqlalchemy
import sqlalchemy.orm
app = Flask(__name__)
engine = sqlalchemy.create_engine(...)
Base = sqlalchemy.orm.declarative_base()
class User(Base):
__tablename__ = "users"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
username = sqlalchemy.Column(sqlalchemy.String)
@app.route("/users/<username>")
def show_user(username):
session = sqlalchemy.orm.Session(engine)
# BAD, normal SQL injection
stmt = sqlalchemy.text("SELECT * FROM users WHERE username = '{}'".format(username))
results = session.execute(stmt).fetchall()
# BAD, allows SQL injection
username_formatted_for_sql = sqlalchemy.text("'{}'".format(username))
stmt = sqlalchemy.select(User).where(User.username == username_formatted_for_sql)
results = session.execute(stmt).scalars().all()
# GOOD, does not allow for SQL injection
stmt = sqlalchemy.select(User).where(User.username == username)
results = session.execute(stmt).scalars().all()
...

View File

@@ -1,148 +0,0 @@
/**
* Provides classes modeling security-relevant aspects of the 'SqlAlchemy' package.
* See https://pypi.org/project/SQLAlchemy/.
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.ApiGraphs
private import semmle.python.Concepts
private import experimental.semmle.python.Concepts
private module SqlAlchemy {
/**
* Returns an instantization of a SqlAlchemy Session object.
* See https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session and
* https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.sessionmaker
*/
private API::Node getSqlAlchemySessionInstance() {
result = API::moduleImport("sqlalchemy.orm").getMember("Session").getReturn() or
result = API::moduleImport("sqlalchemy.orm").getMember("sessionmaker").getReturn().getReturn()
}
/**
* Returns an instantization of a SqlAlchemy Engine object.
* See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine
*/
private API::Node getSqlAlchemyEngineInstance() {
result = API::moduleImport("sqlalchemy").getMember("create_engine").getReturn()
}
/**
* Returns an instantization of a SqlAlchemy Query object.
* See https://docs.sqlalchemy.org/en/14/orm/query.html?highlight=query#sqlalchemy.orm.Query
*/
private API::Node getSqlAlchemyQueryInstance() {
result = getSqlAlchemySessionInstance().getMember("query").getReturn()
}
/**
* A call to `execute` meant to execute an SQL expression
* See the following links:
* - https://docs.sqlalchemy.org/en/14/core/connections.html?highlight=execute#sqlalchemy.engine.Connection.execute
* - https://docs.sqlalchemy.org/en/14/core/connections.html?highlight=execute#sqlalchemy.engine.Engine.execute
* - https://docs.sqlalchemy.org/en/14/orm/session_api.html?highlight=execute#sqlalchemy.orm.Session.execute
*/
private class SqlAlchemyExecuteCall extends DataFlow::CallCfgNode, SqlExecution::Range {
SqlAlchemyExecuteCall() {
// new way
this = getSqlAlchemySessionInstance().getMember("execute").getACall() or
this =
getSqlAlchemyEngineInstance()
.getMember(["connect", "begin"])
.getReturn()
.getMember("execute")
.getACall()
}
override DataFlow::Node getSql() { result = this.getArg(0) }
}
/**
* A call to `scalar` meant to execute an SQL expression
* See https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session.scalar and
* https://docs.sqlalchemy.org/en/14/core/connections.html?highlight=scalar#sqlalchemy.engine.Engine.scalar
*/
private class SqlAlchemyScalarCall extends DataFlow::CallCfgNode, SqlExecution::Range {
SqlAlchemyScalarCall() {
this =
[getSqlAlchemySessionInstance(), getSqlAlchemyEngineInstance()]
.getMember("scalar")
.getACall()
}
override DataFlow::Node getSql() { result = this.getArg(0) }
}
/**
* A call on a Query object
* See https://docs.sqlalchemy.org/en/14/orm/query.html?highlight=query#sqlalchemy.orm.Query
*/
private class SqlAlchemyQueryCall extends DataFlow::CallCfgNode, SqlExecution::Range {
SqlAlchemyQueryCall() {
this =
getSqlAlchemyQueryInstance()
.getMember(any(SqlAlchemyVulnerableMethodNames methodName))
.getACall()
}
override DataFlow::Node getSql() { result = this.getArg(0) }
}
/**
* This class represents a list of methods vulnerable to sql injection.
*
* See https://github.com/jty-team/codeql/pull/2#issue-611592361
*/
private class SqlAlchemyVulnerableMethodNames extends string {
SqlAlchemyVulnerableMethodNames() { this in ["filter", "filter_by", "group_by", "order_by"] }
}
/**
* Additional taint-steps for `sqlalchemy.text()`
*
* See https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.text
* See https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.TextClause
*/
class SqlAlchemyTextAdditionalTaintSteps extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(DataFlow::CallCfgNode call |
(
call = API::moduleImport("sqlalchemy").getMember("text").getACall()
or
call = API::moduleImport("sqlalchemy").getMember("sql").getMember("text").getACall()
or
call =
API::moduleImport("sqlalchemy")
.getMember("sql")
.getMember("expression")
.getMember("text")
.getACall()
or
call =
API::moduleImport("sqlalchemy")
.getMember("sql")
.getMember("expression")
.getMember("TextClause")
.getACall()
) and
nodeFrom in [call.getArg(0), call.getArgByName("text")] and
nodeTo = call
)
}
}
/**
* Gets a reference to `sqlescapy.sqlescape`.
*
* See https://pypi.org/project/sqlescapy/
*/
class SQLEscapySanitizerCall extends DataFlow::CallCfgNode, SQLEscape::Range {
SQLEscapySanitizerCall() {
this = API::moduleImport("sqlescapy").getMember("sqlescape").getACall()
}
override DataFlow::Node getAnInput() { result = this.getArg(0) }
}
}

View File

@@ -1,3 +0,0 @@
import python
import experimental.meta.ConceptsTest
import experimental.semmle.python.frameworks.SqlAlchemy

View File

@@ -1,2 +0,0 @@
import experimental.meta.InlineTaintTest
import experimental.semmle.python.frameworks.SqlAlchemy

View File

@@ -1,12 +0,0 @@
import sqlalchemy
def test_taint():
ts = TAINTED_STRING
ensure_tainted(
ts, # $ tainted
sqlalchemy.text(ts), # $ tainted
sqlalchemy.sql.text(ts),# $ tainted
sqlalchemy.sql.expression.text(ts),# $ tainted
sqlalchemy.sql.expression.TextClause(ts),# $ tainted
)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -0,0 +1,51 @@
# pip install Flask-SQLAlchemy
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import sqlalchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite+pysqlite:///:memory:"
db = SQLAlchemy(app)
# re-exports all things from `sqlalchemy` and `sqlalchemy.orm` under instances of `SQLAlchemy`
# see
# - https://github.com/pallets/flask-sqlalchemy/blob/931ec00d1e27f51508e05706eef41cc4419a0b32/src/flask_sqlalchemy/__init__.py#L765
# - https://github.com/pallets/flask-sqlalchemy/blob/931ec00d1e27f51508e05706eef41cc4419a0b32/src/flask_sqlalchemy/__init__.py#L99-L109
assert str(type(db.text("Foo"))) == "<class 'sqlalchemy.sql.elements.TextClause'>"
# also has engine/session instantiated
raw_sql = "SELECT 'Foo'"
conn = db.engine.connect()
result = conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
conn = db.get_engine().connect()
result = conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
result = db.session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
Session = db.create_session(options={})
session = Session()
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
Session = db.create_session(options={})
with Session.begin() as session:
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
result = db.create_scoped_session().execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
# text
t = db.text("foo")
assert isinstance(t, sqlalchemy.sql.expression.TextClause)
t = db.text(text="foo")
assert isinstance(t, sqlalchemy.sql.expression.TextClause)

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1 @@
import experimental.meta.InlineTaintTest

View File

@@ -47,10 +47,10 @@ with engine.begin() as connection:
# Injection requiring the text() taint-step
t = text("some sql")
session.query(User).filter(t) # $ getSql=t
session.query(User).group_by(User.id).having(t) # $ getSql=User.id MISSING: getSql=t
session.query(User).group_by(t).first() # $ getSql=t
session.query(User).order_by(t).first() # $ getSql=t
session.query(User).filter(t)
session.query(User).group_by(User.id).having(t)
session.query(User).group_by(t).first()
session.query(User).order_by(t).first()
query = select(User).where(User.name == t) # $ MISSING: getSql=t
with engine.connect() as conn:

View File

@@ -0,0 +1,388 @@
import sqlalchemy
import sqlalchemy.orm
# SQLAlchemy is slowly migrating to a 2.0 version, and as part of 1.4 release have a 2.0
# style (forwards compatible) API that _can_ be adopted. So these tests are marked with
# either v1.4 or v2.0, such that we cover both.
raw_sql = "select 'FOO'"
text_sql = sqlalchemy.text(raw_sql)
Base = sqlalchemy.orm.declarative_base()
# ==============================================================================
# v1.4
# ==============================================================================
print("v1.4")
# Engine see https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:", echo=True)
result = engine.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = engine.execute(statement=raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = engine.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
scalar_result = engine.scalar(raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = engine.scalar(statement=raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
# engine with custom execution options
# see https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine.execution_options
engine_with_custom_exe_opts = engine.execution_options(foo=42)
result = engine_with_custom_exe_opts.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
even_more_opts = engine_with_custom_exe_opts.execution_options(bar=43)
result = even_more_opts.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
# Connection see https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Connection
conn = engine.connect()
conn: sqlalchemy.engine.base.Connection
result = conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = conn.execute(statement=raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = conn.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
result = conn.execute(statement=text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# scalar
scalar_result = conn.scalar(raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = conn.scalar(object_=raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = conn.scalar(text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
scalar_result = conn.scalar(object_=text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
# exec_driver_sql
result = conn.exec_driver_sql(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
# construction by object
conn = sqlalchemy.engine.base.Connection(engine)
result = conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
# branched connection
branched_conn = conn.connect()
result = branched_conn.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# raw connection
raw_conn = conn.connection
result = raw_conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
cursor = raw_conn.cursor()
cursor.execute(raw_sql) # $ getSql=raw_sql
assert cursor.fetchall() == [("FOO",)]
cursor.close()
raw_conn = engine.raw_connection()
result = raw_conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
# connection with custom execution options
conn_with_custom_exe_opts = conn.execution_options(bar=1337)
result = conn_with_custom_exe_opts.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# Session -- is what you use to work with the ORM layer
# see https://docs.sqlalchemy.org/en/14/orm/session_basics.html
# and https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
session = sqlalchemy.orm.Session(engine)
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = session.execute(statement=raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = session.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
result = session.execute(statement=text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# scalar
scalar_result = session.scalar(raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = session.scalar(statement=raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = session.scalar(text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
scalar_result = session.scalar(statement=text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
# other ways to construct a session
with sqlalchemy.orm.Session(engine) as session:
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
Session = sqlalchemy.orm.sessionmaker(engine)
session = Session()
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
with Session() as session:
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
with Session.begin() as session:
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
# Querying (1.4)
# see https://docs.sqlalchemy.org/en/14/orm/session_basics.html#querying-1-x-style
# to do so we first need a model
class For14(Base):
__tablename__ = "for14"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
description = sqlalchemy.Column(sqlalchemy.String)
Base.metadata.create_all(engine)
# add a test-entry
test_entry = For14(id=14, description="test")
session = sqlalchemy.orm.Session(engine)
session.add(test_entry)
session.commit()
assert session.query(For14).all()[0].id == 14
# and now we can do the actual querying
text_foo = sqlalchemy.text("'FOO'")
# filter_by is only vulnerable to injection if sqlalchemy.text is used, which is evident
# from the logs produced if this file is run
# that is, first filter_by results in the SQL
#
# SELECT for14.id AS for14_id, for14.description AS for14_description
# FROM for14
# WHERE for14.description = ?
#
# which is then called with the argument `'FOO'`
#
# and the second filter_by results in the SQL
#
# SELECT for14.id AS for14_id, for14.description AS for14_description
# FROM for14
# WHERE for14.description = 'FOO'
#
# which is then called without any arguments
assert session.query(For14).filter_by(description="'FOO'").all() == []
query = session.query(For14).filter_by(description=text_foo)
assert query.all() == []
# Initially I wanted to add lots of additional taint steps such that the normal SQL
# injection query would find these cases where an ORM query includes a TextClause that
# includes user-input directly... But that presented 2 problems:
#
# - which part of the query construction above should be marked as SQL to fit our
# `SqlExecution` concept. Nothing really fits this well, since all the SQL execution
# happens under the hood.
# - This would require a LOT of modeling for these additional taint steps, since there
# are many many constructs we would need to have models for. (see the 2 examples below)
#
# So instead we flag user-input to a TextClause with its' own query. And so we don't
# highlight any parts of an ORM constructed query such as these as containing SQL.
# `filter` provides more general filtering
# see https://docs.sqlalchemy.org/en/14/orm/tutorial.html#common-filter-operators
# and https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.filter
assert session.query(For14).filter(For14.description == "'FOO'").all() == []
query = session.query(For14).filter(For14.description == text_foo)
assert query.all() == []
assert session.query(For14).filter(For14.description.like("'FOO'")).all() == []
query = session.query(For14).filter(For14.description.like(text_foo))
assert query.all() == []
# There are many other possibilities for ending up with SQL injection, including the
# following (not an exhaustive list):
# - `where` (alias for `filter`)
# - `group_by`
# - `having`
# - `order_by`
# - `join`
# - `outerjoin`
# ==============================================================================
# v2.0
# ==============================================================================
import sqlalchemy.future
print("-"*80)
print("v2.0 style")
# For Engine, see https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Engine
engine = sqlalchemy.create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
future_engine = sqlalchemy.future.create_engine("sqlite+pysqlite:///:memory:", echo=True)
# in 2.0 you are not allowed to execute things directly on the engine
try:
engine.execute(raw_sql) # $ SPURIOUS: getSql=raw_sql
raise Exception("above not allowed in 2.0")
except NotImplementedError:
pass
try:
engine.execute(text_sql) # $ SPURIOUS: getSql=text_sql
raise Exception("above not allowed in 2.0")
except NotImplementedError:
pass
# `connect` returns a new Connection object.
# see https://docs.sqlalchemy.org/en/14/core/future.html#sqlalchemy.future.Connection
print("v2.0 engine.connect")
with engine.connect() as conn:
conn: sqlalchemy.future.Connection
# in 2.0 you are not allowed to use raw strings like this:
try:
conn.execute(raw_sql) # $ SPURIOUS: getSql=raw_sql
raise Exception("above not allowed in 2.0")
except sqlalchemy.exc.ObjectNotExecutableError:
pass
result = conn.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
result = conn.execute(statement=text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
result = conn.exec_driver_sql(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
raw_conn = conn.connection
result = raw_conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
# branching not allowed in 2.0
try:
branched_conn = conn.connect()
raise Exception("above not allowed in 2.0")
except NotImplementedError:
pass
# connection with custom execution options
conn_with_custom_exe_opts = conn.execution_options(bar=1337)
result = conn_with_custom_exe_opts.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# `scalar` is shorthand helper
try:
conn.scalar(raw_sql) # $ SPURIOUS: getSql=raw_sql
except sqlalchemy.exc.ObjectNotExecutableError:
pass
scalar_result = conn.scalar(text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
scalar_result = conn.scalar(statement=text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
# This is a contrived example
select = sqlalchemy.select(sqlalchemy.text("'BAR'"))
result = conn.execute(select) # $ getSql=select
assert result.fetchall() == [("BAR",)]
# This is a contrived example
select = sqlalchemy.select(sqlalchemy.literal_column("'BAZ'"))
result = conn.execute(select) # $ getSql=select
assert result.fetchall() == [("BAZ",)]
with future_engine.connect() as conn:
result = conn.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# `begin` returns a new Connection object with a transaction begun.
print("v2.0 engine.begin")
with engine.begin() as conn:
result = conn.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# construction by object
conn = sqlalchemy.future.Connection(engine)
result = conn.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# raw_connection
raw_conn = engine.raw_connection()
result = raw_conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
cursor = raw_conn.cursor()
cursor.execute(raw_sql) # $ getSql=raw_sql
assert cursor.fetchall() == [("FOO",)]
cursor.close()
# Session (2.0)
session = sqlalchemy.orm.Session(engine, future=True)
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = session.execute(statement=raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("FOO",)]
result = session.execute(text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
result = session.execute(statement=text_sql) # $ getSql=text_sql
assert result.fetchall() == [("FOO",)]
# scalar
scalar_result = session.scalar(raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = session.scalar(statement=raw_sql) # $ getSql=raw_sql
assert scalar_result == "FOO"
scalar_result = session.scalar(text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
scalar_result = session.scalar(statement=text_sql) # $ getSql=text_sql
assert scalar_result == "FOO"
# Querying (2.0)
# uses a slightly different style than 1.4 -- see note about not modeling
# ORM query construction as SQL execution at the 1.4 query tests.
class For20(Base):
__tablename__ = "for20"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
description = sqlalchemy.Column(sqlalchemy.String)
For20.metadata.create_all(engine)
# add a test-entry
test_entry = For20(id=20, description="test")
session = sqlalchemy.orm.Session(engine, future=True)
session.add(test_entry)
session.commit()
assert session.query(For20).all()[0].id == 20
# and now we can do the actual querying
# see https://docs.sqlalchemy.org/en/14/orm/session_basics.html#querying-2-0-style
statement = sqlalchemy.select(For20)
result = session.execute(statement) # $ getSql=statement
assert result.scalars().all()[0].id == 20
statement = sqlalchemy.select(For20).where(For20.description == text_foo)
result = session.execute(statement) # $ getSql=statement
assert result.scalars().all() == []

View File

@@ -0,0 +1,28 @@
import sqlalchemy
ensure_tainted = ensure_not_tainted = print
TAINTED_STRING = "TAINTED_STRING"
def test_taint():
ts = TAINTED_STRING
ensure_tainted(ts) # $ tainted
t1 = sqlalchemy.text(ts)
t2 = sqlalchemy.text(text=ts)
t3 = sqlalchemy.sql.text(ts)
t4 = sqlalchemy.sql.text(text=ts)
t5 = sqlalchemy.sql.expression.text(ts)
t6 = sqlalchemy.sql.expression.text(text=ts)
t7 = sqlalchemy.sql.expression.TextClause(ts)
t8 = sqlalchemy.sql.expression.TextClause(text=ts)
# Since we flag user-input to a TextClause with its' own query, we don't want to
# have a taint-step for it as that would lead to us also giving an alert for normal
# SQL-injection... and double alerting like this does not seem desireable.
ensure_not_tainted(t1, t2, t3, t4, t5, t6, t7, t8)
for text in [t1, t2, t3, t4, t5, t6, t7, t8]:
assert isinstance(text, sqlalchemy.sql.expression.TextClause)
test_taint()

View File

@@ -0,0 +1,41 @@
edges
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:27:28:27:87 | ControlFlowNode for Attribute() |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:31:50:31:72 | ControlFlowNode for Attribute() |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:41:26:41:33 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:42:31:42:38 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:43:30:43:37 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:44:35:44:42 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:45:41:45:48 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:46:46:46:53 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:47:47:47:54 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:48:52:48:59 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:50:18:50:25 | ControlFlowNode for username |
| test.py:23:15:23:22 | ControlFlowNode for username | test.py:51:24:51:31 | ControlFlowNode for username |
nodes
| test.py:23:15:23:22 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:27:28:27:87 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:31:50:31:72 | ControlFlowNode for Attribute() | semmle.label | ControlFlowNode for Attribute() |
| test.py:41:26:41:33 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:42:31:42:38 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:43:30:43:37 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:44:35:44:42 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:45:41:45:48 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:46:46:46:53 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:47:47:47:54 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:48:52:48:59 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:50:18:50:25 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
| test.py:51:24:51:31 | ControlFlowNode for username | semmle.label | ControlFlowNode for username |
subpaths
#select
| test.py:27:28:27:87 | ControlFlowNode for Attribute() | test.py:23:15:23:22 | ControlFlowNode for username | test.py:27:28:27:87 | ControlFlowNode for Attribute() | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:31:50:31:72 | ControlFlowNode for Attribute() | test.py:23:15:23:22 | ControlFlowNode for username | test.py:31:50:31:72 | ControlFlowNode for Attribute() | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:41:26:41:33 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:41:26:41:33 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:42:31:42:38 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:42:31:42:38 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:43:30:43:37 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:43:30:43:37 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:44:35:44:42 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:44:35:44:42 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:45:41:45:48 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:45:41:45:48 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:46:46:46:53 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:46:46:46:53 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:47:47:47:54 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:47:47:47:54 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:48:52:48:59 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:48:52:48:59 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:50:18:50:25 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:50:18:50:25 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |
| test.py:51:24:51:31 | ControlFlowNode for username | test.py:23:15:23:22 | ControlFlowNode for username | test.py:51:24:51:31 | ControlFlowNode for username | This SQLAlchemy TextClause depends on $@, which could lead to SQL injection. | test.py:23:15:23:22 | ControlFlowNode for username | a user-provided value |

View File

@@ -0,0 +1 @@
Security/CWE-089/SQLAlchemyTextClauseInjection.ql

View File

@@ -0,0 +1,51 @@
from flask import Flask, request
import sqlalchemy
import sqlalchemy.orm
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
engine = sqlalchemy.create_engine(...)
Base = sqlalchemy.orm.declarative_base()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite+pysqlite:///:memory:"
db = SQLAlchemy(app)
class User(Base):
__tablename__ = "users"
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
username = sqlalchemy.Column(sqlalchemy.String)
@app.route("/users/<username>")
def show_user(username):
session = sqlalchemy.orm.Session(engine)
# BAD, normal SQL injection
stmt = sqlalchemy.text("SELECT * FROM users WHERE username = '{}'".format(username))
results = session.execute(stmt).fetchall()
# BAD, allows SQL injection
username_formatted_for_sql = sqlalchemy.text("'{}'".format(username))
stmt = sqlalchemy.select(User).where(User.username == username_formatted_for_sql)
results = session.execute(stmt).scalars().all()
# GOOD, does not allow for SQL injection
stmt = sqlalchemy.select(User).where(User.username == username)
results = session.execute(stmt).scalars().all()
# All of these should be flagged by query
t1 = sqlalchemy.text(username)
t2 = sqlalchemy.text(text=username)
t3 = sqlalchemy.sql.text(username)
t4 = sqlalchemy.sql.text(text=username)
t5 = sqlalchemy.sql.expression.text(username)
t6 = sqlalchemy.sql.expression.text(text=username)
t7 = sqlalchemy.sql.expression.TextClause(username)
t8 = sqlalchemy.sql.expression.TextClause(text=username)
t9 = db.text(username)
t10 = db.text(text=username)