Python: Add modeling of Flask-SQLAlchemy

This commit is contained in:
Rasmus Wriedt Larsen
2021-09-02 10:48:24 +02:00
parent f1744890b1
commit d55f18f8e3
6 changed files with 73 additions and 11 deletions

View File

@@ -0,0 +1,2 @@
lgtm,codescanning
* Added modeling of SQL execution in the `Flask-SQLAlchemy` PyPI package, resulting in additional sinks for the SQL Injection query (`py/sql-injection`).

View File

@@ -13,6 +13,7 @@ private import semmle.python.frameworks.Dill
private import semmle.python.frameworks.Django
private import semmle.python.frameworks.Fabric
private import semmle.python.frameworks.Flask
private import semmle.python.frameworks.FlaskSqlAlchemy
private import semmle.python.frameworks.Idna
private import semmle.python.frameworks.Invoke
private import semmle.python.frameworks.Jmespath

View File

@@ -0,0 +1,56 @@
/**
* Provides classes modeling security-relevant aspects of the `Flask-SQLAlchemy` PyPI package
* (imported by `flask_sqlalchemy`).
* See
* - https://pypi.org/project/Flask-SQLAlchemy/
* - https://flask-sqlalchemy.palletsprojects.com/en/2.x/
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.ApiGraphs
private import semmle.python.Concepts
private import semmle.python.frameworks.SqlAlchemy
/**
* INTERNAL: Do not use.
*
* Provides models for the `Flask-SQLAlchemy` PyPI package (imported by `flask_sqlalchemy`).
* See
* - https://pypi.org/project/Flask-SQLAlchemy/
* - https://flask-sqlalchemy.palletsprojects.com/en/2.x/
*/
private module FlaskSqlAlchemy {
/** Gets an instance of `flask_sqlalchemy.SQLAlchemy` */
private API::Node dbInstance() {
result = API::moduleImport("flask_sqlalchemy").getMember("SQLAlchemy").getReturn()
}
/** A call to the `text` method on a DB. */
private class DbTextCall extends SqlAlchemy::TextClause::TextClauseConstruction {
DbTextCall() { this = dbInstance().getMember("text").getACall() }
}
/** Access on a DB resulting in an Engine */
private class DbEngine extends SqlAlchemy::Engine::InstanceSource {
DbEngine() {
this = dbInstance().getMember("engine").getAUse()
or
this = dbInstance().getMember("get_engine").getACall()
}
}
/** Access on a DB resulting in a Session */
private class DbSession extends SqlAlchemy::Session::InstanceSource {
DbSession() {
this = dbInstance().getMember("session").getAUse()
or
this = dbInstance().getMember("create_session").getReturn().getACall()
or
this = dbInstance().getMember("create_session").getReturn().getMember("begin").getACall()
or
this = dbInstance().getMember("create_scoped_session").getACall()
}
}
}

View File

@@ -314,8 +314,13 @@ module SqlAlchemy {
* A construction of a `sqlalchemy.sql.expression.TextClause`, which represents a
* textual SQL string directly.
*/
class TextClauseConstruction extends DataFlow::CallCfgNode {
TextClauseConstruction() {
abstract class TextClauseConstruction extends DataFlow::CallCfgNode {
/** Gets the argument that specifies the SQL text. */
DataFlow::Node getTextArg() { result in [this.getArg(0), this.getArgByName("text")] }
}
class DefaultTextClauseConstruction extends TextClauseConstruction {
DefaultTextClauseConstruction() {
this = API::moduleImport("sqlalchemy").getMember("text").getACall()
or
this = API::moduleImport("sqlalchemy").getMember("sql").getMember("text").getACall()
@@ -334,9 +339,6 @@ module SqlAlchemy {
.getMember("TextClause")
.getACall()
}
/** Gets the argument that specifies the SQL text. */
DataFlow::Node getTextArg() { result in [this.getArg(0), this.getArgByName("text")] }
}
}
}

View File

@@ -19,27 +19,27 @@ assert str(type(db.text("Foo"))) == "<class 'sqlalchemy.sql.elements.TextClause'
raw_sql = "SELECT 'Foo'"
conn = db.engine.connect()
result = conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
result = conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
conn = db.get_engine().connect()
result = conn.execute(raw_sql) # $ MISSING: getSql=raw_sql
result = conn.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
result = db.session.execute(raw_sql) # $ MISSING: getSql=raw_sql
result = db.session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
Session = db.create_session(options={})
session = Session()
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
Session = db.create_session(options={})
with Session.begin() as session:
result = session.execute(raw_sql) # $ MISSING: getSql=raw_sql
result = session.execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]
result = db.create_scoped_session().execute(raw_sql) # $ MISSING: getSql=raw_sql
result = db.create_scoped_session().execute(raw_sql) # $ getSql=raw_sql
assert result.fetchall() == [("Foo",)]