Merge branch 'jorgectf/python/deserialization' of https://github.com/jorgectf/codeql into jorgectf/python/deserialization

This commit is contained in:
jorgectf
2022-01-31 17:48:47 +01:00
3887 changed files with 317569 additions and 114448 deletions

View File

@@ -3,13 +3,25 @@ import pkg # $ use=moduleImport("pkg")
async def foo():
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
coro # $ use=moduleImport("pkg").getMember("async_func").getReturn()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def bar():
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_with():
async with pkg.async_func() as result: # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_for():
async for _ in pkg.async_func(): # $ use=moduleImport("pkg").getMember("async_func").getReturn() awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async for _ in coro: # $ use=moduleImport("pkg").getMember("async_func").getReturn() MISSING: awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
def check_annotations():
# Just to make sure how annotations should look like :)

View File

@@ -0,0 +1,26 @@
import python
import semmle.python.dataflow.new.DataFlow
import TestUtilities.InlineExpectationsTest
import semmle.python.ApiGraphs
class AwaitedTest extends InlineExpectationsTest {
AwaitedTest() { this = "AwaitedTest" }
override string getARelevantTag() { result = "awaited" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(API::Node awaited, DataFlow::Node use, API::Node pred |
awaited = pred.getAwaited() and
use = awaited.getAUse() and
location = use.getLocation() and
// Module variable nodes have no suitable location, so it's best to simply exclude them entirely
// from the inline tests.
not use instanceof DataFlow::ModuleVariableNode and
exists(location.getFile().getRelativePath())
|
tag = "awaited" and
value = pred.getPath() and
element = use.toString()
)
}
}

View File

@@ -18,7 +18,7 @@ abstract class FlowTest extends InlineExpectationsTest {
location = toNode.getLocation() and
tag = this.flowTag() and
value =
"\"" + prettyNode(fromNode).replaceAll("\"", "'") + lineStr(fromNode, toNode) + " -> " +
"\"" + prettyNode(fromNode).replaceAll("\"", "'") + this.lineStr(fromNode, toNode) + " -> " +
prettyNode(toNode).replaceAll("\"", "'") + "\"" and
element = toNode.toString()
)

View File

@@ -25,11 +25,13 @@ abstract class RoutingTest extends InlineExpectationsTest {
element = fromNode.toString() and
(
tag = this.flowTag() and
if "\"" + tag + "\"" = fromValue(fromNode) then value = "" else value = fromValue(fromNode)
if "\"" + tag + "\"" = this.fromValue(fromNode)
then value = ""
else value = this.fromValue(fromNode)
or
tag = "func" and
value = toFunc(toNode) and
not value = fromFunc(fromNode)
value = this.toFunc(toNode) and
not value = this.fromFunc(fromNode)
)
)
}

View File

@@ -1 +1 @@
known_attr = [1000]
known_attr = [1000] #$ writes=known_attr

View File

@@ -0,0 +1,2 @@
from trois import *
print(foo)

View File

@@ -0,0 +1,15 @@
| test3.py:1:17:1:19 | ControlFlowNode for ImportMember | test3.py:2:7:2:9 | ControlFlowNode for foo |
| three.py:1:1:1:3 | ControlFlowNode for foo | test1.py:2:7:2:9 | ControlFlowNode for foo |
| three.py:1:1:1:3 | ControlFlowNode for foo | test3.py:1:17:1:19 | ControlFlowNode for ImportMember |
| three.py:1:1:1:3 | ControlFlowNode for foo | test3.py:2:7:2:9 | ControlFlowNode for foo |
| three.py:1:1:1:3 | ControlFlowNode for foo | two.py:2:7:2:9 | ControlFlowNode for foo |
| three.py:1:7:1:7 | ControlFlowNode for IntegerLiteral | test1.py:2:7:2:9 | ControlFlowNode for foo |
| three.py:1:7:1:7 | ControlFlowNode for IntegerLiteral | test3.py:1:17:1:19 | ControlFlowNode for ImportMember |
| three.py:1:7:1:7 | ControlFlowNode for IntegerLiteral | test3.py:2:7:2:9 | ControlFlowNode for foo |
| three.py:1:7:1:7 | ControlFlowNode for IntegerLiteral | two.py:2:7:2:9 | ControlFlowNode for foo |
| trois.py:1:1:1:3 | ControlFlowNode for foo | deux.py:2:7:2:9 | ControlFlowNode for foo |
| trois.py:1:1:1:3 | ControlFlowNode for foo | test2.py:2:7:2:9 | ControlFlowNode for foo |
| trois.py:1:7:1:7 | ControlFlowNode for IntegerLiteral | deux.py:2:7:2:9 | ControlFlowNode for foo |
| trois.py:1:7:1:7 | ControlFlowNode for IntegerLiteral | test2.py:2:7:2:9 | ControlFlowNode for foo |
| two.py:2:7:2:9 | ControlFlowNode for foo | test3.py:1:17:1:19 | ControlFlowNode for ImportMember |
| two.py:2:7:2:9 | ControlFlowNode for foo | test3.py:2:7:2:9 | ControlFlowNode for foo |

View File

@@ -0,0 +1,19 @@
import semmle.python.dataflow.new.DataFlow
/**
* A configuration to find all flows.
* To be used on tiny programs.
*/
class AllFlowsConfig extends DataFlow::Configuration {
AllFlowsConfig() { this = "AllFlowsConfig" }
override predicate isSource(DataFlow::Node node) { any() }
override predicate isSink(DataFlow::Node node) { any() }
}
from DataFlow::CfgNode source, DataFlow::CfgNode sink
where
source != sink and
exists(AllFlowsConfig cfg | cfg.hasFlow(source, sink))
select source, sink

View File

@@ -0,0 +1 @@
from two import *

View File

@@ -0,0 +1,2 @@
from one import *
print(foo)

View File

@@ -0,0 +1,2 @@
from un import *
print(foo)

View File

@@ -0,0 +1,2 @@
from one import foo
print(foo)

View File

@@ -0,0 +1 @@
foo = 5

View File

@@ -0,0 +1 @@
foo = 6

View File

@@ -0,0 +1,2 @@
from three import *
print(foo)

View File

@@ -0,0 +1 @@
from deux import *

View File

@@ -0,0 +1,13 @@
import python
import experimental.dataflow.TestUtil.FlowTest
import experimental.dataflow.testConfig
class DataFlowTest extends FlowTest {
DataFlowTest() { this = "DataFlowTest" }
override string flowTag() { result = "flow" }
override predicate relevantFlow(DataFlow::Node source, DataFlow::Node sink) {
exists(TestConfiguration cfg | cfg.hasFlow(source, sink))
}
}

View File

@@ -0,0 +1,156 @@
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from testlib import *
# These are defined so that we can evaluate the test code.
NONSOURCE = "not a source"
SOURCE = "source"
def is_source(x):
return x == "source" or x == b"source" or x == 42 or x == 42.0 or x == 42j
def SINK(x):
if is_source(x):
print("OK")
else:
print("Unexpected flow", x)
def SINK_F(x):
if is_source(x):
print("Unexpected flow", x)
else:
print("OK")
def test_guard():
match SOURCE:
case x if SINK(x): #$ flow="SOURCE, l:-1 -> x"
pass
@expects(2)
def test_as_pattern():
match SOURCE:
case x as y:
SINK(x) #$ flow="SOURCE, l:-2 -> x"
SINK(y) #$ flow="SOURCE, l:-3 -> y"
def test_or_pattern():
match SOURCE:
# We cannot use NONSOURCE in place of "" below, since it would be seen as a variable.
case ("" as x) | x:
SINK(x) #$ flow="SOURCE, l:-3 -> x"
# No flow for literal pattern
def test_literal_pattern():
match SOURCE:
case 42 as x:
SINK(x) #$ flow="SOURCE, l:-2 -> x" flow="42, l:-1 -> x"
def test_capture_pattern():
match SOURCE:
case x:
SINK(x) #$ flow="SOURCE, l:-2 -> x"
# No flow for wildcard pattern
class Unsafe:
VALUE = SOURCE
def test_value_pattern():
match SOURCE:
case Unsafe.VALUE as x:
SINK(x) #$ flow="SOURCE, l:-2 -> x" MISSING: flow="SOURCE, l:-5 -> x"
@expects(2)
def test_sequence_pattern_tuple():
match (NONSOURCE, SOURCE):
case (x, y):
SINK_F(x)
SINK(y) #$ flow="SOURCE, l:-3 -> y"
@expects(2)
def test_sequence_pattern_list():
match [NONSOURCE, SOURCE]:
case [x, y]:
SINK_F(x) #$ SPURIOUS: flow="SOURCE, l:-2 -> x"
SINK(y) #$ flow="SOURCE, l:-3 -> y"
# Sets are excluded from sequence patterns,
# see https://www.python.org/dev/peps/pep-0635/#sequence-patterns
@expects(2)
def test_star_pattern_tuple():
match (NONSOURCE, SOURCE):
case (x, *y):
SINK_F(x)
SINK(y[0]) #$ flow="SOURCE, l:-3 -> y[0]"
@expects(2)
def test_star_pattern_tuple_exclusion():
match (SOURCE, NONSOURCE):
case (x, *y):
SINK(x) #$ flow="SOURCE, l:-2 -> x"
SINK_F(y[0])
@expects(2)
def test_star_pattern_list():
match [NONSOURCE, SOURCE]:
case [x, *y]:
SINK_F(x) #$ SPURIOUS: flow="SOURCE, l:-2 -> x"
SINK(y[0]) #$ flow="SOURCE, l:-3 -> y[0]"
@expects(2)
def test_star_pattern_list_exclusion():
match [SOURCE, NONSOURCE]:
case [x, *y]:
SINK(x) #$ flow="SOURCE, l:-2 -> x"
SINK_F(y[0]) #$ SPURIOUS: flow="SOURCE, l:-3 -> y[0]"
@expects(2)
def test_mapping_pattern():
match {"a": NONSOURCE, "b": SOURCE}:
case {"a": x, "b": y}:
SINK_F(x)
SINK(y) #$ flow="SOURCE, l:-3 -> y"
# also tests the key value pattern
@expects(2)
def test_double_star_pattern():
match {"a": NONSOURCE, "b": SOURCE}:
case {"a": x, **y}:
SINK_F(x)
SINK(y["b"]) #$ flow="SOURCE, l:-3 -> y['b']"
@expects(2)
def test_double_star_pattern_exclusion():
match {"a": SOURCE, "b": NONSOURCE}:
case {"a": x, **y}:
SINK(x) #$ flow="SOURCE, l:-2 -> x"
SINK_F(y["b"])
try:
SINK_F(y["a"])
except KeyError:
pass
class Cell:
def __init__(self, value):
self.value = value
# also tests the keyword pattern
@expects(2)
def test_class_pattern():
bad_cell = Cell(SOURCE)
good_cell = Cell(NONSOURCE)
match bad_cell:
case Cell(value = x):
SINK(x) #$ flow="SOURCE, l:-5 -> x"
match good_cell:
case Cell(value = x):
SINK_F(x)

View File

@@ -112,3 +112,16 @@ print(foo) # $ SensitiveUse=password
harmless = lambda: "bar"
bar = call_wrapper(harmless)
print(bar) # $ SPURIOUS: SensitiveUse=password
# ------------------------------------------------------------------------------
# cross-talk in dictionary.
# ------------------------------------------------------------------------------
from unknown_settings import password # $ SensitiveDataSource=password
print(password) # $ SensitiveUse=password
_config = {"sleep_timer": 5, "mysql_password": password}
# since we have taint-step from store of `password`, we will consider any item in the
# dictionary to be a password :(
print(_config["sleep_timer"]) # $ SPURIOUS: SensitiveUse=password

View File

@@ -7,9 +7,16 @@ class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
override predicate isSource(DataFlow::Node source) {
// Standard sources
source.(DataFlow::CfgNode).getNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
// User defined sources
exists(CallNode call |
call.getFunction().(NameNode).getId() = "taint" and
source.(DataFlow::CfgNode).getNode() = call.getAnArg()
)
}
override predicate isSink(DataFlow::Node sink) {

View File

@@ -77,6 +77,57 @@ def test_in_set():
ensure_tainted(ts) # $ tainted
def test_in_local_variable():
ts = TAINTED_STRING
safe = ["safe", "also_safe"]
if ts in safe:
ensure_not_tainted(ts) # $ SPURIOUS: tainted
else:
ensure_tainted(ts) # $ tainted
SAFE = ["safe", "also_safe"]
def test_in_global_variable():
ts = TAINTED_STRING
if ts in SAFE:
ensure_not_tainted(ts) # $ SPURIOUS: tainted
else:
ensure_tainted(ts) # $ tainted
# these global variables can be modified, so should not be considered safe
SAFE_mod_1 = ["safe", "also_safe"]
SAFE_mod_2 = ["safe", "also_safe"]
SAFE_mod_3 = ["safe", "also_safe"]
def make_modification(x):
global SAFE_mod_2, SAFE_mod_3
SAFE_mod_1.append(x)
SAFE_mod_2 += [x]
SAFE_mod_3 = SAFE_mod_3 + [x]
def test_in_modified_global_variable():
ts = TAINTED_STRING
if ts in SAFE_mod_1:
ensure_tainted(ts) # $ tainted
else:
ensure_tainted(ts) # $ tainted
if ts in SAFE_mod_2:
ensure_tainted(ts) # $ tainted
else:
ensure_tainted(ts) # $ tainted
if ts in SAFE_mod_3:
ensure_tainted(ts) # $ tainted
else:
ensure_tainted(ts) # $ tainted
def test_in_unsafe1(xs):
ts = TAINTED_STRING
if ts in xs:
@@ -131,6 +182,10 @@ test_non_eq2()
test_in_list()
test_in_tuple()
test_in_set()
test_in_local_variable()
test_in_global_variable()
make_modification("unsafe")
test_in_modified_global_variable()
test_in_unsafe1(["unsafe", "foo"])
test_in_unsafe2("unsafe")
test_not_in1()

View File

@@ -0,0 +1,57 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
async def tainted_coro():
return TAINTED_STRING
async def test_await():
coro = tainted_coro()
taint(coro)
s = await coro
ensure_tainted(coro, s) # $ tainted
class AsyncContext:
async def __aenter__(self):
return TAINTED_STRING
async def __aexit__(self, exc_type, exc, tb):
pass
async def test_async_with():
ctx = AsyncContext()
taint(ctx)
async with ctx as tainted:
ensure_tainted(tainted) # $ tainted
class AsyncIter:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
async def test_async_for():
iter = AsyncIter()
taint(iter)
async for tainted in iter:
ensure_tainted(tainted) # $ tainted
# Make tests runable
import asyncio
asyncio.run(test_await())
asyncio.run(test_async_with())
asyncio.run(test_async_for())

View File

@@ -0,0 +1,30 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
class Iter:
def __iter__(self):
return self
def __next__(self):
raise StopIteration
def test_for():
iter = Iter()
taint(iter)
for tainted in iter:
ensure_tainted(tainted) # $ tainted
# Make tests runable
test_for()

View File

@@ -0,0 +1,60 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
class Context:
def __enter__(self):
return ""
def __exit__(self, exc_type, exc, tb):
pass
def test_with():
ctx = Context()
taint(ctx)
with ctx as tainted:
ensure_tainted(tainted) # $ tainted
class Context_taint:
def __enter__(self):
return TAINTED_STRING
def __exit__(self, exc_type, exc, tb):
pass
def test_with_taint():
ctx = Context_taint()
with ctx as tainted:
ensure_tainted(tainted) # $ MISSING: tainted
class Context_arg:
def __init__(self, arg):
self.arg = arg
def __enter__(self):
return self.arg
def __exit__(self, exc_type, exc, tb):
pass
def test_with_arg():
ctx = Context_arg(TAINTED_STRING)
with ctx as tainted:
ensure_tainted(tainted) # $ tainted
# Make tests runable
test_with()
test_with_taint()
test_with_arg()

View File

@@ -5,6 +5,11 @@ TAINTED_DICT = {"name": TAINTED_STRING, "some key": "foo"}
NOT_TAINTED = "NOT_TAINTED"
# Use this to force expressions to be tainted
def taint(*args):
pass
def ensure_tainted(*args):
print("- ensure_tainted")
for i, arg in enumerate(args):

View File

@@ -57,6 +57,10 @@ if __name__ == "__main__":
check_tests_valid("variable-capture.nonlocal")
check_tests_valid("variable-capture.dict")
check_tests_valid("module-initialization.multiphase")
# The below will fail unless we use Python 3.10 or newer.
# check_tests_valid("match.test")
# The below fails when trying to import modules
# check_tests_valid("module-initialization.test")
# check_tests_valid("module-initialization.testOnce")

View File

@@ -128,6 +128,24 @@ class CodeExecutionTest extends InlineExpectationsTest {
}
}
class SqlConstructionTest extends InlineExpectationsTest {
SqlConstructionTest() { this = "SqlConstructionTest" }
override string getARelevantTag() { result = "constructedSql" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(SqlConstruction e, DataFlow::Node sql |
exists(location.getFile().getRelativePath()) and
sql = e.getSql() and
location = e.getLocation() and
element = sql.toString() and
value = prettyNodeForInlineTest(sql) and
tag = "constructedSql"
)
}
}
class SqlExecutionTest extends InlineExpectationsTest {
SqlExecutionTest() { this = "SqlExecutionTest" }
@@ -457,3 +475,31 @@ class CryptographicOperationTest extends InlineExpectationsTest {
)
}
}
class HttpClientRequestTest extends InlineExpectationsTest {
HttpClientRequestTest() { this = "HttpClientRequestTest" }
override string getARelevantTag() {
result in ["clientRequestUrlPart", "clientRequestCertValidationDisabled"]
}
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(HTTP::Client::Request req, DataFlow::Node url |
url = req.getAUrlPart() and
location = url.getLocation() and
element = url.toString() and
value = prettyNodeForInlineTest(url) and
tag = "clientRequestUrlPart"
)
or
exists(location.getFile().getRelativePath()) and
exists(HTTP::Client::Request req |
req.disablesCertificateValidation(_, _) and
location = req.getLocation() and
element = req.toString() and
value = "" and
tag = "clientRequestCertValidationDisabled"
)
}
}

View File

@@ -30,24 +30,36 @@ DataFlow::Node shouldNotBeTainted() {
)
}
class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
// this module allows the configuration to be imported in other `.ql` files without the
// top level query predicates of this file coming into scope.
module Conf {
class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
override predicate isSource(DataFlow::Node source) {
source.asCfgNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
source instanceof RemoteFlowSource
}
override predicate isSource(DataFlow::Node source) {
source.asCfgNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
// User defined sources
exists(CallNode call |
call.getFunction().(NameNode).getId() = "taint" and
source.(DataFlow::CfgNode).getNode() = call.getAnArg()
)
or
source instanceof RemoteFlowSource
}
override predicate isSink(DataFlow::Node sink) {
sink = shouldBeTainted()
or
sink = shouldNotBeTainted()
override predicate isSink(DataFlow::Node sink) {
sink = shouldBeTainted()
or
sink = shouldNotBeTainted()
}
}
}
import Conf
class InlineTaintTest extends InlineExpectationsTest {
InlineTaintTest() { this = "InlineTaintTest" }

View File

@@ -0,0 +1,4 @@
edges
nodes
subpaths
#select

View File

@@ -0,0 +1,25 @@
/**
* @kind path-problem
*/
// This query is for debugging InlineTaintTestFailures.
// The intended usage is
// 1. load the database of the failing test
// 2. run this query to see actual paths
// 3. if necessary, look at partial paths by (un)commenting appropriate lines
import python
import semmle.python.dataflow.new.DataFlow
import experimental.meta.InlineTaintTest::Conf
// import DataFlow::PartialPathGraph
import DataFlow::PathGraph
class Conf extends TestTaintTrackingConfiguration {
override int explorationLimit() { result = 5 }
}
// from Conf config, DataFlow::PartialPathNode source, DataFlow::PartialPathNode sink
// where config.hasPartialFlow(source, sink, _)
from Conf config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "This node receives taint from $@.", source.getNode(),
"this source"

View File

@@ -0,0 +1,4 @@
edges
nodes
subpaths
#select

View File

@@ -0,0 +1,25 @@
/**
* @kind path-problem
*/
// This query is for debugging InlineTaintTestFailures.
// The intended usage is
// 1. load the database of the failing test
// 2. run this query to see actual paths
// 3. if necessary, look at partial paths by (un)commenting appropriate lines
import python
import semmle.python.dataflow.new.DataFlow
import experimental.dataflow.testConfig
// import DataFlow::PartialPathGraph
import DataFlow::PathGraph
class Conf extends TestConfiguration {
override int explorationLimit() { result = 5 }
}
// from Conf config, DataFlow::PartialPathNode source, DataFlow::PartialPathNode sink
// where config.hasPartialFlow(source, sink, _)
from Conf config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "This node receives taint from $@.", source.getNode(),
"this source"

View File

@@ -0,0 +1,8 @@
| authlib.py:11:1:11:39 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |
| authlib.py:12:1:12:50 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |
| pyjwt.py:10:1:10:29 | ControlFlowNode for Attribute() | This JWT encoding has an empty algorithm. |
| pyjwt.py:10:1:10:29 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |
| pyjwt.py:13:1:13:40 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |
| pyjwt.py:14:1:14:44 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |
| python_jose.py:10:1:10:40 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |
| python_jose.py:11:1:11:44 | ControlFlowNode for Attribute() | This JWT encoding has an empty key. |

View File

@@ -0,0 +1 @@
experimental/Security/CWE-347/JWTEmptyKeyOrAlgorithm.ql

View File

@@ -0,0 +1,3 @@
| pyjwt.py:22:12:22:16 | ControlFlowNode for token | is not verified with a cryptographic secret or public key. |
| pyjwt.py:23:12:23:16 | ControlFlowNode for token | is not verified with a cryptographic secret or public key. |
| python_jose.py:19:12:19:16 | ControlFlowNode for token | is not verified with a cryptographic secret or public key. |

View File

@@ -0,0 +1 @@
experimental/Security/CWE-347/JWTMissingSecretOrPublicKeyVerification.ql

View File

@@ -0,0 +1,18 @@
from authlib.jose import jwt # It is already a JsonWebToken object
from authlib.jose import JsonWebToken
# Encoding
# good - key and algorithm supplied
jwt.encode({"alg": "HS256"}, token, "key")
JsonWebToken().encode({"alg": "HS256"}, token, "key")
# bad - empty key
jwt.encode({"alg": "HS256"}, token, "")
JsonWebToken().encode({"alg": "HS256"}, token, "")
# Decoding
# good - "it will raise BadSignatureError when signature doesnt match"
jwt.decode(token, key)
JsonWebToken().decode(token, key)

View File

@@ -0,0 +1,31 @@
import jwt
# Encoding
# good - key and algorithm supplied
jwt.encode(token, "key", "HS256")
jwt.encode(token, key="key", algorithm="HS256")
# bad - both key and algorithm set to None
jwt.encode(token, None, None)
# bad - empty key
jwt.encode(token, "", algorithm="HS256")
jwt.encode(token, key="", algorithm="HS256")
# Decoding
# good
jwt.decode(token, "key", "HS256")
# bad - unverified decoding
jwt.decode(token, verify=False)
jwt.decode(token, key, options={"verify_signature": False})
# good - verified decoding
jwt.decode(token, verify=True)
jwt.decode(token, key, options={"verify_signature": True})
def indeterminate(verify):
jwt.decode(token, key, verify)

View File

@@ -0,0 +1,22 @@
from jose import jwt
# Encoding
# good - key and algorithm supplied
jwt.encode(token, "key", "HS256")
jwt.encode(token, key="key", algorithm="HS256")
# bad - empty key
jwt.encode(token, "", algorithm="HS256")
jwt.encode(token, key="", algorithm="HS256")
# Decoding
# good
jwt.decode(token, "key", "HS256")
# bad - unverified decoding
jwt.decode(token, key, options={"verify_signature": False})
# good - verified decoding
jwt.decode(token, key, options={"verify_signature": True})