Add thorough tests, including one MISSING alert

This commit is contained in:
Owen Mansel-Chan
2026-06-12 13:29:27 +01:00
parent d389ea4039
commit 434a99447e
2 changed files with 76 additions and 0 deletions

View File

@@ -0,0 +1,52 @@
from fastapi import FastAPI
from hdbcli import dbapi
from db_connection import get_conn
from db_connection import hdb_con
from db_connection import hdb_con2
from db_connection import hdb_con3
app = FastAPI()
class DatabaseConnection:
def __init__(self):
self._conn = dbapi.connect(address='localhost', port=30015, user='system', password='Password123')
def get_conn(self):
return self._conn
db_connection = DatabaseConnection()
@app.get("/unsafe1/")
async def unsafe(name: str): # $ Source
query = "select * from users where name=" + name
cursor = hdb_con.cursor()
cursor.execute(query) # $ Alert
cursor.close()
@app.get("/unsafe2/")
async def unsafe2(name: str): # $ Source
query = "select * from users where name=" + name
cursor = hdb_con2.cursor()
cursor.execute(query) # $ Alert
cursor.close()
@app.get("/unsafe3/")
async def unsafe3(name: str): # $ MISSING: Source
query = "select * from users where name=" + name
cursor = hdb_con3.cursor()
cursor.execute(query) # $ MISSING: Alert
cursor.close()
@app.get("/unsafe4/")
async def unsafe4(name: str): # $ Source
query = "select * from users where name=" + name
cursor = get_conn().cursor()
cursor.execute(query) # $ Alert
cursor.close()
@app.get("/unsafe5/")
async def unsafe5(name: str): # $ Source
query = "select * from users where name=" + name
cursor = db_connection.get_conn().cursor()
cursor.execute(query) # $ Alert
cursor.close()

View File

@@ -0,0 +1,24 @@
from hdbcli import dbapi
from typing import Optional
hdb_con = dbapi.connect(address='localhost', port=30015, user='system', password='Password123')
class DatabaseConnection:
def __init__(self):
self._conn = dbapi.connect(address='localhost', port=30015, user='system', password='Password123')
def get_conn(self):
return self._conn
hdb_con2 = DatabaseConnection().get_conn()
hdb_con3 = DatabaseConnection()._conn
_hana_connection: Optional[DatabaseConnection] = None
def get_conn():
global _hana_connection
if _hana_connection is None:
_hana_connection = DatabaseConnection()
return _hana_connection.get_conn()