mirror of
https://github.com/github/codeql.git
synced 2026-05-01 19:55:15 +02:00
Merge branch 'main' into jorgectf/python/ldapimproperauth
This commit is contained in:
17
python/ql/test/experimental/dataflow/ApiGraphs/async_test.py
Normal file
17
python/ql/test/experimental/dataflow/ApiGraphs/async_test.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import pkg # $ use=moduleImport("pkg")
|
||||
|
||||
async def foo():
|
||||
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
|
||||
coro # $ use=moduleImport("pkg").getMember("async_func").getReturn()
|
||||
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
|
||||
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
|
||||
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
|
||||
|
||||
async def bar():
|
||||
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
|
||||
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
|
||||
|
||||
def check_annotations():
|
||||
# Just to make sure how annotations should look like :)
|
||||
result = pkg.sync_func() # $ use=moduleImport("pkg").getMember("sync_func").getReturn()
|
||||
return result # $ use=moduleImport("pkg").getMember("sync_func").getReturn()
|
||||
@@ -1 +1 @@
|
||||
semmle-extractor-options: --lang=3
|
||||
semmle-extractor-options: --lang=3 --max-import-depth=1
|
||||
|
||||
@@ -13,7 +13,8 @@ class ApiUseTest extends InlineExpectationsTest {
|
||||
l = n.getLocation() and
|
||||
// Module variable nodes have no suitable location, so it's best to simply exclude them entirely
|
||||
// from the inline tests.
|
||||
not n instanceof DataFlow::ModuleVariableNode
|
||||
not n instanceof DataFlow::ModuleVariableNode and
|
||||
exists(l.getFile().getRelativePath())
|
||||
}
|
||||
|
||||
override predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
conjunctive_lookup
|
||||
| test.py:6:1:6:6 | ControlFlowNode for meth() | meth() | obj1 | bar |
|
||||
| test.py:6:1:6:6 | ControlFlowNode for meth() | meth() | obj1 | foo |
|
||||
| test.py:6:1:6:6 | ControlFlowNode for meth() | meth() | obj2 | bar |
|
||||
| test.py:6:1:6:6 | ControlFlowNode for meth() | meth() | obj2 | foo |
|
||||
calls_lookup
|
||||
| test.py:6:1:6:6 | ControlFlowNode for meth() | meth() | obj1 | foo |
|
||||
| test.py:6:1:6:6 | ControlFlowNode for meth() | meth() | obj2 | bar |
|
||||
@@ -0,0 +1,6 @@
|
||||
if cond:
|
||||
meth = obj1.foo
|
||||
else:
|
||||
meth = obj2.bar
|
||||
|
||||
meth()
|
||||
18
python/ql/test/experimental/dataflow/method-calls/test.ql
Normal file
18
python/ql/test/experimental/dataflow/method-calls/test.ql
Normal file
@@ -0,0 +1,18 @@
|
||||
import python
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import experimental.dataflow.TestUtil.PrintNode
|
||||
|
||||
query predicate conjunctive_lookup(
|
||||
DataFlow::MethodCallNode methCall, string call, string object, string methodName
|
||||
) {
|
||||
call = prettyNode(methCall) and
|
||||
object = prettyNode(methCall.getObject()) and
|
||||
methodName = methCall.getMethodName()
|
||||
}
|
||||
|
||||
query predicate calls_lookup(
|
||||
DataFlow::MethodCallNode methCall, string call, string object, string methodName
|
||||
) {
|
||||
call = prettyNode(methCall) and
|
||||
exists(DataFlow::Node o | methCall.calls(o, methodName) and object = prettyNode(o))
|
||||
}
|
||||
@@ -1,12 +1,17 @@
|
||||
// /**
|
||||
// * @kind path-problem
|
||||
// */
|
||||
import python
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.TaintTracking
|
||||
import TestUtilities.InlineExpectationsTest
|
||||
import semmle.python.dataflow.new.SensitiveDataSources
|
||||
private import semmle.python.ApiGraphs
|
||||
|
||||
class SensitiveDataSourcesTest extends InlineExpectationsTest {
|
||||
SensitiveDataSourcesTest() { this = "SensitiveDataSourcesTest" }
|
||||
|
||||
override string getARelevantTag() { result = "SensitiveDataSource" }
|
||||
override string getARelevantTag() { result in ["SensitiveDataSource", "SensitiveUse"] }
|
||||
|
||||
override predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
exists(location.getFile().getRelativePath()) and
|
||||
@@ -15,6 +20,32 @@ class SensitiveDataSourcesTest extends InlineExpectationsTest {
|
||||
element = source.toString() and
|
||||
value = source.getClassification() and
|
||||
tag = "SensitiveDataSource"
|
||||
or
|
||||
exists(DataFlow::Node use |
|
||||
any(SensitiveUseConfiguration config).hasFlow(source, use) and
|
||||
location = use.getLocation() and
|
||||
element = use.toString() and
|
||||
value = source.getClassification() and
|
||||
tag = "SensitiveUse"
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class SensitiveUseConfiguration extends TaintTracking::Configuration {
|
||||
SensitiveUseConfiguration() { this = "SensitiveUseConfiguration" }
|
||||
|
||||
override predicate isSource(DataFlow::Node node) { node instanceof SensitiveDataSource }
|
||||
|
||||
override predicate isSink(DataFlow::Node node) {
|
||||
node = API::builtin("print").getACall().getArg(_)
|
||||
}
|
||||
|
||||
override predicate isAdditionalTaintStep(DataFlow::Node node1, DataFlow::Node node2) {
|
||||
sensitiveDataExtraStepForCalls(node1, node2)
|
||||
}
|
||||
}
|
||||
// import DataFlow::PathGraph
|
||||
// from SensitiveUseConfiguration cfg, DataFlow::PathNode source, DataFlow::PathNode sink
|
||||
// where cfg.hasFlowPath(source, sink)
|
||||
// select sink, source, sink, "taint from $@", source.getNode(), "here"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
|
||||
from not_found import get_passwd, account_id
|
||||
from not_found import get_passwd # $ SensitiveDataSource=password
|
||||
from not_found import account_id # $ SensitiveDataSource=id
|
||||
|
||||
def get_password():
|
||||
pass
|
||||
@@ -20,14 +21,94 @@ fetch_certificate() # $ SensitiveDataSource=certificate
|
||||
account_id() # $ SensitiveDataSource=id
|
||||
safe_to_store = encrypt_password(pwd)
|
||||
|
||||
f = get_password
|
||||
f() # $ SensitiveDataSource=password
|
||||
|
||||
# more tests of functions we don't have definition for
|
||||
x = unkown_func_not_even_imported_get_password() # $ SensitiveDataSource=password
|
||||
print(x) # $ SensitiveUse=password
|
||||
|
||||
f = get_passwd
|
||||
x = f()
|
||||
print(x) # $ SensitiveUse=password
|
||||
|
||||
import not_found
|
||||
f = not_found.get_passwd # $ SensitiveDataSource=password
|
||||
x = f()
|
||||
print(x) # $ SensitiveUse=password
|
||||
|
||||
def my_func(non_sensitive_name):
|
||||
x = non_sensitive_name()
|
||||
print(x) # $ SensitiveUse=password
|
||||
f = not_found.get_passwd # $ SensitiveDataSource=password
|
||||
my_func(f)
|
||||
|
||||
# attributes
|
||||
foo = ObjectFromDatabase()
|
||||
foo.secret # $ SensitiveDataSource=secret
|
||||
foo.username # $ SensitiveDataSource=id
|
||||
|
||||
getattr(foo, "password") # $ SensitiveDataSource=password
|
||||
x = "password"
|
||||
getattr(foo, x) # $ SensitiveDataSource=password
|
||||
|
||||
# based on variable/parameter names
|
||||
def my_func(password): # $ SensitiveDataSource=password
|
||||
print(password) # $ SensitiveUse=password
|
||||
|
||||
password = some_function() # $ SensitiveDataSource=password
|
||||
print(password) # $ SensitiveUse=password
|
||||
|
||||
for password in some_function2(): # $ SensitiveDataSource=password
|
||||
print(password) # $ SensitiveUse=password
|
||||
|
||||
with some_function3() as password: # $ SensitiveDataSource=password
|
||||
print(password) # $ SensitiveUse=password
|
||||
|
||||
|
||||
# Special handling of lookups of sensitive properties
|
||||
request.args["password"], # $ MISSING: SensitiveDataSource=password
|
||||
request.args["password"], # $ SensitiveDataSource=password
|
||||
request.args.get("password") # $ SensitiveDataSource=password
|
||||
|
||||
x = "password"
|
||||
request.args.get(x) # $ SensitiveDataSource=password
|
||||
|
||||
# I don't think handling `getlist` is super important, just included it to show what we don't handle
|
||||
request.args.getlist("password")[0] # $ MISSING: SensitiveDataSource=password
|
||||
|
||||
from not_found import password2 as foo # $ SensitiveDataSource=password
|
||||
print(foo) # $ SensitiveUse=password
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# cross-talk between different calls
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
# Case 1: providing name as argument
|
||||
|
||||
_configuration = {"sleep_timer": 5, "mysql_password": "1234"}
|
||||
|
||||
def get_config(key):
|
||||
# Treating this as a SensitiveDataSource is questionable, since that will result in
|
||||
# _all_ calls to `get_config` being treated as giving sensitive data
|
||||
return _configuration[key]
|
||||
|
||||
foo = get_config("mysql_password")
|
||||
print(foo) # $ MISSING: SensitiveUse=password
|
||||
|
||||
bar = get_config("sleep_timer")
|
||||
print(bar)
|
||||
|
||||
# Case 2: Providing function as argument
|
||||
|
||||
def call_wrapper(func):
|
||||
print("Will call", func)
|
||||
# Treating this as a SensitiveDataSource is questionable, since that will result in
|
||||
# _all_ calls to `call_wrapper` being treated as giving sensitive data
|
||||
return func() # $ SensitiveDataSource=password
|
||||
|
||||
foo = call_wrapper(get_password)
|
||||
print(foo) # $ SensitiveUse=password
|
||||
|
||||
harmless = lambda: "bar"
|
||||
bar = call_wrapper(harmless)
|
||||
print(bar) # $ SPURIOUS: SensitiveUse=password
|
||||
|
||||
@@ -104,7 +104,7 @@ def non_syntactic():
|
||||
_str = str
|
||||
ensure_tainted(
|
||||
meth(), # $ MISSING: tainted
|
||||
_str(ts), # $ MISSING: tainted
|
||||
_str(ts), # $ tainted
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import python
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.TypeTracker
|
||||
|
||||
private DataFlow::LocalSourceNode module_tracker(TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode module_tracker(TypeTracker t) {
|
||||
t.start() and
|
||||
result = DataFlow::importNode("module")
|
||||
or
|
||||
@@ -13,7 +13,7 @@ query DataFlow::Node module_tracker() {
|
||||
module_tracker(DataFlow::TypeTracker::end()).flowsTo(result)
|
||||
}
|
||||
|
||||
private DataFlow::LocalSourceNode module_attr_tracker(TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode module_attr_tracker(TypeTracker t) {
|
||||
t.startInAttr("attr") and
|
||||
result = module_tracker()
|
||||
or
|
||||
|
||||
@@ -6,7 +6,7 @@ import TestUtilities.InlineExpectationsTest
|
||||
// -----------------------------------------------------------------------------
|
||||
// tracked
|
||||
// -----------------------------------------------------------------------------
|
||||
private DataFlow::LocalSourceNode tracked(TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode tracked(TypeTracker t) {
|
||||
t.start() and
|
||||
result.asCfgNode() = any(NameNode n | n.getId() = "tracked")
|
||||
or
|
||||
@@ -34,14 +34,14 @@ class TrackedTest extends InlineExpectationsTest {
|
||||
// -----------------------------------------------------------------------------
|
||||
// int + str
|
||||
// -----------------------------------------------------------------------------
|
||||
private DataFlow::LocalSourceNode int_type(TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode int_type(TypeTracker t) {
|
||||
t.start() and
|
||||
result.asCfgNode() = any(CallNode c | c.getFunction().(NameNode).getId() = "int")
|
||||
or
|
||||
exists(TypeTracker t2 | result = int_type(t2).track(t2, t))
|
||||
}
|
||||
|
||||
private DataFlow::LocalSourceNode string_type(TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode string_type(TypeTracker t) {
|
||||
t.start() and
|
||||
result.asCfgNode() = any(CallNode c | c.getFunction().(NameNode).getId() = "str")
|
||||
or
|
||||
@@ -83,7 +83,7 @@ class TrackedStringTest extends InlineExpectationsTest {
|
||||
// -----------------------------------------------------------------------------
|
||||
// tracked_self
|
||||
// -----------------------------------------------------------------------------
|
||||
private DataFlow::LocalSourceNode tracked_self(TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode tracked_self(TypeTracker t) {
|
||||
t.start() and
|
||||
exists(Function f |
|
||||
f.isMethod() and
|
||||
@@ -117,7 +117,7 @@ class TrackedSelfTest extends InlineExpectationsTest {
|
||||
// -----------------------------------------------------------------------------
|
||||
// This modeling follows the same pattern that we currently use in our real library modeling.
|
||||
/** Gets a reference to `foo` (fictive module). */
|
||||
private DataFlow::LocalSourceNode foo(DataFlow::TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode foo(DataFlow::TypeTracker t) {
|
||||
t.start() and
|
||||
result = DataFlow::importNode("foo")
|
||||
or
|
||||
@@ -128,7 +128,7 @@ private DataFlow::LocalSourceNode foo(DataFlow::TypeTracker t) {
|
||||
DataFlow::Node foo() { foo(DataFlow::TypeTracker::end()).flowsTo(result) }
|
||||
|
||||
/** Gets a reference to `foo.bar` (fictive module). */
|
||||
private DataFlow::LocalSourceNode foo_bar(DataFlow::TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode foo_bar(DataFlow::TypeTracker t) {
|
||||
t.start() and
|
||||
result = DataFlow::importNode("foo.bar")
|
||||
or
|
||||
@@ -142,7 +142,7 @@ private DataFlow::LocalSourceNode foo_bar(DataFlow::TypeTracker t) {
|
||||
DataFlow::Node foo_bar() { foo_bar(DataFlow::TypeTracker::end()).flowsTo(result) }
|
||||
|
||||
/** Gets a reference to `foo.bar.baz` (fictive attribute on `foo.bar` module). */
|
||||
private DataFlow::LocalSourceNode foo_bar_baz(DataFlow::TypeTracker t) {
|
||||
private DataFlow::TypeTrackingNode foo_bar_baz(DataFlow::TypeTracker t) {
|
||||
t.start() and
|
||||
result = DataFlow::importNode("foo.bar.baz")
|
||||
or
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
import python
|
||||
import experimental.meta.ConceptsTest
|
||||
import experimental.semmle.python.frameworks.SqlAlchemy
|
||||
@@ -0,0 +1,3 @@
|
||||
argumentToEnsureNotTaintedNotMarkedAsSpurious
|
||||
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
|
||||
failures
|
||||
@@ -0,0 +1,2 @@
|
||||
import experimental.meta.InlineTaintTest
|
||||
import experimental.semmle.python.frameworks.SqlAlchemy
|
||||
@@ -0,0 +1,57 @@
|
||||
import sqlalchemy
|
||||
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.pool import StaticPool
|
||||
from sqlalchemy.orm import relationship, backref, sessionmaker, joinedload
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
engine = create_engine(
|
||||
'sqlite:///:memory:',
|
||||
echo=True,
|
||||
connect_args={"check_same_thread": False},
|
||||
poolclass=StaticPool
|
||||
)
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = 'users'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
name = Column(String)
|
||||
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
Session = sessionmaker(bind=engine)
|
||||
session = Session()
|
||||
|
||||
ed_user = User(name='ed')
|
||||
ed_user2 = User(name='george')
|
||||
|
||||
session.add(ed_user)
|
||||
session.add(ed_user2)
|
||||
|
||||
session.commit()
|
||||
|
||||
# Injection without requiring the text() taint-step
|
||||
session.query(User).filter_by(name="some sql") # $ MISSING: getSql="some sql"
|
||||
session.scalar("some sql") # $ getSql="some sql"
|
||||
engine.scalar("some sql") # $ getSql="some sql"
|
||||
session.execute("some sql") # $ getSql="some sql"
|
||||
|
||||
with engine.connect() as connection:
|
||||
connection.execute("some sql") # $ getSql="some sql"
|
||||
|
||||
with engine.begin() as connection:
|
||||
connection.execute("some sql") # $ getSql="some sql"
|
||||
|
||||
# Injection requiring the text() taint-step
|
||||
t = text("some sql")
|
||||
session.query(User).filter(t) # $ getSql=t
|
||||
session.query(User).group_by(User.id).having(t) # $ getSql=User.id MISSING: getSql=t
|
||||
session.query(User).group_by(t).first() # $ getSql=t
|
||||
session.query(User).order_by(t).first() # $ getSql=t
|
||||
|
||||
query = select(User).where(User.name == t) # $ MISSING: getSql=t
|
||||
with engine.connect() as conn:
|
||||
conn.execute(query) # $ getSql=query
|
||||
@@ -0,0 +1,12 @@
|
||||
import sqlalchemy
|
||||
|
||||
def test_taint():
|
||||
ts = TAINTED_STRING
|
||||
|
||||
ensure_tainted(
|
||||
ts, # $ tainted
|
||||
sqlalchemy.text(ts), # $ tainted
|
||||
sqlalchemy.sql.text(ts),# $ tainted
|
||||
sqlalchemy.sql.expression.text(ts),# $ tainted
|
||||
sqlalchemy.sql.expression.TextClause(ts),# $ tainted
|
||||
)
|
||||
@@ -93,6 +93,23 @@ class EncodingTest extends InlineExpectationsTest {
|
||||
}
|
||||
}
|
||||
|
||||
class LoggingTest extends InlineExpectationsTest {
|
||||
LoggingTest() { this = "LoggingTest" }
|
||||
|
||||
override string getARelevantTag() { result in ["loggingInput"] }
|
||||
|
||||
override predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
exists(location.getFile().getRelativePath()) and
|
||||
exists(Logging logging, DataFlow::Node data |
|
||||
location = data.getLocation() and
|
||||
element = data.toString() and
|
||||
value = prettyNodeForInlineTest(data) and
|
||||
data = logging.getAnInput() and
|
||||
tag = "loggingInput"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class CodeExecutionTest extends InlineExpectationsTest {
|
||||
CodeExecutionTest() { this = "CodeExecutionTest" }
|
||||
|
||||
@@ -129,6 +146,38 @@ class SqlExecutionTest extends InlineExpectationsTest {
|
||||
}
|
||||
}
|
||||
|
||||
class EscapingTest extends InlineExpectationsTest {
|
||||
EscapingTest() { this = "EscapingTest" }
|
||||
|
||||
override string getARelevantTag() { result in ["escapeInput", "escapeOutput", "escapeKind"] }
|
||||
|
||||
override predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
exists(location.getFile().getRelativePath()) and
|
||||
exists(Escaping esc |
|
||||
exists(DataFlow::Node data |
|
||||
location = data.getLocation() and
|
||||
element = data.toString() and
|
||||
value = prettyNodeForInlineTest(data) and
|
||||
(
|
||||
data = esc.getAnInput() and
|
||||
tag = "escapeInput"
|
||||
or
|
||||
data = esc.getOutput() and
|
||||
tag = "escapeOutput"
|
||||
)
|
||||
)
|
||||
or
|
||||
exists(string format |
|
||||
location = esc.getLocation() and
|
||||
element = format and
|
||||
value = format and
|
||||
format = esc.getKind() and
|
||||
tag = "escapeKind"
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class HttpServerRouteSetupTest extends InlineExpectationsTest {
|
||||
HttpServerRouteSetupTest() { this = "HttpServerRouteSetupTest" }
|
||||
|
||||
@@ -252,6 +301,38 @@ class HttpServerHttpRedirectResponseTest extends InlineExpectationsTest {
|
||||
}
|
||||
}
|
||||
|
||||
class HttpServerCookieWriteTest extends InlineExpectationsTest {
|
||||
HttpServerCookieWriteTest() { this = "HttpServerCookieWriteTest" }
|
||||
|
||||
override string getARelevantTag() {
|
||||
result in ["CookieWrite", "CookieRawHeader", "CookieName", "CookieValue"]
|
||||
}
|
||||
|
||||
override predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
exists(location.getFile().getRelativePath()) and
|
||||
exists(HTTP::Server::CookieWrite cookieWrite |
|
||||
location = cookieWrite.getLocation() and
|
||||
(
|
||||
element = cookieWrite.toString() and
|
||||
value = "" and
|
||||
tag = "CookieWrite"
|
||||
or
|
||||
element = cookieWrite.toString() and
|
||||
value = prettyNodeForInlineTest(cookieWrite.getHeaderArg()) and
|
||||
tag = "CookieRawHeader"
|
||||
or
|
||||
element = cookieWrite.toString() and
|
||||
value = prettyNodeForInlineTest(cookieWrite.getNameArg()) and
|
||||
tag = "CookieName"
|
||||
or
|
||||
element = cookieWrite.toString() and
|
||||
value = prettyNodeForInlineTest(cookieWrite.getValueArg()) and
|
||||
tag = "CookieValue"
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class FileSystemAccessTest extends InlineExpectationsTest {
|
||||
FileSystemAccessTest() { this = "FileSystemAccessTest" }
|
||||
|
||||
@@ -269,6 +350,23 @@ class FileSystemAccessTest extends InlineExpectationsTest {
|
||||
}
|
||||
}
|
||||
|
||||
class FileSystemWriteAccessTest extends InlineExpectationsTest {
|
||||
FileSystemWriteAccessTest() { this = "FileSystemWriteAccessTest" }
|
||||
|
||||
override string getARelevantTag() { result = "fileWriteData" }
|
||||
|
||||
override predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
exists(location.getFile().getRelativePath()) and
|
||||
exists(FileSystemWriteAccess write, DataFlow::Node data |
|
||||
data = write.getADataNode() and
|
||||
location = data.getLocation() and
|
||||
element = data.toString() and
|
||||
value = prettyNodeForInlineTest(data) and
|
||||
tag = "fileWriteData"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class PathNormalizationTest extends InlineExpectationsTest {
|
||||
PathNormalizationTest() { this = "PathNormalizationTest" }
|
||||
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
| ClickHouseDriver.py:15:22:15:106 | ControlFlowNode for Attribute() | ClickHouseDriver.py:15:52:15:105 | ControlFlowNode for BinaryExpr |
|
||||
| ClickHouseDriver.py:18:5:18:87 | ControlFlowNode for Attribute() | ClickHouseDriver.py:18:33:18:86 | ControlFlowNode for BinaryExpr |
|
||||
| ClickHouseDriver.py:22:5:22:62 | ControlFlowNode for Attribute() | ClickHouseDriver.py:22:33:22:37 | ControlFlowNode for query |
|
||||
| ClickHouseDriver.py:27:5:27:74 | ControlFlowNode for Attribute() | ClickHouseDriver.py:27:20:27:73 | ControlFlowNode for BinaryExpr |
|
||||
| ClickHouseDriver.py:30:5:30:89 | ControlFlowNode for Attribute() | ClickHouseDriver.py:30:35:30:88 | ControlFlowNode for BinaryExpr |
|
||||
@@ -1,32 +0,0 @@
|
||||
from django.conf.urls import url
|
||||
from clickhouse_driver import Client
|
||||
from clickhouse_driver import connect
|
||||
from aioch import Client as aiochClient
|
||||
|
||||
# Dummy Client subclass
|
||||
class MyClient(Client):
|
||||
def dummy(self):
|
||||
return None
|
||||
|
||||
def show_user(request, username):
|
||||
|
||||
# BAD -- Untrusted user input is directly injected into the sql query using async library 'aioch'
|
||||
aclient = aiochClient("localhost")
|
||||
progress = await aclient.execute_with_progress("SELECT * FROM users WHERE username = '%s'" % username)
|
||||
|
||||
# BAD -- Untrusted user input is directly injected into the sql query using native client of library 'clickhouse_driver'
|
||||
Client('localhost').execute("SELECT * FROM users WHERE username = '%s'" % username)
|
||||
|
||||
# GOOD -- query uses prepared statements
|
||||
query = "SELECT * FROM users WHERE username = %(username)s"
|
||||
Client('localhost').execute(query, {"username": username})
|
||||
|
||||
# BAD -- Untrusted user input is directly injected into the sql query using PEP249 interface
|
||||
conn = connect('clickhouse://localhost')
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT * FROM users WHERE username = '%s'" % username)
|
||||
|
||||
# BAD -- Untrusted user input is directly injected into the sql query using MyClient, which is a subclass of Client
|
||||
MyClient('localhost').execute("SELECT * FROM users WHERE username = '%s'" % username)
|
||||
|
||||
urlpatterns = [url(r'^users/(?P<username>[^/]+)$', show_user)]
|
||||
@@ -1,6 +0,0 @@
|
||||
import python
|
||||
import experimental.semmle.python.frameworks.ClickHouseDriver
|
||||
import semmle.python.Concepts
|
||||
|
||||
from SqlExecution s
|
||||
select s, s.getSql()
|
||||
Reference in New Issue
Block a user