Merge pull request #8506 from michaelnebel/java/generalize-generate-flow-model

Java/C#: Generalize script for generating flow models.
This commit is contained in:
Michael Nebel
2022-03-25 16:20:53 +01:00
committed by GitHub
2 changed files with 212 additions and 172 deletions

View File

@@ -1,178 +1,15 @@
#!/usr/bin/python3
import json
import os
import os.path
import shlex
import subprocess
import sys
import tempfile
import os.path
import subprocess
# Add Model as Data script directory to sys.path.
gitroot = subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
madpath = os.path.join(gitroot, "misc/scripts/models-as-data/")
sys.path.append(madpath)
def printHelp():
print("""Usage:
GenerateFlowModel.py <library-database> <outputQll> [--with-sinks] [--with-sources] [--with-summaries]
import generate_flow_model as model
This generates summary, source and sink models for the code in the database.
The files will be placed in `java/ql/lib/semmle/code/java/frameworks/<outputQll>` where
outputQll is the name (and path) of the output QLL file. Usually, models are grouped by their
respective frameworks.
Which models are generated is controlled by the flags:
--with-sinks
--with-sources
--with-summaries
If none of these flags are specified, all models are generated.
Example invocations:
$ GenerateFlowModel.py /tmp/dbs/apache_commons-codec_45649c8 "apache/Codec.qll"
$ GenerateFlowModel.py /tmp/dbs/jdk15_db "javase/jdk_sinks.qll" --with-sinks
Requirements: `codeql` should both appear on your path.
""")
if any(s == "--help" for s in sys.argv):
printHelp()
sys.exit(0)
generateSinks = False
generateSources = False
generateSummaries = False
if "--with-sinks" in sys.argv:
sys.argv.remove("--with-sinks")
generateSinks = True
if "--with-sources" in sys.argv:
sys.argv.remove("--with-sources")
generateSources = True
if "--with-summaries" in sys.argv:
sys.argv.remove("--with-summaries")
generateSummaries = True
if not generateSinks and not generateSources and not generateSummaries:
generateSinks = generateSources = generateSummaries = True
if len(sys.argv) != 3:
printHelp()
sys.exit(1)
codeQlRoot = subprocess.check_output(
["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
targetQll = sys.argv[2]
if not targetQll.endswith(".qll"):
targetQll += ".qll"
filename = os.path.basename(targetQll)
shortname = filename[:-4]
generatedFrameworks = os.path.join(
codeQlRoot, "java/ql/lib/semmle/code/java/frameworks/")
frameworkTarget = os.path.join(generatedFrameworks, targetQll)
workDir = tempfile.mkdtemp()
os.makedirs(generatedFrameworks, exist_ok=True)
def runQuery(infoMessage, query):
print("########## Querying " + infoMessage + "...")
database = sys.argv[1]
queryFile = os.path.join(os.path.dirname(
__file__), query)
resultBqrs = os.path.join(workDir, "out.bqrs")
cmd = ['codeql', 'query', 'run', queryFile, '--database',
database, '--output', resultBqrs, '--threads', '8']
ret = subprocess.call(cmd)
if ret != 0:
print("Failed to generate " + infoMessage +
". Failed command was: " + shlex.join(cmd))
sys.exit(1)
return readRows(resultBqrs)
def readRows(bqrsFile):
generatedJson = os.path.join(workDir, "out.json")
cmd = ['codeql', 'bqrs', 'decode', bqrsFile,
'--format=json', '--output', generatedJson]
ret = subprocess.call(cmd)
if ret != 0:
print("Failed to decode BQRS. Failed command was: " + shlex.join(cmd))
sys.exit(1)
with open(generatedJson) as f:
results = json.load(f)
try:
results['#select']['tuples']
except KeyError:
print('Unexpected JSON output - no tuples found')
exit(1)
rows = ""
for (row) in results['#select']['tuples']:
rows += " \"" + row[0] + "\",\n"
return rows[:-2]
def asCsvModel(superclass, kind, rows):
classTemplate = """
private class {0}{1}Csv extends {2} {{
override predicate row(string row) {{
row =
[
{3}
]
}}
}}
"""
if rows.strip() == "":
return ""
return classTemplate.format(shortname[0].upper() + shortname[1:], kind.capitalize(), superclass, rows)
if generateSummaries:
summaryRows = runQuery("summary models", "CaptureSummaryModels.ql")
summaryCsv = asCsvModel("SummaryModelCsv", "summary", summaryRows)
else:
summaryCsv = ""
if generateSinks:
sinkRows = runQuery("sink models", "CaptureSinkModels.ql")
sinkCsv = asCsvModel("SinkModelCsv", "sinks", sinkRows)
else:
sinkCsv = ""
if generateSources:
sourceRows = runQuery("source models", "CaptureSourceModels.ql")
sourceCsv = asCsvModel("SourceModelCsv", "sources", sourceRows)
else:
sourceCsv = ""
qllTemplate = """
/** Definitions of taint steps in the {0} framework */
import java
private import semmle.code.java.dataflow.ExternalFlow
{1}
{2}
{3}
"""
qllContents = qllTemplate.format(shortname, sinkCsv, sourceCsv, summaryCsv)
with open(frameworkTarget, "w") as frameworkQll:
frameworkQll.write(qllContents)
cmd = ['codeql', 'query', 'format', '--in-place', frameworkTarget]
ret = subprocess.call(cmd)
if ret != 0:
print("Failed to format query. Failed command was: " + shlex.join(cmd))
sys.exit(1)
print("")
print("CSV model written to " + frameworkTarget)
language = "java"
model.Generator.make(language).run()

View File

@@ -0,0 +1,203 @@
#!/usr/bin/python3
import json
import os
import os.path
import shlex
import subprocess
import sys
import tempfile
class Generator:
def __init__ (self, language):
self.language = language
self.generateSinks = False
self.generateSources = False
self.generateSummaries = False
self.dryRun = False
def printHelp(self):
print(f"""Usage:
python3 GenerateFlowModel.py <library-database> <outputQll> [--with-sinks] [--with-sources] [--with-summaries] [--dry-run]
This generates summary, source and sink models for the code in the database.
The files will be placed in `{self.language}/ql/lib/semmle/code/{self.language}/frameworks/<outputQll>` where
outputQll is the name (and path) of the output QLL file. Usually, models are grouped by their
respective frameworks.
Which models are generated is controlled by the flags:
--with-sinks
--with-sources
--with-summaries
If none of these flags are specified, all models are generated.
--dry-run: Only run the queries, but don't write to file.
Example invocations:
$ python3 GenerateFlowModel.py /tmp/dbs/my_library_db "mylibrary/Framework.qll"
$ python3 GenerateFlowModel.py /tmp/dbs/my_library_db "mylibrary/FrameworkSinks.qll" --with-sinks
Requirements: `codeql` should both appear on your path.
""")
def setenvironment(self, target, database):
self.codeQlRoot = subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
if not target.endswith(".qll"):
target += ".qll"
self.filename = os.path.basename(target)
self.shortname = self.filename[:-4]
self.database = database
self.generatedFrameworks = os.path.join(
self.codeQlRoot, f"{self.language}/ql/lib/semmle/code/{self.language}/frameworks/")
self.frameworkTarget = os.path.join(self.generatedFrameworks, target)
self.workDir = tempfile.mkdtemp()
os.makedirs(self.generatedFrameworks, exist_ok=True)
@staticmethod
def make(language):
generator = Generator(language)
if any(s == "--help" for s in sys.argv):
generator.printHelp()
sys.exit(0)
if "--with-sinks" in sys.argv:
sys.argv.remove("--with-sinks")
generator.generateSinks = True
if "--with-sources" in sys.argv:
sys.argv.remove("--with-sources")
generator.generateSources = True
if "--with-summaries" in sys.argv:
sys.argv.remove("--with-summaries")
generator.generateSummaries = True
if "--dry-run" in sys.argv:
sys.argv.remove("--dry-run")
generator.dryRun = True
if not generator.generateSinks and not generator.generateSources and not generator.generateSummaries:
generator.generateSinks = generator.generateSources = generator.generateSummaries = True
if len(sys.argv) != 3:
generator.printHelp()
sys.exit(1)
generator.setenvironment(sys.argv[2], sys.argv[1])
return generator
def runQuery(self, infoMessage, query):
print("########## Querying " + infoMessage + "...")
queryFile = os.path.join(self.codeQlRoot, f"{self.language}/ql/src/utils/model-generator", query)
resultBqrs = os.path.join(self.workDir, "out.bqrs")
cmd = ['codeql', 'query', 'run', queryFile, '--database',
self.database, '--output', resultBqrs, '--threads', '8']
ret = subprocess.call(cmd)
if ret != 0:
print("Failed to generate " + infoMessage +
". Failed command was: " + shlex.join(cmd))
sys.exit(1)
return self.readRows(resultBqrs)
def readRows(self, bqrsFile):
generatedJson = os.path.join(self.workDir, "out.json")
cmd = ['codeql', 'bqrs', 'decode', bqrsFile,
'--format=json', '--output', generatedJson]
ret = subprocess.call(cmd)
if ret != 0:
print("Failed to decode BQRS. Failed command was: " + shlex.join(cmd))
sys.exit(1)
with open(generatedJson) as f:
results = json.load(f)
try:
results['#select']['tuples']
except KeyError:
print('Unexpected JSON output - no tuples found')
exit(1)
rows = ""
for (row) in results['#select']['tuples']:
rows += " \"" + row[0] + "\",\n"
return rows[:-2]
def asCsvModel(self, superclass, kind, rows):
classTemplate = """
private class {0}{1}Csv extends {2} {{
override predicate row(string row) {{
row =
[
{3}
]
}}
}}
"""
if rows.strip() == "":
return ""
return classTemplate.format(self.shortname[0].upper() + self.shortname[1:], kind.capitalize(), superclass, rows)
def makeContent(self):
if self.generateSummaries:
summaryRows = self.runQuery("summary models", "CaptureSummaryModels.ql")
summaryCsv = self.asCsvModel("SummaryModelCsv", "summary", summaryRows)
else:
summaryCsv = ""
if self.generateSinks:
sinkRows = self.runQuery("sink models", "CaptureSinkModels.ql")
sinkCsv = self.asCsvModel("SinkModelCsv", "sinks", sinkRows)
else:
sinkCsv = ""
if self.generateSources:
sourceRows = self.runQuery("source models", "CaptureSourceModels.ql")
sourceCsv = self.asCsvModel("SourceModelCsv", "sources", sourceRows)
else:
sourceCsv = ""
return f"""
/** Definitions of taint steps in the {self.shortname} framework */
import {self.language}
private import semmle.code.{self.language}.dataflow.ExternalFlow
{sinkCsv}
{sourceCsv}
{summaryCsv}
"""
def save(self, content):
with open(self.frameworkTarget, "w") as frameworkQll:
frameworkQll.write(content)
cmd = ['codeql', 'query', 'format', '--in-place', self.frameworkTarget]
ret = subprocess.call(cmd)
if ret != 0:
print("Failed to format query. Failed command was: " + shlex.join(cmd))
sys.exit(1)
print("")
print("CSV model written to " + self.frameworkTarget)
def run(self):
content = self.makeContent()
if self.dryRun:
print("CSV Models generated, but not written to file.")
sys.exit(0)
self.save(content)