Python: Add inline query test

This commit is contained in:
Rasmus Lerchedahl Petersen
2023-09-07 10:22:30 +02:00
parent 19046ea417
commit bf8bfd91cd
9 changed files with 23 additions and 16 deletions

View File

@@ -0,0 +1,3 @@
failures
missingAnnotationOnSink
testFailures

View File

@@ -0,0 +1,4 @@
import python
import experimental.dataflow.TestUtil.DataflowQueryTest
import semmle.python.security.dataflow.NoSQLInjectionQuery
import FromTaintTrackingStateConfig<Config>

View File

@@ -19,7 +19,7 @@ def subclass_objects():
unsafe_search = request.args['search'] unsafe_search = request.args['search']
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
return Movie.objects(__raw__=json_search) return Movie.objects(__raw__=json_search) #$ result=BAD
@app.route("/get_db_find") @app.route("/get_db_find")
def get_db_find(): def get_db_find():
@@ -27,7 +27,7 @@ def get_db_find():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
retrieved_db = db.get_db() retrieved_db = db.get_db()
return retrieved_db["Movie"].find({'name': json_search}) return retrieved_db["Movie"].find({'name': json_search}) #$ result=BAD
# if __name__ == "__main__": # if __name__ == "__main__":
# app.run(debug=True) # app.run(debug=True)

View File

@@ -21,7 +21,7 @@ def subclass_objects():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
safe_search = sanitize(json_search) safe_search = sanitize(json_search)
return Movie.objects(__raw__=safe_search) return Movie.objects(__raw__=safe_search) #$ result=OK
# if __name__ == "__main__": # if __name__ == "__main__":
# app.run(debug=True) # app.run(debug=True)

View File

@@ -11,7 +11,7 @@ def home_page():
unsafe_search = request.args['search'] unsafe_search = request.args['search']
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
return mongo.db.user.find({'name': json_search}) return mongo.db.user.find({'name': json_search}) #$ result=BAD
# if __name__ == "__main__": # if __name__ == "__main__":
# app.run(debug=True) # app.run(debug=True)

View File

@@ -13,7 +13,7 @@ def home_page():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
safe_search = sanitize(json_search) safe_search = sanitize(json_search)
return mongo.db.user.find({'name': safe_search}) return mongo.db.user.find({'name': safe_search}) #$ result=OK
# if __name__ == "__main__": # if __name__ == "__main__":
# app.run(debug=True) # app.run(debug=True)

View File

@@ -19,7 +19,7 @@ def connect_find():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
db = me.connect('mydb') db = me.connect('mydb')
return db.movie.find({'name': json_search}) return db.movie.find({'name': json_search}) #$ result=BAD
@app.route("/connection_connect_find") @app.route("/connection_connect_find")
def connection_connect_find(): def connection_connect_find():
@@ -27,7 +27,7 @@ def connection_connect_find():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
db = connect('mydb') db = connect('mydb')
return db.movie.find({'name': json_search}) return db.movie.find({'name': json_search}) #$ result=BAD
@app.route("/get_db_find") @app.route("/get_db_find")
def get_db_find(): def get_db_find():
@@ -35,7 +35,7 @@ def get_db_find():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
db = me.get_db() db = me.get_db()
return db.movie.find({'name': json_search}) return db.movie.find({'name': json_search}) #$ result=BAD
@app.route("/connection_get_db_find") @app.route("/connection_get_db_find")
def connection_get_db_find(): def connection_get_db_find():
@@ -43,14 +43,14 @@ def connection_get_db_find():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
db = get_db() db = get_db()
return db.movie.find({'name': json_search}) return db.movie.find({'name': json_search}) #$ result=BAD
@app.route("/subclass_objects") @app.route("/subclass_objects")
def subclass_objects(): def subclass_objects():
unsafe_search = request.args['search'] unsafe_search = request.args['search']
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
return Movie.objects(__raw__=json_search) return Movie.objects(__raw__=json_search) #$ result=BAD
@app.route("/subscript_find") @app.route("/subscript_find")
def subscript_find(): def subscript_find():
@@ -58,7 +58,7 @@ def subscript_find():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
db = me.connect('mydb') db = me.connect('mydb')
return db['movie'].find({'name': json_search}) return db['movie'].find({'name': json_search}) #$ result=BAD
# if __name__ == "__main__": # if __name__ == "__main__":
# app.run(debug=True) # app.run(debug=True)

View File

@@ -21,7 +21,7 @@ def connect_find():
safe_search = sanitize(json_search) safe_search = sanitize(json_search)
db = me.connect('mydb') db = me.connect('mydb')
return db.movie.find({'name': safe_search}) return db.movie.find({'name': safe_search}) #$ result=OK
# if __name__ == "__main__": # if __name__ == "__main__":
# app.run(debug=True) # app.run(debug=True)

View File

@@ -12,7 +12,7 @@ def bad():
unsafe_search = request.args['search'] unsafe_search = request.args['search']
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
return client.db.collection.find_one({'data': json_search}) return client.db.collection.find_one({'data': json_search}) #$ result=BAD
@app.route("/good") @app.route("/good")
@@ -21,7 +21,7 @@ def good():
json_search = json.loads(unsafe_search) json_search = json.loads(unsafe_search)
safe_search = sanitize(json_search) safe_search = sanitize(json_search)
return client.db.collection.find_one({'data': safe_search}) return client.db.collection.find_one({'data': safe_search}) #$ result=OK
@app.route("/bad2") @app.route("/bad2")
@@ -30,7 +30,7 @@ def bad2():
client = MongoClient("localhost", 27017, maxPoolSize=50) client = MongoClient("localhost", 27017, maxPoolSize=50)
db = client.localhost db = client.localhost
collection = db['collection'] collection = db['collection']
cursor = collection.find_one({"$where": f"this._id == '${event_id}'"}) cursor = collection.find_one({"$where": f"this._id == '${event_id}'"}) #$ result=BAD
@app.route("/bad3") @app.route("/bad3")
@@ -40,7 +40,7 @@ def bad3():
client = MongoClient("localhost", 27017, maxPoolSize=50) client = MongoClient("localhost", 27017, maxPoolSize=50)
db = client.get_database(name="localhost") db = client.get_database(name="localhost")
collection = db.get_collection("collection") collection = db.get_collection("collection")
cursor = collection.find_one({"$where": f"this._id == '${event_id}'"}) cursor = collection.find_one({"$where": f"this._id == '${event_id}'"}) #$ result=BAD
if __name__ == "__main__": if __name__ == "__main__":