Merge branch 'github:main' into jty/python/emailInjection

This commit is contained in:
Jorge
2021-11-15 16:44:11 +01:00
committed by GitHub
646 changed files with 22372 additions and 7448 deletions

View File

@@ -3,13 +3,25 @@ import pkg # $ use=moduleImport("pkg")
async def foo():
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
coro # $ use=moduleImport("pkg").getMember("async_func").getReturn()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def bar():
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_with():
async with pkg.async_func() as result: # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_for():
async for _ in pkg.async_func(): # $ use=moduleImport("pkg").getMember("async_func").getReturn() awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async for _ in coro: # $ use=moduleImport("pkg").getMember("async_func").getReturn() MISSING: awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
def check_annotations():
# Just to make sure how annotations should look like :)

View File

@@ -0,0 +1,26 @@
import python
import semmle.python.dataflow.new.DataFlow
import TestUtilities.InlineExpectationsTest
import semmle.python.ApiGraphs
class AwaitedTest extends InlineExpectationsTest {
AwaitedTest() { this = "AwaitedTest" }
override string getARelevantTag() { result = "awaited" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(API::Node awaited, DataFlow::Node use, API::Node pred |
awaited = pred.getAwaited() and
use = awaited.getAUse() and
location = use.getLocation() and
// Module variable nodes have no suitable location, so it's best to simply exclude them entirely
// from the inline tests.
not use instanceof DataFlow::ModuleVariableNode and
exists(location.getFile().getRelativePath())
|
tag = "awaited" and
value = pred.getPath() and
element = use.toString()
)
}
}

View File

@@ -7,9 +7,16 @@ class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
override predicate isSource(DataFlow::Node source) {
// Standard sources
source.(DataFlow::CfgNode).getNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
// User defined sources
exists(CallNode call |
call.getFunction().(NameNode).getId() = "taint" and
source.(DataFlow::CfgNode).getNode() = call.getAnArg()
)
}
override predicate isSink(DataFlow::Node sink) {

View File

@@ -0,0 +1,57 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
async def tainted_coro():
return TAINTED_STRING
async def test_await():
coro = tainted_coro()
taint(coro)
s = await coro
ensure_tainted(coro, s) # $ tainted
class AsyncContext:
async def __aenter__(self):
return TAINTED_STRING
async def __aexit__(self, exc_type, exc, tb):
pass
async def test_async_with():
ctx = AsyncContext()
taint(ctx)
async with ctx as tainted:
ensure_tainted(tainted) # $ tainted
class AsyncIter:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
async def test_async_for():
iter = AsyncIter()
taint(iter)
async for tainted in iter:
ensure_tainted(tainted) # $ tainted
# Make tests runable
import asyncio
asyncio.run(test_await())
asyncio.run(test_async_with())
asyncio.run(test_async_for())

View File

@@ -0,0 +1,30 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
class Iter:
def __iter__(self):
return self
def __next__(self):
raise StopIteration
def test_for():
iter = Iter()
taint(iter)
for tainted in iter:
ensure_tainted(tainted) # $ tainted
# Make tests runable
test_for()

View File

@@ -0,0 +1,60 @@
# Add taintlib to PATH so it can be imported during runtime without any hassle
import sys; import os; sys.path.append(os.path.dirname(os.path.dirname((__file__))))
from taintlib import *
# This has no runtime impact, but allows autocomplete to work
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..taintlib import *
# Actual tests
class Context:
def __enter__(self):
return ""
def __exit__(self, exc_type, exc, tb):
pass
def test_with():
ctx = Context()
taint(ctx)
with ctx as tainted:
ensure_tainted(tainted) # $ tainted
class Context_taint:
def __enter__(self):
return TAINTED_STRING
def __exit__(self, exc_type, exc, tb):
pass
def test_with_taint():
ctx = Context_taint()
with ctx as tainted:
ensure_tainted(tainted) # $ MISSING: tainted
class Context_arg:
def __init__(self, arg):
self.arg = arg
def __enter__(self):
return self.arg
def __exit__(self, exc_type, exc, tb):
pass
def test_with_arg():
ctx = Context_arg(TAINTED_STRING)
with ctx as tainted:
ensure_tainted(tainted) # $ tainted
# Make tests runable
test_with()
test_with_taint()
test_with_arg()

View File

@@ -5,6 +5,11 @@ TAINTED_DICT = {"name": TAINTED_STRING, "some key": "foo"}
NOT_TAINTED = "NOT_TAINTED"
# Use this to force expressions to be tainted
def taint(*args):
pass
def ensure_tainted(*args):
print("- ensure_tainted")
for i, arg in enumerate(args):

View File

@@ -128,6 +128,24 @@ class CodeExecutionTest extends InlineExpectationsTest {
}
}
class SqlConstructionTest extends InlineExpectationsTest {
SqlConstructionTest() { this = "SqlConstructionTest" }
override string getARelevantTag() { result = "constructedSql" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(SqlConstruction e, DataFlow::Node sql |
exists(location.getFile().getRelativePath()) and
sql = e.getSql() and
location = e.getLocation() and
element = sql.toString() and
value = prettyNodeForInlineTest(sql) and
tag = "constructedSql"
)
}
}
class SqlExecutionTest extends InlineExpectationsTest {
SqlExecutionTest() { this = "SqlExecutionTest" }

View File

@@ -30,24 +30,36 @@ DataFlow::Node shouldNotBeTainted() {
)
}
class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
// this module allows the configuration to be imported in other `.ql` files without the
// top level query predicates of this file coming into scope.
module Conf {
class TestTaintTrackingConfiguration extends TaintTracking::Configuration {
TestTaintTrackingConfiguration() { this = "TestTaintTrackingConfiguration" }
override predicate isSource(DataFlow::Node source) {
source.asCfgNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
source instanceof RemoteFlowSource
}
override predicate isSource(DataFlow::Node source) {
source.asCfgNode().(NameNode).getId() in [
"TAINTED_STRING", "TAINTED_BYTES", "TAINTED_LIST", "TAINTED_DICT"
]
or
// User defined sources
exists(CallNode call |
call.getFunction().(NameNode).getId() = "taint" and
source.(DataFlow::CfgNode).getNode() = call.getAnArg()
)
or
source instanceof RemoteFlowSource
}
override predicate isSink(DataFlow::Node sink) {
sink = shouldBeTainted()
or
sink = shouldNotBeTainted()
override predicate isSink(DataFlow::Node sink) {
sink = shouldBeTainted()
or
sink = shouldNotBeTainted()
}
}
}
import Conf
class InlineTaintTest extends InlineExpectationsTest {
InlineTaintTest() { this = "InlineTaintTest" }

View File

@@ -0,0 +1,4 @@
edges
nodes
subpaths
#select

View File

@@ -0,0 +1,25 @@
/**
* @kind path-problem
*/
// This query is for debugging InlineTaintTestFailures.
// The intended usage is
// 1. load the database of the failing test
// 2. run this query to see actual paths
// 3. if necessary, look at partial paths by (un)commenting appropriate lines
import python
import semmle.python.dataflow.new.DataFlow
import experimental.meta.InlineTaintTest::Conf
// import DataFlow::PartialPathGraph
import DataFlow::PathGraph
class Conf extends TestTaintTrackingConfiguration {
override int explorationLimit() { result = 5 }
}
// from Conf config, DataFlow::PartialPathNode source, DataFlow::PartialPathNode sink
// where config.hasPartialFlow(source, sink, _)
from Conf config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "This node receives taint from $@.", source.getNode(),
"this source"