Merge pull request #5680 from mrthankyou/python-use-sqlalchemy

Python: Add SqlAlchemy model
This commit is contained in:
Rasmus Wriedt Larsen
2021-07-22 15:31:39 +02:00
committed by GitHub
8 changed files with 258 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
import python
import experimental.meta.ConceptsTest
import experimental.semmle.python.frameworks.SqlAlchemy

View File

@@ -0,0 +1,3 @@
argumentToEnsureNotTaintedNotMarkedAsSpurious
untaintedArgumentToEnsureTaintedNotMarkedAsMissing
failures

View File

@@ -0,0 +1,2 @@
import experimental.meta.InlineTaintTest
import experimental.semmle.python.frameworks.SqlAlchemy

View File

@@ -0,0 +1,57 @@
import sqlalchemy
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.pool import StaticPool
from sqlalchemy.orm import relationship, backref, sessionmaker, joinedload
from sqlalchemy.sql import text
engine = create_engine(
'sqlite:///:memory:',
echo=True,
connect_args={"check_same_thread": False},
poolclass=StaticPool
)
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
ed_user = User(name='ed')
ed_user2 = User(name='george')
session.add(ed_user)
session.add(ed_user2)
session.commit()
# Injection without requiring the text() taint-step
session.query(User).filter_by(name="some sql") # $ MISSING: getSql="some sql"
session.scalar("some sql") # $ getSql="some sql"
engine.scalar("some sql") # $ getSql="some sql"
session.execute("some sql") # $ getSql="some sql"
with engine.connect() as connection:
connection.execute("some sql") # $ getSql="some sql"
with engine.begin() as connection:
connection.execute("some sql") # $ getSql="some sql"
# Injection requiring the text() taint-step
t = text("some sql")
session.query(User).filter(t) # $ getSql=t
session.query(User).group_by(User.id).having(t) # $ getSql=User.id MISSING: getSql=t
session.query(User).group_by(t).first() # $ getSql=t
session.query(User).order_by(t).first() # $ getSql=t
query = select(User).where(User.name == t) # $ MISSING: getSql=t
with engine.connect() as conn:
conn.execute(query) # $ getSql=query

View File

@@ -0,0 +1,12 @@
import sqlalchemy
def test_taint():
ts = TAINTED_STRING
ensure_tainted(
ts, # $ tainted
sqlalchemy.text(ts), # $ tainted
sqlalchemy.sql.text(ts),# $ tainted
sqlalchemy.sql.expression.text(ts),# $ tainted
sqlalchemy.sql.expression.TextClause(ts),# $ tainted
)