Python: Add django sink with concept test

This commit is contained in:
Rasmus Lerchedahl Petersen
2020-10-19 21:34:55 +02:00
parent 646ced2a1d
commit d7308bddf2
4 changed files with 115 additions and 1 deletions

View File

@@ -7,4 +7,98 @@ private import experimental.dataflow.DataFlow
private import experimental.dataflow.RemoteFlowSources
private import experimental.semmle.python.Concepts
private module Django { }
/**
* Provides models for the `django` PyPI package.
* See https://django.palletsprojects.com/en/1.1.x/.
*/
private module Django {
/** Gets a reference to the `django` module. */
private DataFlow::Node django(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("django")
or
exists(DataFlow::TypeTracker t2 | result = django(t2).track(t2, t))
}
/** Gets a reference to the `django` module. */
DataFlow::Node django() { result = django(DataFlow::TypeTracker::end()) }
/** Provides models for the `django` module. */
module django {
/** Gets a reference to the `django.db` module. */
private DataFlow::Node db(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("django.db")
or
t.startInAttr("db") and
result = django()
or
exists(DataFlow::TypeTracker t2 | result = db(t2).track(t2, t))
}
/** Gets a reference to the `django.db` module. */
DataFlow::Node db() { result = db(DataFlow::TypeTracker::end()) }
/** Provides models for the `django.db` module. */
module db {
/** Gets a reference to the `django.db.connection` object. */
private DataFlow::Node connection(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("django.db.connection")
or
t.startInAttr("connection") and
result = db()
or
exists(DataFlow::TypeTracker t2 | result = connection(t2).track(t2, t))
}
/** Gets a reference to the `django.db.connection` object. */
DataFlow::Node connection() { result = connection(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `django.db.connection.cursor` class. */
private DataFlow::Node classCursor(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("django.db.connection.cursor")
or
t.startInAttr("cursor") and
result = connection()
or
exists(DataFlow::TypeTracker t2 | result = classCursor(t2).track(t2, t))
}
/** Gets a reference to the `django.db.connection.cursor` class. */
DataFlow::Node classCursor() { result = classCursor(DataFlow::TypeTracker::end()) }
/** Gets a reference to an instance of `django.db.connection.cursor`. */
private DataFlow::Node cursor(DataFlow::TypeTracker t) {
t.start() and
result.asCfgNode().(CallNode).getFunction() = classCursor().asCfgNode()
or
exists(DataFlow::TypeTracker t2 | result = cursor(t2).track(t2, t))
}
/** Gets a reference to an instance of `django.db.connection.cursor`. */
DataFlow::Node cursor() { result = cursor(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `django.db.connection.cursor.execute` function. */
private DataFlow::Node execute(DataFlow::TypeTracker t) {
t.startInAttr("execute") and
result = cursor()
or
exists(DataFlow::TypeTracker t2 | result = execute(t2).track(t2, t))
}
/** Gets a reference to the `django.db.connection.cursor.execute` function. */
DataFlow::Node execute() { result = execute(DataFlow::TypeTracker::end()) }
}
}
/** A call to the `django.db.connection.cursor.execute` function. */
private class DbConnectionExecute extends SqlExecution::Range, DataFlow::CfgNode {
override CallNode node;
DbConnectionExecute() { node.getFunction() = django::db::execute().asCfgNode() }
override DataFlow::Node getSql() { result.asCfgNode() = node.getArg(0) }
}
}

View File

@@ -0,0 +1,2 @@
import python
import experimental.meta.ConceptsTest

View File

@@ -0,0 +1,18 @@
from django.db import connection, models
def test_plain(username):
# GOOD -- Using parameters
connection.cursor().execute("SELECT * FROM users WHERE username = %s", username) # $getSql="SELECT * FROM users WHERE username = %s"
# BAD -- Using string formatting
connection.cursor().execute("SELECT * FROM users WHERE username = '%s'" % username) # $getSql=BinaryExpr
def test_context(username):
with connection.cursor() as cursor:
# GOOD -- Using parameters
cursor.execute("SELECT * FROM users WHERE username = %s", username) # $getSql="SELECT * FROM users WHERE username = %s"
# BAD -- Using string formatting
cursor.execute("SELECT * FROM users WHERE username = '%s'" % username) # $getSql=BinaryExpr