Python: Add basic support for environment/commandargs threat-models

This commit is contained in:
Rasmus Wriedt Larsen
2024-08-09 15:04:53 +02:00
parent 528f08fb83
commit b9239d7101
4 changed files with 84 additions and 2 deletions

View File

@@ -0,0 +1,16 @@
extensions:
- addsTo:
pack: codeql/python-all
extensible: sourceModel
data:
- ['os', 'Member[getenv].ReturnValue', 'environment']
- ['os', 'Member[getenvb].ReturnValue', 'environment']
- ['os', 'Member[environ].DictionaryElementAny', 'environment']
- ['os', 'Member[environb].DictionaryElementAny', 'environment']
- ['posix', 'Member[environ].DictionaryElementAny', 'environment']
- ['sys', 'Member[argv].DictionaryElementAny', 'commandargs']
- ['sys', 'Member[orig_argv].DictionaryElementAny', 'commandargs']
# TODO: argparse
# TODO: input / read from stdin

View File

@@ -3,6 +3,7 @@ import semmle.python.dataflow.new.DataFlow
import semmle.python.Concepts
import TestUtilities.InlineExpectationsTest
private import semmle.python.dataflow.new.internal.PrintNode
private import codeql.threatmodels.ThreatModels
module SystemCommandExecutionTest implements TestSig {
string getARelevantTag() { result = "getCommand" }
@@ -632,6 +633,22 @@ module XmlParsingTest implements TestSig {
}
}
module ThreatModelSourceTest implements TestSig {
string getARelevantTag() {
exists(string kind | knownThreatModel(kind) | result = "threatModelSource" + "[" + kind + "]")
}
predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(ThreatModelSource src | not src.getThreatModel() = "remote" |
location = src.getLocation() and
element = src.toString() and
value = prettyNodeForInlineTest(src) and
tag = "threatModelSource[" + src.getThreatModel() + "]"
)
}
}
import MakeTest<MergeTests5<MergeTests5<SystemCommandExecutionTest, DecodingTest, EncodingTest, LoggingTest,
CodeExecutionTest>,
MergeTests5<SqlConstructionTest, SqlExecutionTest, XPathConstructionTest, XPathExecutionTest,
@@ -642,4 +659,4 @@ import MakeTest<MergeTests5<MergeTests5<SystemCommandExecutionTest, DecodingTest
MergeTests5<FileSystemAccessTest, FileSystemWriteAccessTest, PathNormalizationTest,
SafeAccessCheckTest, PublicKeyGenerationTest>,
MergeTests5<CryptographicOperationTest, HttpClientRequestTest, CsrfProtectionSettingTest,
CsrfLocalProtectionSettingTest, XmlParsingTest>>>
CsrfLocalProtectionSettingTest, MergeTests<XmlParsingTest, ThreatModelSourceTest>>>>

View File

@@ -0,0 +1,49 @@
import os
import sys
import posix
os.getenv("foo") # $ threatModelSource[environment]=os.getenv(..)
os.getenvb("bar") # $ threatModelSource[environment]=os.getenvb(..)
os.environ["foo"] # $ threatModelSource[environment]=os.environ["foo"]
os.environ.get("foo") # $ MISSING: threatModelSource[environment]=os.environ.get(..)
os.environb["bar"] # $ threatModelSource[environment]=os.environb["bar"]
posix.environ[b"foo"] # $ threatModelSource[environment]=posix.environ[b"foo"]
sys.argv[1] # $ threatModelSource[commandargs]=sys.argv[1]
sys.orig_argv[1] # $ threatModelSource[commandargs]=sys.orig_argv[1]
########################################
# argparse
########################################
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("foo")
args = parser.parse_args()
args.foo # $ MISSING: threatModelSource[commandargs]=args.foo
explicit_argv_parsing = parser.parse_args(sys.argv)
explicit_argv_parsing.foo # $ MISSING: threatModelSource[commandargs]=explicit_argv_parsing.foo
fake_args = parser.parse_args(["<foo>"])
fake_args.foo
########################################
# reading input from stdin
########################################
sys.stdin.readline() # $ MISSING: threatModelSource
input() # $ MISSING: threatModelSource
########################################
# socket
########################################
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("example.com", 1234))
s.recv(1024) # $ MISSING: threatModelSource[socket]

View File

@@ -45,7 +45,7 @@ def func2(environ, start_response): # $ requestHandler
start_response(status, headers) # $ headerWriteBulk=headers headerWriteBulkUnsanitized=name,value
return [b"Hello"] # $ HttpResponse responseBody=List
case = sys.argv[1]
case = sys.argv[1] # $ threatModelSource[commandargs]=sys.argv[1]
if case == "1":
server = wsgiref.simple_server.WSGIServer(ADDRESS, wsgiref.simple_server.WSGIRequestHandler)
server.set_app(func)