C#: Add python script for generating YAML files containing data extensions.

This commit is contained in:
Michael Nebel
2022-10-11 15:49:30 +02:00
parent 4972839b69
commit e6a8019c2b
3 changed files with 170 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
# Tool to generate data extensions files based on the existing models.
# Usage:
# python3 ConvertExtensions.py
# (1) A folder named `csharp/ql/lib/ext` will be created, if it doesn't already exist.
# (2) The converted models will be written to `csharp/ql/lib/ext`. One file for each namespace.
import os
import subprocess
import sys
# Add Models as Data script directory to sys.path.
gitroot = subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
madpath = os.path.join(gitroot, "misc/scripts/models-as-data/")
sys.path.append(madpath)

# These imports have to stay below the sys.path update above; presumably both
# modules are resolved through the directory that was just appended.
import helpers
import convert_extensions as extensions

print('Running script to generate data extensions files from the existing MaD models.')
print('Making a dummy database.')

# Configuration
language = "csharp"
dbDir = "db"

try:
    helpers.run_cmd(['codeql', 'database', 'create', f'--language={language}', '-c', 'dotnet clean project/', '-c', 'dotnet build project/', dbDir])
    print('Converting data extensions for C#.')
    extensions.Converter(language, dbDir).run()
finally:
    # Cleanup - delete database. Done in `finally` so that a failed database
    # build or a failed conversion does not leave the dummy database behind.
    print('Cleanup.')
    helpers.remove_dir(dbDir)

print('Done.')

View File

@@ -0,0 +1,95 @@
# Helper functionality for MaD models extensions conversion.
import helpers
import os
import shutil
import subprocess
import sys
import tempfile
def quote_if_needed(v):
    """Render a single column value as text for a YAML row.

    String values are wrapped in double quotes. Backslashes and embedded
    double quotes are escaped first, so a value containing either cannot
    terminate the quoting early or form an invalid escape sequence.
    Any other value (e.g. a bool column) is rendered with ``str``.
    """
    # string columns
    if isinstance(v, str):
        # Escape backslashes before quotes so the backslashes introduced for
        # the quotes are not escaped a second time.
        escaped = v.replace("\\", "\\\\").replace('"', '\\"')
        return '"' + escaped + '"'
    # bool column
    return str(v)
def insert_update(rows, key, value):
    """Store `value` under `key` in `rows`, appending to any existing entry.

    A key that is not present yet is set to `value`; otherwise `value` is
    added onto the stored entry with `+=`. `rows` is modified in place and
    nothing is returned.
    """
    if key not in rows:
        # First occurrence of this key: start a new entry.
        rows[key] = value
        return
    rows[key] += value
def merge(*dicts):
    """Combine several dictionaries into one new dictionary.

    Dictionaries are processed in argument order. When a key occurs more than
    once, the later values are appended to the earlier ones through
    `insert_update`.
    """
    combined = {}
    for mapping in dicts:
        for name, content in mapping.items():
            insert_update(combined, name, content)
    return combined
def parseData(data):
    """Turn result rows into row text grouped by each row's first column.

    Every row becomes one ` - [v1, v2, ...]` line (values rendered by
    `quote_if_needed`). Lines whose rows share the same first column are
    concatenated under that (unquoted) first-column value.
    """
    grouped = {}
    for row in data:
        cells = ', '.join(quote_if_needed(cell) for cell in row)
        insert_update(grouped, row[0], " - [" + cells + ']\n')
    return grouped
class Converter:
    """Converts existing MaD models into data extension (`.model.yml`) files.

    The extraction queries found under
    `<language>/ql/src/utils/modelconverter` are run against a database, and
    the results are written to `<language>/ql/lib/ext`, one file per key of
    the generated content (one file for each namespace).
    """

    def __init__(self, language, dbDir):
        """Set up paths for `language`, using the database at `dbDir`.

        Requires being run inside a git checkout: the repository root is
        obtained from `git rev-parse --show-toplevel`.
        """
        self.language = language
        self.dbDir = dbDir
        self.codeQlRoot = subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
        self.extDir = os.path.join(self.codeQlRoot, f"{self.language}/ql/lib/ext/")
        self.dirname = "modelconverter"
        self.modelFileExtension = ".model.yml"
        # Scratch directory for intermediate query output; removed by run().
        self.workDir = tempfile.mkdtemp()

    def runQuery(self, query):
        """Run the query file named `query` and return its decoded result rows."""
        print('########## Querying: ', query)
        # run() deletes the scratch directory when it finishes; recreate it if
        # needed so the instance stays usable afterwards.
        os.makedirs(self.workDir, exist_ok=True)
        queryFile = os.path.join(self.codeQlRoot, f"{self.language}/ql/src/utils/{self.dirname}", query)
        resultBqrs = os.path.join(self.workDir, "out.bqrs")
        helpers.run_cmd(['codeql', 'query', 'run', queryFile, '--database', self.dbDir, '--output', resultBqrs], "Failed to generate " + query)
        return helpers.readData(self.workDir, resultBqrs)

    def asAddsTo(self, rows, predicate):
        """Wrap each entry of `rows` in the `addsTo` template for `predicate`.

        Returns a new dictionary with the same keys as `rows`.
        """
        pack = f"codeql/{self.language}-all"
        return {
            key: helpers.addsToTemplate.format(pack, predicate, content)
            for key, content in rows.items()
        }

    def getAddsTo(self, query, predicate):
        """Run `query` and return its rows as `addsTo` entries for `predicate`."""
        data = self.runQuery(query)
        rows = parseData(data)
        return self.asAddsTo(rows, predicate)

    def makeContent(self):
        """Collect the summary, source and sink entries, merged per key."""
        summaries = self.getAddsTo("ExtractSummaries.ql", helpers.summaryModelPredicate)
        sources = self.getAddsTo("ExtractSources.ql", helpers.sourceModelPredicate)
        sinks = self.getAddsTo("ExtractSinks.ql", helpers.sinkModelPredicate)
        return merge(sources, sinks, summaries)

    def save(self, extensions):
        """Write every entry of `extensions` to its own file in `self.extDir`.

        The file name is the entry's key followed by the model file extension.
        """
        # Create directory if it doesn't exist
        os.makedirs(self.extDir, exist_ok=True)

        # Create a file for each namespace and save models.
        extensionTemplate = "\nextensions:\n{0}\n"
        for entry, content in extensions.items():
            # os.path.join avoids the doubled separator that plain string
            # concatenation produces when extDir already ends in a slash.
            target = os.path.join(self.extDir, entry + self.modelFileExtension)
            with open(target, "w", encoding="utf-8") as f:
                f.write(extensionTemplate.format(content))

    def run(self):
        """Generate the content, save it, and remove the scratch directory."""
        try:
            extensions = self.makeContent()
            self.save(extensions)
        finally:
            # mkdtemp() leaves deletion to the caller; clean up even when a
            # query or the save step fails.
            shutil.rmtree(self.workDir, ignore_errors=True)

View File

@@ -0,0 +1,41 @@
import json
import os
import shutil
import subprocess
# Shared strings.
# Predicate names that are substituted into the `extensible:` field of
# `addsToTemplate` for summary, sink and source models respectively.
summaryModelPredicate = "extSummaryModel"
sinkModelPredicate = "extSinkModel"
sourceModelPredicate = "extSourceModel"
# Template for a single `addsTo` entry. Placeholders:
#   {0} - pack name, {1} - extensible predicate name, {2} - data rows.
# NOTE(review): the lines of this template carry no leading indentation as
# they appear here; YAML nesting is indentation-sensitive, so confirm the
# template against the intended layout of the generated files.
addsToTemplate = """
- addsTo:
pack: {0}
extensible: {1}
data:
{2}
"""
def remove_dir(dirName):
    """Delete the directory tree `dirName` if it exists and report the removal.

    Does nothing (and prints nothing) when `dirName` is not an existing
    directory.
    """
    if not os.path.isdir(dirName):
        return
    shutil.rmtree(dirName)
    print("Removed directory:", dirName)
def run_cmd(cmd, msg="Failed to run command"):
    """Run `cmd` and terminate the script if it exits with a non-zero status.

    Args:
        cmd: The command and its arguments as a list of strings.
        msg: Message printed before exiting when the command fails.

    Raises:
        SystemExit: With exit code 1 when the command returns non-zero.
    """
    print('Running ' + ' '.join(cmd))
    # subprocess.call returns the exit status. check_call would raise
    # CalledProcessError on failure, so the message below was never printed.
    if subprocess.call(cmd):
        print(msg)
        # Same effect as exit(1), without depending on the `site` builtin.
        raise SystemExit(1)
def readData(workDir, bqrsFile):
    """Decode `bqrsFile` to JSON and return the tuples of its `#select` result.

    The decoded JSON is written to `out.json` inside `workDir`. If the JSON
    has no `#select` / `tuples` entry, a message is printed and the script
    exits with status 1.
    """
    generatedJson = os.path.join(workDir, "out.json")
    print('Decoding BQRS to JSON.')
    decodeCmd = ['codeql', 'bqrs', 'decode', bqrsFile, '--output', generatedJson, '--format=json']
    run_cmd(decodeCmd, "Failed to decode BQRS.")

    with open(generatedJson) as handle:
        decoded = json.load(handle)

    try:
        tuples = decoded['#select']['tuples']
    except KeyError:
        print('Unexpected JSON output - no tuples found')
        exit(1)
    else:
        return tuples