Python: Use ConceptsTests for ClickHouse SQL libs

This did reveal a few places where we do not detect the incoming SQL
This commit is contained in:
Rasmus Wriedt Larsen
2021-05-25 14:53:09 +02:00
parent ee3477c20a
commit c9a9535dbc
10 changed files with 78 additions and 41 deletions

View File

@@ -1,6 +1,3 @@
import python
import experimental.meta.ConceptsTest
import experimental.semmle.python.frameworks.ClickHouseDriver
import semmle.python.Concepts
from SqlExecution s
select s, s.getSql()

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1 --lang=3

View File

@@ -0,0 +1,30 @@
import aioch
SQL = "SOME SQL"
async def aioch_test():
client = aioch.Client("localhost")
await client.execute(SQL) # $ getSql=SQL
await client.execute(query=SQL) # $ MISSING: getSql=SQL
await client.execute_with_progress(SQL) # $ getSql=SQL
await client.execute_with_progress(query=SQL) # $ MISSING: getSql=SQL
await client.execute_iter(SQL) # $ getSql=SQL
await client.execute_iter(query=SQL) # $ MISSING: getSql=SQL
# Using custom client (this has been seen done for the blocking version in
# `clickhouse_driver` PyPI package)
class MyClient(aioch.Client):
pass
async def test_custom_client():
client = MyClient("localhost")
await client.execute(SQL) # $ getSql=SQL

View File

@@ -1,5 +0,0 @@
| ClickHouseDriver.py:15:22:15:106 | ControlFlowNode for Attribute() | ClickHouseDriver.py:15:52:15:105 | ControlFlowNode for BinaryExpr |
| ClickHouseDriver.py:18:5:18:87 | ControlFlowNode for Attribute() | ClickHouseDriver.py:18:33:18:86 | ControlFlowNode for BinaryExpr |
| ClickHouseDriver.py:22:5:22:62 | ControlFlowNode for Attribute() | ClickHouseDriver.py:22:33:22:37 | ControlFlowNode for query |
| ClickHouseDriver.py:27:5:27:74 | ControlFlowNode for Attribute() | ClickHouseDriver.py:27:20:27:73 | ControlFlowNode for BinaryExpr |
| ClickHouseDriver.py:30:5:30:89 | ControlFlowNode for Attribute() | ClickHouseDriver.py:30:35:30:88 | ControlFlowNode for BinaryExpr |

View File

@@ -1,32 +0,0 @@
from django.conf.urls import url
from clickhouse_driver import Client
from clickhouse_driver import connect
from aioch import Client as aiochClient
# Dummy Client subclass
class MyClient(Client):
def dummy(self):
return None
def show_user(request, username):
# BAD -- Untrusted user input is directly injected into the sql query using async library 'aioch'
aclient = aiochClient("localhost")
progress = await aclient.execute_with_progress("SELECT * FROM users WHERE username = '%s'" % username)
# BAD -- Untrusted user input is directly injected into the sql query using native client of library 'clickhouse_driver'
Client('localhost').execute("SELECT * FROM users WHERE username = '%s'" % username)
# GOOD -- query uses prepared statements
query = "SELECT * FROM users WHERE username = %(username)s"
Client('localhost').execute(query, {"username": username})
# BAD -- Untrusted user input is directly injected into the sql query using PEP249 interface
conn = connect('clickhouse://localhost')
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username = '%s'" % username)
# BAD -- Untrusted user input is directly injected into the sql query using MyClient, which is a subclass of Client
MyClient('localhost').execute("SELECT * FROM users WHERE username = '%s'" % username)
urlpatterns = [url(r'^users/(?P<username>[^/]+)$', show_user)]

View File

@@ -0,0 +1,3 @@
import python
import experimental.meta.ConceptsTest
import experimental.semmle.python.frameworks.ClickHouseDriver

View File

@@ -0,0 +1,42 @@
import clickhouse_driver
SQL = "SOME SQL"
# Normal operation
client = clickhouse_driver.client.Client("localhost")
client.execute(SQL) # $ MISSING: getSql=SQL
client.execute(query=SQL) # $ MISSING: getSql=SQL
client.execute_with_progress(SQL) # $ MISSING: getSql=SQL
client.execute_with_progress(query=SQL) # $ MISSING: getSql=SQL
client.execute_iter(SQL) # $ MISSING: getSql=SQL
client.execute_iter(query=SQL) # $ MISSING: getSql=SQL
# commonly used alias
client = clickhouse_driver.Client("localhost")
client.execute(SQL) # $ getSql=SQL
# Using PEP249 interface
conn = clickhouse_driver.connect('clickhouse://localhost')
cursor = conn.cursor()
cursor.execute(SQL) # $ getSql=SQL
# Using custom client
#
# examples from real world code
# https://github.com/Altinity/clickhouse-mysql-data-reader/blob/3b1b7088751b05e5bbf45890c5949b58208c2343/clickhouse_mysql/dbclient/chclient.py#L10
# https://github.com/Felixoid/clickhouse-plantuml/blob/d8b2ba7d164a836770ec21f5e4035dfb04c41d9c/clickhouse_plantuml/client.py#L9
class MyClient(clickhouse_driver.Client):
pass
MyClient("localhost").execute(SQL) # $ getSql=SQL

View File

@@ -0,0 +1 @@
semmle-extractor-options: --max-import-depth=1