Python: Add more honest test for awaited

This commit is contained in:
Rasmus Lerchedahl Petersen
2021-10-26 10:43:06 +02:00
parent a8a181a32f
commit f91e43c068
3 changed files with 35 additions and 9 deletions

View File

@@ -3,24 +3,24 @@ import pkg # $ use=moduleImport("pkg")
async def foo():
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
coro # $ use=moduleImport("pkg").getMember("async_func").getReturn()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await coro # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def bar():
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited()
result = await pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn().getAwaited() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_with():
async with pkg.async_func() as result: # $ use=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async with pkg.async_func() as result: # $ use=moduleImport("pkg").getMember("async_func").getReturn() awaited=moduleImport("pkg").getMember("async_func").getReturn()
return result # $ use=moduleImport("pkg").getMember("async_func").getReturn() awaited=moduleImport("pkg").getMember("async_func").getReturn()
async def test_async_for():
async for _ in pkg.async_func(): # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async for _ in pkg.async_func(): # $ use=moduleImport("pkg").getMember("async_func").getReturn() awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
coro = pkg.async_func() # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async for _ in coro: # $ use=moduleImport("pkg").getMember("async_func").getReturn()
async for _ in coro: # $ use=moduleImport("pkg").getMember("async_func").getReturn() MISSING: awaited=moduleImport("pkg").getMember("async_func").getReturn()
pass
def check_annotations():

View File

@@ -0,0 +1,26 @@
import python
import semmle.python.dataflow.new.DataFlow
import TestUtilities.InlineExpectationsTest
import semmle.python.ApiGraphs
class AwaitedTest extends InlineExpectationsTest {
AwaitedTest() { this = "AwaitedTest" }
override string getARelevantTag() { result = "awaited" }
override predicate hasActualResult(Location location, string element, string tag, string value) {
exists(API::Node a, DataFlow::Node n, API::Node pred |
a = pred.getAwaited() and
n = a.getAUse() and
location = n.getLocation() and
// Module variable nodes have no suitable location, so it's best to simply exclude them entirely
// from the inline tests.
not n instanceof DataFlow::ModuleVariableNode and
exists(location.getFile().getRelativePath())
|
tag = "awaited" and
value = pred.getPath() and
element = n.toString()
)
}
}