Python: ORM: Add raw python test files

no ql test files yet though, will come in next commit.
This commit is contained in:
Rasmus Wriedt Larsen
2022-02-16 14:10:54 +01:00
parent f89fb50eb5
commit c78fed6594
13 changed files with 630 additions and 4 deletions

View File

@@ -0,0 +1,5 @@
db.sqlite3
# The testapp/migrations/ folder needs to be comitted to git,
# but we don't care to store the actual migrations
testapp/migrations/

View File

@@ -0,0 +1,23 @@
The main test files are:
- [testapp/orm_tests.py](testapp/orm_tests.py): which tests flow from source to sink
- [testapp/orm_security_tests.py](testapp/orm_form_test.py): shows how forms can be used to save Models to the DB
- [testapp/orm_security_tests.py](testapp/orm_security_tests.py): which highlights some interesting interactions with security queries
## Setup
```
pip install django pytest pytest-django
```
## Run server
```
python manage.py makemigrations && python manage.py migrate && python manage.py runserver
```
## Run tests
```
pytest
```

View File

@@ -0,0 +1,5 @@
# to force extractor to see files. since we use `--max-import-depth=1`, we use this
# "fake" import that doesn't actually work, but tricks the python extractor to look at
# all the files
from testapp import *

View File

@@ -0,0 +1,5 @@
[pytest]
DJANGO_SETTINGS_MODULE = testproj.settings
python_files = tests.py
# don't require that you have manually run `python manage.py makemigrations`
addopts = --no-migrations --ignore-glob=*.testproj/ -v

View File

@@ -1,3 +1,6 @@
from django.db import models
# Create your models here.
from .orm_tests import *
from .orm_security_tests import *
from .orm_form_test import *

View File

@@ -0,0 +1,35 @@
from django.db import models
from django.http.response import HttpResponse
from django.shortcuts import render
from django import forms
class MyModel(models.Model):
text = models.CharField(max_length=256)
class MyModelForm(forms.ModelForm):
# see https://docs.djangoproject.com/en/4.0/topics/forms/modelforms/#django.forms.ModelForm
class Meta:
model = MyModel
fields = ["text"]
# TODO: When we actually start supporting ModelForm, we need to add test-cases for
# limiting what fields are used. See
# https://docs.djangoproject.com/en/4.0/topics/forms/modelforms/#selecting-the-fields-to-use
def add_mymodel_handler(request):
if request.method == "POST":
form = MyModelForm(request.POST, request.FILES)
if form.is_valid():
new_MyMoodel_instance = form.save()
return HttpResponse("ok")
else:
print("not valid", form.errors)
else:
form = MyModelForm(initial=request.GET)
return render(request, "form_example.html", {"form": form})
def show_mymodel_handler(request):
obj = MyModel.objects.last()
return HttpResponse("Last object (id={}) had text: {!r}".format(obj.id, obj.text))

View File

@@ -0,0 +1,126 @@
"""
Handling of ORM steps that are only relevant to real taint-tracking queries, and not core dataflow.
"""
from django.db import models
from django.http.response import HttpResponse
# ------------------------------------------------------------------------------
# Some fields are not relevant for some security queries
# ------------------------------------------------------------------------------
# TODO: We need some way to mark that a certain data-flow node can only contain
# an integer, so it can be excluded from queries.
class Person(models.Model):
name = models.CharField(max_length=256)
age = models.IntegerField()
def person(request):
if request.method == "POST":
person = Person()
person.name = request.POST["name"]
person.age = request.POST["age"]
# at this point, `person.age` is a string, and could contain anything
assert isinstance(person.age, str)
person.save()
# after saving, there will be an error if the string could not be converted to an integer.
# the attribute on the local object is not changed (so still `str`) but after fetching from DB it is
# an `int`
assert isinstance(person.age, str)
# after doing `.full_clean` it also has the proper data-type
person.full_clean()
assert isinstance(person.age, int)
return HttpResponse("ok")
elif request.method == "GET":
resp_text = "<h1>Persons:</h1>"
for person in Person.objects.all():
resp_text += "\n{} (age {})".format(person.name, person.age)
return HttpResponse(resp_text) # NOT OK
def show_name(request):
person = Person.objects.get(id=request.GET["id"])
return HttpResponse("Name is: {}".format(person.name)) # NOT OK
def show_age(request):
person = Person.objects.get(id=request.GET["id"])
assert isinstance(person.age, int)
# Since the age is an integer, there is not actually XSS in the line below
return HttpResponse("Age is: {}".format(person.age)) # OK
# look at the log after doing
"""
http -f 'http://127.0.0.1:8000/person/' name="foo" age=42
http 'http://127.0.0.1:8000/show_age/?id=1'
"""
# ------------------------------------------------------------------------------
# Custom validators on fields
# ------------------------------------------------------------------------------
#
# We currently do not include these in any of our queries. There are two reasons:
#
# 1. We don't have any good way to determine what the validator actually does. So we
# don't have any way to determine if a validator would make data safe for a
# particular query. So we would have to blindly trust that if any validator what
# specified, that would mean data is always safe :| We still want to produce more
# results, so by default, we would want to do it this way, and live live with the FPs
# that arise -- if it turns out that is too troublesome, we can look more into it.
#
# 2. Using a validator on the input data does not make any guarantees on the data that
# is already in the DB. It's better to perform escaping on the data as part of
# outputing/rendering, since you know _all_ data will be escaped, and that the right
# kind of escaping is applied (there is a difference in what needs to be escaped for
# different vulnerabilities, so doing validation that rejects things that would cause
# XSS might still accept things that can do SQL injection)
from django.core.exceptions import ValidationError
import re
def only_az(value):
if not re.match(r"^[a-zA-Z]$", value):
raise ValidationError("only a-zA-Z allowed")
# First example: Validator is set, but not used
class CommentValidatorNotUsed(models.Model):
text = models.CharField(max_length=256, validators=[only_az])
def save_comment_validator_not_used(request): # $requestHandler
comment = CommentValidatorNotUsed(text=request.POST["text"])
comment.save()
return HttpResponse("ok")
def display_comment_validator_not_used(request): # $requestHandler
comment = CommentValidatorNotUsed.objects.last()
return HttpResponse(comment.text) # NOT OK
# To test this
"""
http -f http://127.0.0.1:8000/save_comment_validator_not_used/ text="foo!@#"
http http://127.0.0.1:8000/display_comment_validator_not_used/
"""
# Second example: Validator is set, AND is used
class CommentValidatorUsed(models.Model):
text = models.CharField(max_length=256, validators=[only_az])
def save_comment_validator_used(request): # $requestHandler
comment = CommentValidatorUsed(text=request.POST["text"])
comment.full_clean()
comment.save()
def display_comment_validator_used(request): # $requestHandler
comment = CommentValidatorUsed.objects.last()
return HttpResponse(comment.text) # sort of OK
# Doing the following will raise a ValidationError
"""
http -f http://127.0.0.1:8000/save_comment_validator_used/ text="foo!@#"
"""

View File

@@ -0,0 +1,325 @@
from django.db import models
SOURCE = "source"
def SINK(arg):
print(arg)
assert arg == SOURCE
def SINK_F(arg):
print(arg)
assert arg != SOURCE
# ------------------------------------------------------------------------------
# Different ways to save data to the DB through ORM
#
# These tests are set up with their own individual model, so it's possible to see
# whether the source works or not (although there is quite a bit of boilerplate). The
# problem with using the same model multiple times, is that it won't be obvious if the
# single SINK call actually has flow from all the expected places or not.
# ------------------------------------------------------------------------------
# --------------------------------------
# Constructor: kw arg
# --------------------------------------
class TestSave1(models.Model):
text = models.CharField(max_length=512)
def test_save1_store():
obj = TestSave1(text=SOURCE)
obj.save()
def test_save1_load():
obj = TestSave1.objects.first()
SINK(obj.text)
# --------------------------------------
# Constructor: positional arg
# --------------------------------------
class TestSave2(models.Model):
text = models.CharField(max_length=512)
def test_save2_store():
# first positional argument is `id`, a primary key automatically added
# see https://docs.djangoproject.com/en/4.0/topics/db/models/#automatic-primary-key-fields
obj = TestSave2(None, SOURCE)
obj.save()
def test_save2_load():
obj = TestSave2.objects.first()
SINK(obj.text)
# --------------------------------------
# Constructor: positional arg, with own primary key
# --------------------------------------
class TestSave3(models.Model):
text = models.CharField(max_length=512, primary_key=True)
def test_save3_store():
# no `id` column added, see https://docs.djangoproject.com/en/4.0/topics/db/models/#automatic-primary-key-fields
obj = TestSave3(SOURCE)
obj.save()
def test_save3_load():
obj = TestSave3.objects.first()
SINK(obj.text)
# --------------------------------------
# Set attribute on fresh object
# --------------------------------------
class TestSave4(models.Model):
text = models.CharField(max_length=512)
def test_save4_store():
obj = TestSave4()
obj.text = SOURCE
obj.save()
def test_save4_load():
obj = TestSave4.objects.first()
SINK(obj.text)
# --------------------------------------
# Set attribute on existing
# --------------------------------------
class TestSave4b(models.Model):
text = models.CharField(max_length=512)
def test_save4b_init():
obj = TestSave4b()
obj.text = "foo"
obj.save()
def test_save4b_store():
obj = TestSave4b.objects.first()
obj.text = SOURCE
obj.save()
def test_save4b_load():
obj = TestSave4b.objects.first()
SINK(obj.text)
# --------------------------------------
# <Model>.objects.create()
# see https://docs.djangoproject.com/en/4.0/ref/models/querysets/#create
# --------------------------------------
class TestSave5(models.Model):
text = models.CharField(max_length=512)
def test_save5_store():
# note: positional args not possible
obj = TestSave5.objects.create(text=SOURCE)
SINK(obj.text)
def test_save5_load():
obj = TestSave5.objects.first()
SINK(obj.text)
# --------------------------------------
# <Model>.objects.get_or_create()
# see https://docs.djangoproject.com/en/4.0/ref/models/querysets/#get-or-create
# --------------------------------------
class TestSave6(models.Model):
text = models.CharField(max_length=512)
email = models.CharField(max_length=256)
def test_save6_store():
obj, _created = TestSave6.objects.get_or_create(defaults={"text": SOURCE}, email=SOURCE)
SINK(obj.text)
SINK(obj.email)
def test_save6_load():
obj = TestSave6.objects.first()
SINK(obj.text)
SINK(obj.email)
# --------------------------------------
# <Model>.objects.update_or_create()
# see https://docs.djangoproject.com/en/4.0/ref/models/querysets/#update-or-create
# --------------------------------------
class TestSave7(models.Model):
text = models.CharField(max_length=512)
email = models.CharField(max_length=256)
def test_save7_store():
obj, _created = TestSave7.objects.update_or_create(defaults={"text": SOURCE}, email=SOURCE)
SINK(obj.text)
SINK(obj.email)
def test_save7_load():
obj = TestSave7.objects.first()
SINK(obj.text)
SINK(obj.email)
# --------------------------------------
# <Model>.objects.[<QuerySet>].update()
# --------------------------------------
class TestSave8(models.Model):
text = models.CharField(max_length=512)
def test_save8_init():
TestSave8.objects.create(text="foo")
def test_save8_store():
_updated_count = TestSave8.objects.all().update(text=SOURCE)
def test_save8_load():
obj = TestSave8.objects.first()
SINK(obj.text)
# --------------------------------------
# .save() on foreign key field
# --------------------------------------
class TestSave9(models.Model):
text = models.CharField(max_length=512)
class TestSave9WithForeignKey(models.Model):
test = models.ForeignKey(TestSave9, models.deletion.CASCADE)
def test_save9_init():
obj = TestSave9.objects.create(text="foo")
TestSave9WithForeignKey.objects.create(test=obj)
def test_save9_store():
w_fk = TestSave9WithForeignKey.objects.first()
w_fk.test.text = SOURCE
w_fk.test.save()
# note that `w_fk.save()` does NOT save the state of the `TestSave9` object.
def test_save9_load():
obj = TestSave9.objects.first()
SINK(obj.text)
# --------------------------------------
# foreign key backreference (auto-generated name)
# see https://docs.djangoproject.com/en/4.0/topics/db/queries/#following-relationships-backward
# --------------------------------------
class save10_BlogPost(models.Model):
# dummy contet, only has automatic `id` field
pass
class save10_Comment(models.Model):
text = models.CharField(max_length=512)
blog = models.ForeignKey(save10_BlogPost, models.deletion.CASCADE)
def test_save10_init():
blogpost = save10_BlogPost.objects.create()
save10_Comment.objects.create(blog=blogpost, text="foo")
def test_save10_store():
blogpost = save10_BlogPost.objects.first()
for comment in blogpost.save10_comment_set.all():
comment.text = SOURCE
comment.save()
def test_save10_load():
obj = save10_Comment.objects.first()
SINK(obj.text)
# --------------------------------------
# foreign key backreference, with custom name
# see https://docs.djangoproject.com/en/4.0/topics/db/queries/#following-relationships-backward
# --------------------------------------
class save11_BlogPost(models.Model):
# dummy contet, only has automatic `id` field
pass
class save11_Comment(models.Model):
text = models.CharField(max_length=512)
blog = models.ForeignKey(save11_BlogPost, models.deletion.CASCADE, related_name="comments")
def test_save11_init():
blogpost = save11_BlogPost.objects.create()
save11_Comment.objects.create(blog=blogpost, text="foo")
def test_save11_store():
blogpost = save11_BlogPost.objects.first()
for comment in blogpost.comments.all():
comment.text = SOURCE
comment.save()
def test_save11_load():
obj = save11_Comment.objects.first()
SINK(obj.text)
# ------------------------------------------------------------------------------
# Different ways to load data from the DB through the ORM
# ------------------------------------------------------------------------------
class TestLoad(models.Model):
text = models.CharField(max_length=512)
def test_load_init():
for _ in range(10):
obj = TestLoad()
obj.text = SOURCE
obj.save()
def test_load_single():
obj = TestLoad.objects.get(id=1)
SINK(obj.text)
def test_load_many():
objs = TestLoad.objects.all()
for obj in objs:
SINK(obj.text)
SINK(objs[0].text)
def test_load_many_skip():
objs = TestLoad.objects.all()[5:]
for obj in objs:
SINK(obj.text)
SINK(objs[0].text)
def test_load_qs_chain_single():
obj = TestLoad.objects.all().filter(text__contains="s").exclude(text=None).first()
SINK(obj.text)
def test_load_qs_chain_many():
objs = TestLoad.objects.all().filter(text__contains="s").exclude(text=None)
for obj in objs:
SINK(obj.text)
SINK(objs[0].text)
def test_load_values():
# see https://docs.djangoproject.com/en/4.0/ref/models/querysets/#django.db.models.query.QuerySet.values
vals = TestLoad.objects.all().values()
for val in vals:
SINK(val['text'])
SINK(vals[0]['text'])
# only selecting some of the fields
vals = TestLoad.objects.all().values("text")
for val in vals:
SINK(val['text'])
SINK(vals[0]['text'])
def test_load_values_list():
# see https://docs.djangoproject.com/en/4.0/ref/models/querysets/#django.db.models.query.QuerySet.values_list
vals = TestLoad.objects.all().values_list()
for (_id, text) in vals:
SINK(text)
SINK(vals[0][1])
# only selecting some of the fields
vals = TestLoad.objects.all().values_list("text")
for (text,) in vals:
SINK(text)
SINK(vals[0][0])
# with flat=True, each row will not be a tuple, but just the value
vals = TestLoad.objects.all().values_list("text", flat=True)
for text in vals:
SINK(text)
SINK(vals[0])
# Good resources:
# - https://docs.djangoproject.com/en/4.0/topics/db/queries/#making-queries
# - https://docs.djangoproject.com/en/4.0/ref/models/querysets/
# - https://docs.djangoproject.com/en/4.0/ref/models/instances/

View File

@@ -0,0 +1,5 @@
<form action="/mymodel/add/" method="post">
{% comment %} {% csrf_token %} {% endcomment %}
{{ form }}
<input type="submit" value="Submit">
</form>

View File

@@ -1,3 +1,76 @@
from django.test import TestCase
import importlib
import re
import pytest
# Create your tests here.
def discover_save_tests():
mod = importlib.import_module("testapp.orm_tests")
test_names = []
for name in dir(mod):
m = re.match("test_(save.*)_load", name)
if not m:
continue
name = m.group(1)
test_names.append(name)
return test_names
def discover_load_tests():
mod = importlib.import_module("testapp.orm_tests")
test_names = []
for name in dir(mod):
m = re.match("test_(load.*)", name)
if not m:
continue
name = m.group(1)
if name == "load_init":
continue
test_names.append(name)
return test_names
@pytest.mark.django_db
@pytest.mark.parametrize("name", discover_save_tests())
def test_run_save_tests(name):
mod = importlib.import_module("testapp.orm_tests")
init_func = getattr(mod, f"test_{name}_init", None)
store_func = getattr(mod, f"test_{name}_store", None)
load_func = getattr(mod, f"test_{name}_load", None)
if init_func:
init_func()
store_func()
load_func()
has_run_load_init = False
@pytest.fixture
def load_test_init():
from .orm_tests import test_load_init
test_load_init()
@pytest.mark.django_db
@pytest.mark.parametrize("name", discover_load_tests())
def test_run_load_tests(load_test_init, name):
mod = importlib.import_module("testapp.orm_tests")
load_func = getattr(mod, f"test_{name}", None)
load_func()
assert getattr(mod, "TestLoad").objects.count() == 10
@pytest.mark.django_db
def test_mymodel_form_save():
from .orm_form_test import MyModel, MyModelForm
import uuid
text = str(uuid.uuid4())
form = MyModelForm(data={"text": text})
form.save()
obj = MyModel.objects.last()
assert obj.text == text

View File

@@ -0,0 +1,19 @@
from django.urls import path, re_path
from . import orm_security_tests
from . import orm_form_test
urlpatterns = [
path("person/", orm_security_tests.person),
path("show_name/", orm_security_tests.show_name),
path("show_age/", orm_security_tests.show_age),
path("save_comment_validator_not_used/", orm_security_tests.save_comment_validator_not_used),
path("display_comment_validator_not_used/", orm_security_tests.display_comment_validator_not_used),
path("save_comment_validator_used/", orm_security_tests.save_comment_validator_used),
path("display_comment_validator_used/", orm_security_tests.display_comment_validator_used),
path("mymodel/add/", orm_form_test.add_mymodel_handler),
path("mymodel/show/", orm_form_test.show_mymodel_handler),
]

View File

@@ -31,6 +31,7 @@ ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'testapp.apps.TestappConfig',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
@@ -43,7 +44,7 @@ MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
# 'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',

View File

@@ -14,8 +14,9 @@ Including another URLconf
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path("admin/", admin.site.urls),
path("", include("testapp.urls")),
]