Files
sarif-cli/bin/sarif-to-table
michael hohn edce50fb79 Add missing result[] handling
Fixes the KeyError raised when a SARIF run has no 'results' key.
       #+BEGIN_SRC text
         Traceback (most recent call last):
           File "/mnt/common/home/hohn/work-gh/sarif-cli/.venv-ubuserv/bin/sarif-to-table", line 125, in <module>
             num_results = len(S.get(sarif_struct, 'runs', runi, 'results'))
           File "/mnt/common/home/hohn/work-gh/sarif-cli/sarif_cli/traverse.py", line 169, in get
             res = res[p]
         KeyError: 'results'
         76% 3204:1006=7s ./repos/RasaHQ/rasa/code-scanning/analyses/132221999.sarif                                Traceback (most recent call last):
           File "/mnt/common/home/hohn/work-gh/sarif-cli/.venv-ubuserv/bin/sarif-to-table", line 125, in <module>
             num_results = len(S.get(sarif_struct, 'runs', runi, 'results'))
           File "/mnt/common/home/hohn/work-gh/sarif-cli/sarif_cli/traverse.py", line 169, in get
             res = res[p]
         KeyError: 'results'
       #+END_SRC
2025-10-20 21:33:02 -07:00

306 lines
15 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import json
import sarif_cli.traverse as S
import sys
import sqlite3
import hashlib
# --------------------------------------------------------------------
# Argument parsing
# --------------------------------------------------------------------
# CLI: positional sarif input and sqlite output, plus optional source
# listing, related-location, and endpoints-only flags.
parser = argparse.ArgumentParser(description='summary of results, stored in sqlite')
_ARG_SPECS = [
    (('file',),
     dict(metavar='sarif-file', type=str,
          help='input file, - for stdin')),
    (('dbfile',),
     dict(metavar='db-file', type=str,
          help='sqlite database file to append results to')),
    (('-s', '--list-source'),
     dict(metavar='srcroot', type=str,
          help='list source snippets using srcroot as sarif SRCROOT')),
    (('-r', '--related-locations'),
     dict(action="store_true",
          help='list related locations like "hides "')),
    (('-e', '--endpoints-only'),
     dict(action="store_true",
          help='only list source and sink, dropping the path. Identical, successive source/sink pairs are combined')),
]
for _flags, _kwargs in _ARG_SPECS:
    parser.add_argument(*_flags, **_kwargs)
args = parser.parse_args()
# --------------------------------------------------------------------
# Read SARIF
# --------------------------------------------------------------------
# Read the SARIF document from the named file, or from stdin when '-'.
# Malformed JSON and valid-but-non-SARIF JSON are both reported and then
# skipped with exit status 0, so batch drivers that feed many .sarif
# files through this script keep going.
try:
    with open(args.file, 'r') if args.file != '-' else sys.stdin as fp:
        sarif_struct = json.load(fp)
except json.JSONDecodeError:
    S.msg("ERROR: invalid json contents in %s\n" % (args.file))
    S.dbg("invalid json contents in %s\n" % (args.file))
    sys.exit(0)
if not S.is_sarif_struct(sarif_struct):
    S.msg("ERROR: invalid json contents in %s\n" % (args.file))
    S.dbg("invalid json contents in %s\n" % (args.file))
    sys.exit(0)
# --------------------------------------------------------------------
# Compute unique id (tool version, git commit, date)
# --------------------------------------------------------------------
def compute_unique_id(sarif_struct, runi, sarif_file):
    """Return a stable sha1 hex digest identifying this SARIF run.

    The digest seed joins the tool driver version, the VCS revision id,
    the invocation start time (each blank when absent) and the input
    file name with '|' separators.
    """
    def _lookup(*path):
        # S.get raises (e.g. KeyError) on missing path segments; treat
        # any failure as "field absent".
        try:
            return S.get(*path)
        except Exception:
            return None
    fields = (
        _lookup(sarif_struct, 'runs', runi, 'tool', 'driver', 'version'),
        _lookup(sarif_struct, 'runs', runi, 'versionControlProvenance', 0, 'revisionId'),
        _lookup(sarif_struct, 'runs', runi, 'invocations', 0, 'startTimeUtc'),
    )
    seed = '|'.join([field or '' for field in fields] + [sarif_file])
    return hashlib.sha1(seed.encode('utf-8')).hexdigest()
# --------------------------------------------------------------------
# Define keep_with_context inside S
# --------------------------------------------------------------------
def _init_db(dbfile):
conn = sqlite3.connect(dbfile)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS sarif_results (
sarif_file TEXT,
unique_id TEXT,
runi INTEGER,
resi INTEGER,
codefi INTEGER,
threadi INTEGER,
loci INTEGER,
related_index INTEGER,
artifact_uri TEXT,
l1 INTEGER,
c1 INTEGER,
l2 INTEGER,
c2 INTEGER,
line_num INTEGER,
msg_type TEXT,
message TEXT,
source_line TEXT,
rule_id TEXT
);
""")
cur.execute("CREATE INDEX IF NOT EXISTS idx_artifact_uri ON sarif_results(artifact_uri);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_runi_resi ON sarif_results(runi, resi);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_msg_type ON sarif_results(msg_type);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_unique_id ON sarif_results(unique_id);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_rule_id ON sarif_results(rule_id);")
conn.commit()
return conn
_conn = _init_db(args.dbfile)

# Rows are buffered in memory and written to sqlite in batches, so we
# do not pay for one commit per row.
_buffer = []
_COMMIT_INTERVAL = 1000

def _flush_buffer():
    """Insert all buffered rows into sarif_results and commit."""
    global _buffer
    if not _buffer:
        return
    cursor = _conn.cursor()
    cursor.executemany("""
        INSERT INTO sarif_results (
            sarif_file, unique_id, runi, resi, codefi, threadi, loci, related_index,
            artifact_uri, l1, c1, l2, c2, line_num, msg_type, message, source_line, rule_id
        ) VALUES (
            :sarif_file, :unique_id, :runi, :resi, :codefi, :threadi, :loci, :related_index,
            :artifact_uri, :l1, :c1, :l2, :c2, :line_num, :msg_type, :message, :source_line, :rule_id
        )
    """, _buffer)
    _conn.commit()
    _buffer = []

def keep_with_context(ctx):
    """Queue one row dict; flush to sqlite every _COMMIT_INTERVAL rows."""
    global _buffer
    _buffer.append(ctx)
    if len(_buffer) >= _COMMIT_INTERVAL:
        _flush_buffer()

# Attach the sink to the traverse module's namespace; the traversal
# below calls it as S.keep_with_context(...).
S.keep_with_context = keep_with_context

# Ensure a partial final batch is written out when the script exits.
import atexit
atexit.register(_flush_buffer)
# --------------------------------------------------------------------
# Traverse SARIF
# --------------------------------------------------------------------
# Walk every run and every result: echo a human-readable summary via
# S.msg() and record one row per event via S.keep_with_context().
# msg_type values written: RESULT, SOURCE, REFERENCE, PATH, FLOW_STEP,
# NEWLINE.
for runi in S.indices(sarif_struct, 'runs'):
    unique_id = compute_unique_id(sarif_struct, runi, args.file)
    run_obj = S.get(sarif_struct, 'runs', runi)
    # Some SARIF files lack a 'results' key for a run entirely; skip such
    # runs instead of letting S.get raise KeyError (see commit message).
    results = run_obj.get('results', [])
    if not results:
        S.dbg(f"Skipping {args.file} run {runi}: no results key\n")
        continue
    num_results = len(results)  # NOTE(review): appears unused below -- confirm
    for resi in S.indices(sarif_struct, 'runs', runi, 'results'):
        result = S.get(sarif_struct, 'runs', runi, 'results', resi)
        # The rule id may live at result.ruleId or at result.rule.id.
        rule_id = result.get("ruleId")
        if not rule_id:
            try:
                rule_id = S.get(result, "rule", "id")
            except Exception:
                rule_id = None
        # ---------------- Locations (non-path problems)
        if 'locations' in result:
            message, artifact, region = S.get_location_message_info(result)
            # (-1, -1, -1, -1) is the sentinel for a whole-file region
            # with no usable line/column information.
            if region == S.WholeFile:
                l1, c1, l2, c2 = -1, -1, -1, -1
            else:
                l1, c1, l2, c2 = S.lineinfo(region)
            filepath = "%s:%d:%d:%d:%d" % (artifact['uri'], l1, c1, l2, c2)
            S.msg("RESULT: %s: %s\n" % (filepath, message))
            S.keep_with_context({
                "sarif_file": args.file, "unique_id": unique_id,
                "runi": runi, "resi": resi,
                "codefi": None, "threadi": None, "loci": None, "related_index": None,
                "artifact_uri": artifact.get('uri', ''),
                "l1": l1, "c1": c1, "l2": l2, "c2": c2,
                "line_num": None, "msg_type": "RESULT",
                "message": message, "source_line": "", "rule_id": rule_id
            })
            # With -s/--list-source, also show the underlined source
            # lines for the region and record one SOURCE row per line.
            if region != S.WholeFile and args.list_source:
                lines = S.load_lines(args.list_source, artifact['uri'], l1, l2)
                for line, line_num in zip(lines, range(l1, l2 + 1)):
                    S.display_underlined(l1, c1, l2, c2, line, line_num)
                    S.keep_with_context({
                        "sarif_file": args.file, "unique_id": unique_id,
                        "runi": runi, "resi": resi,
                        "codefi": None, "threadi": None, "loci": None, "related_index": None,
                        "artifact_uri": artifact.get('uri', ''),
                        "l1": l1, "c1": c1, "l2": l2, "c2": c2,
                        "line_num": line_num, "msg_type": "SOURCE",
                        "message": message, "source_line": line, "rule_id": rule_id
                    })
        # ---------------- Related locations (-r flag)
        if args.related_locations:
            relatedLocations = result.get('relatedLocations', None)
            if isinstance(relatedLocations, list):
                for relo_index, relo in enumerate(relatedLocations):
                    message, artifact, region = S.get_relatedlocation_message_info(relo)
                    if artifact == S.NoFile:
                        # Related location has no backing file.
                        S.msg("REFERENCE: %s: %s\n" % ("<NoFile>", message))
                        S.keep_with_context({
                            "sarif_file": args.file, "unique_id": unique_id,
                            "runi": runi, "resi": resi,
                            "codefi": None, "threadi": None,
                            "loci": None, "related_index": relo_index,
                            "artifact_uri": "", "l1": -1, "c1": -1, "l2": -1, "c2": -1,
                            "line_num": None, "msg_type": "REFERENCE",
                            "message": message, "source_line": "", "rule_id": rule_id
                        })
                    else:
                        if region == S.WholeFile:
                            l1, c1, l2, c2 = -1, -1, -1, -1
                        else:
                            l1, c1, l2, c2 = S.lineinfo(region)
                        filepath = "%s:%d:%d:%d:%d" % (artifact['uri'], l1, c1, l2, c2)
                        S.msg("REFERENCE: %s: %s\n" % (filepath, message))
                        S.keep_with_context({
                            "sarif_file": args.file, "unique_id": unique_id,
                            "runi": runi, "resi": resi,
                            "codefi": None, "threadi": None,
                            "loci": None, "related_index": relo_index,
                            "artifact_uri": artifact.get('uri', ''),
                            "l1": l1, "c1": c1, "l2": l2, "c2": c2,
                            "line_num": None, "msg_type": "REFERENCE",
                            "message": message, "source_line": "", "rule_id": rule_id
                        })
                        if args.list_source:
                            lines = S.load_lines(args.list_source, artifact['uri'], l1, l2)
                            for line, line_num in zip(lines, range(l1, l2 + 1)):
                                S.display_underlined(l1, c1, l2, c2, line, line_num)
                                S.keep_with_context({
                                    "sarif_file": args.file, "unique_id": unique_id,
                                    "runi": runi, "resi": resi,
                                    "codefi": None, "threadi": None,
                                    "loci": None, "related_index": relo_index,
                                    "artifact_uri": artifact.get('uri', ''),
                                    "l1": l1, "c1": c1, "l2": l2, "c2": c2,
                                    "line_num": line_num, "msg_type": "SOURCE",
                                    "message": message, "source_line": line, "rule_id": rule_id
                                })
        # ---------------- CodeFlows (path problems)
        if 'codeFlows' in result:
            last_codeFlow = None
            for codefi in S.indices(result, 'codeFlows'):
                codeFlow = S.get(result, 'codeFlows', codefi)
                S.msg("PATH %d\n" % codefi)
                S.keep_with_context({
                    "sarif_file": args.file, "unique_id": unique_id,
                    "runi": runi, "resi": resi, "codefi": codefi,
                    "threadi": None, "loci": None, "related_index": None,
                    "artifact_uri": "", "l1": -1, "c1": -1, "l2": -1, "c2": -1,
                    "line_num": None, "msg_type": "PATH",
                    "message": "", "source_line": "", "rule_id": rule_id
                })
                for threadi in S.indices(codeFlow, 'threadFlows'):
                    threadFlow = S.get(codeFlow, 'threadFlows', threadi)
                    if args.endpoints_only:
                        # -e: keep only the first and last step (source/sink).
                        t1 = S.indices(threadFlow, 'locations')
                        location_range = [t1[0], t1[-1]]
                        # Drop this codeFlow when its endpoints equal the
                        # previous codeFlow's endpoints at the same thread
                        # index ("identical, successive source/sink pairs
                        # are combined").
                        # NOTE(review): assumes last_codeFlow has matching
                        # threadFlows/locations entries at these indices --
                        # confirm behavior for uneven multi-threadFlow input.
                        if (last_codeFlow and
                            (S.get(last_codeFlow, 'threadFlows', threadi, 'locations', 0) ==
                             S.get(codeFlow, 'threadFlows', threadi, 'locations', 0)) and
                            (S.get(last_codeFlow, 'threadFlows', threadi, 'locations', -1) ==
                             S.get(codeFlow, 'threadFlows', threadi, 'locations', -1))):
                            continue
                    else:
                        location_range = S.indices(threadFlow, 'locations')
                    for loci in location_range:
                        location = S.get(threadFlow, 'locations', loci, 'location')
                        message, artifact, region = S.get_relatedlocation_message_info(location)
                        if artifact == S.NoFile:
                            S.msg("FLOW STEP %d: %s: %s\n" % (loci, "<NoFile>", message))
                            S.keep_with_context({
                                "sarif_file": args.file, "unique_id": unique_id,
                                "runi": runi, "resi": resi,
                                "codefi": codefi, "threadi": threadi,
                                "loci": loci, "related_index": None,
                                "artifact_uri": "", "l1": -1, "c1": -1, "l2": -1, "c2": -1,
                                "line_num": None, "msg_type": "FLOW_STEP",
                                "message": message, "source_line": "", "rule_id": rule_id
                            })
                        else:
                            if region == S.WholeFile:
                                l1, c1, l2, c2 = -1, -1, -1, -1
                            else:
                                l1, c1, l2, c2 = S.lineinfo(region)
                            filepath = "%s:%d:%d:%d:%d" % (artifact['uri'], l1, c1, l2, c2)
                            S.msg("FLOW STEP %d: %s: %s\n" % (loci, filepath, message))
                            S.keep_with_context({
                                "sarif_file": args.file, "unique_id": unique_id,
                                "runi": runi, "resi": resi,
                                "codefi": codefi, "threadi": threadi,
                                "loci": loci, "related_index": None,
                                "artifact_uri": artifact.get('uri', ''),
                                "l1": l1, "c1": c1, "l2": l2, "c2": c2,
                                "line_num": None, "msg_type": "FLOW_STEP",
                                "message": message, "source_line": "", "rule_id": rule_id
                            })
                            if args.list_source:
                                lines = S.load_lines(args.list_source, artifact['uri'], l1, l2)
                                for line, line_num in zip(lines, range(l1, l2 + 1)):
                                    S.display_underlined(l1, c1, l2, c2, line, line_num)
                                    S.keep_with_context({
                                        "sarif_file": args.file, "unique_id": unique_id,
                                        "runi": runi, "resi": resi,
                                        "codefi": codefi, "threadi": threadi,
                                        "loci": loci, "related_index": None,
                                        "artifact_uri": artifact.get('uri', ''),
                                        "l1": l1, "c1": c1, "l2": l2, "c2": c2,
                                        "line_num": line_num, "msg_type": "SOURCE",
                                        "message": message, "source_line": line, "rule_id": rule_id
                                    })
                last_codeFlow = codeFlow
        # Blank separator line after each result, recorded as NEWLINE so
        # the textual layout can be reconstructed from the database.
        S.msg("\n")
        S.keep_with_context({
            "sarif_file": args.file, "unique_id": unique_id,
            "runi": runi, "resi": resi,
            "codefi": None, "threadi": None, "loci": None, "related_index": None,
            "artifact_uri": "", "l1": -1, "c1": -1, "l2": -1, "c2": -1,
            "line_num": None, "msg_type": "NEWLINE",
            "message": "", "source_line": "", "rule_id": rule_id
        })