#!/usr/bin/env python
import argparse
import atexit
import hashlib
import json
import sqlite3
import sys

import sarif_cli.traverse as S

# --------------------------------------------------------------------
# Argument parsing
# --------------------------------------------------------------------
parser = argparse.ArgumentParser(
    description='summarize SARIF results and store them in a sqlite database')
parser.add_argument('file', metavar='sarif-file', type=str,
                    help='input file, - for stdin')
parser.add_argument('dbfile', metavar='db-file', type=str,
                    help='sqlite database file to append results to')
parser.add_argument('-s', '--list-source', metavar='srcroot', type=str,
                    help='list source snippets using srcroot as sarif SRCROOT')
parser.add_argument('-r', '--related-locations', action="store_true",
                    help='list related locations like "hides "')
parser.add_argument('-e', '--endpoints-only', action="store_true",
                    help='only list source and sink, dropping the path. '
                         'Identical, successive source/sink pairs are combined')
args = parser.parse_args()

# --------------------------------------------------------------------
# Read SARIF
# --------------------------------------------------------------------
with open(args.file, 'r') if args.file != '-' else sys.stdin as fp:
    sarif_struct = json.load(fp)

if not S.is_sarif_struct(sarif_struct):
    S.msg("ERROR: invalid json contents in %s\n" % (args.file))
    S.dbg("invalid json contents in %s\n" % (args.file))
    sys.exit(0)

# --------------------------------------------------------------------
# Compute unique id (tool version, git commit, date)
# --------------------------------------------------------------------
def compute_unique_id(sarif_struct, runi, sarif_file):
    def _safeget(*path):
        try:
            return S.get(*path)
        except Exception:
            return None
    tool_version = _safeget(sarif_struct, 'runs', runi, 'tool', 'driver', 'version')
    revision_id = _safeget(sarif_struct, 'runs', runi, 'versionControlProvenance', 0, 'revisionId')
    start_time = _safeget(sarif_struct, 'runs', runi, 'invocations', 0, 'startTimeUtc')
    seed = f"{tool_version or ''}|{revision_id or ''}|{start_time or ''}|{sarif_file}"
    return hashlib.sha1(seed.encode('utf-8')).hexdigest()

# --------------------------------------------------------------------
# Sqlite setup and definition of S.keep_with_context
# --------------------------------------------------------------------
def _init_db(dbfile):
    conn = sqlite3.connect(dbfile)
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS sarif_results (
            sarif_file TEXT,
            unique_id TEXT,
            runi INTEGER,
            resi INTEGER,
            codefi INTEGER,
            threadi INTEGER,
            loci INTEGER,
            related_index INTEGER,
            artifact_uri TEXT,
            l1 INTEGER,
            c1 INTEGER,
            l2 INTEGER,
            c2 INTEGER,
            line_num INTEGER,
            msg_type TEXT,
            message TEXT,
            source_line TEXT,
            rule_id TEXT
        );
        """)
    cur.execute("CREATE INDEX IF NOT EXISTS idx_artifact_uri ON sarif_results(artifact_uri);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_runi_resi ON sarif_results(runi, resi);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_msg_type ON sarif_results(msg_type);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_unique_id ON sarif_results(unique_id);")
    cur.execute("CREATE INDEX IF NOT EXISTS idx_rule_id ON sarif_results(rule_id);")
    conn.commit()
    return conn

_conn = _init_db(args.dbfile)

# Rows are buffered and written in batches to keep transaction overhead low;
# any remainder is flushed at exit (see atexit.register below).
_buffer = []
_COMMIT_INTERVAL = 1000
def _flush_buffer():
    global _buffer
    if not _buffer:
        return
    cur = _conn.cursor()
    cur.executemany("""
        INSERT INTO sarif_results (
            sarif_file, unique_id, runi, resi, codefi, threadi, loci,
            related_index, artifact_uri, l1, c1, l2, c2, line_num,
            msg_type, message, source_line, rule_id
        )
        VALUES (
            :sarif_file, :unique_id, :runi, :resi, :codefi, :threadi, :loci,
            :related_index, :artifact_uri, :l1, :c1, :l2, :c2, :line_num,
            :msg_type, :message, :source_line, :rule_id
        )
        """, _buffer)
    _conn.commit()
    _buffer = []

def keep_with_context(ctx):
    global _buffer
    _buffer.append(ctx)
    if len(_buffer) >= _COMMIT_INTERVAL:
        _flush_buffer()

S.keep_with_context = keep_with_context
atexit.register(_flush_buffer)

# --------------------------------------------------------------------
# Traverse SARIF
# --------------------------------------------------------------------
for runi in S.indices(sarif_struct, 'runs'):
    unique_id = compute_unique_id(sarif_struct, runi, args.file)
    run_obj = S.get(sarif_struct, 'runs', runi)
    results = run_obj.get('results', [])
    if not results:
        S.dbg(f"Skipping {args.file} run {runi}: no results\n")
        continue
    num_results = len(results)

    for resi in S.indices(sarif_struct, 'runs', runi, 'results'):
        result = S.get(sarif_struct, 'runs', runi, 'results', resi)
        rule_id = result.get("ruleId")
        if not rule_id:
            try:
                rule_id = S.get(result, "rule", "id")
            except Exception:
                rule_id = None

        # ---------------- Locations (non-path problems)
        if 'locations' in result:
            message, artifact, region = S.get_location_message_info(result)
            if region == S.WholeFile:
                l1, c1, l2, c2 = -1, -1, -1, -1
            else:
                l1, c1, l2, c2 = S.lineinfo(region)
            filepath = "%s:%d:%d:%d:%d" % (artifact['uri'], l1, c1, l2, c2)
            S.msg("RESULT: %s: %s\n" % (filepath, message))
            S.keep_with_context({
                "sarif_file": args.file, "unique_id": unique_id,
                "runi": runi, "resi": resi, "codefi": None, "threadi": None,
                "loci": None, "related_index": None,
                "artifact_uri": artifact.get('uri', ''),
                "l1": l1, "c1": c1, "l2": l2, "c2": c2, "line_num": None,
                "msg_type": "RESULT", "message": message,
                "source_line": "", "rule_id": rule_id
            })
            if region != S.WholeFile and args.list_source:
                lines = S.load_lines(args.list_source, artifact['uri'], l1, l2)
                for line, line_num in zip(lines, range(l1, l2 + 1)):
                    S.display_underlined(l1, c1, l2, c2, line, line_num)
                    S.keep_with_context({
                        "sarif_file": args.file, "unique_id": unique_id,
                        "runi": runi, "resi": resi, "codefi": None, "threadi": None,
                        "loci": None, "related_index": None,
                        "artifact_uri": artifact.get('uri', ''),
                        "l1": l1, "c1": c1, "l2": l2, "c2": c2, "line_num": line_num,
                        "msg_type": "SOURCE", "message": message,
                        "source_line": line, "rule_id": rule_id
                    })

            if args.related_locations:
                relatedLocations = result.get('relatedLocations', None)
                if isinstance(relatedLocations, list):
                    for relo_index, relo in enumerate(relatedLocations):
                        message, artifact, region = S.get_relatedlocation_message_info(relo)
                        if artifact == S.NoFile:
                            S.msg("REFERENCE: %s: %s\n" % ("", message))
                            S.keep_with_context({
                                "sarif_file": args.file, "unique_id": unique_id,
                                "runi": runi, "resi": resi, "codefi": None, "threadi": None,
                                "loci": None, "related_index": relo_index,
                                "artifact_uri": "",
                                "l1": -1, "c1": -1, "l2": -1, "c2": -1, "line_num": None,
                                "msg_type": "REFERENCE", "message": message,
                                "source_line": "", "rule_id": rule_id
                            })
                        else:
                            if region == S.WholeFile:
                                l1, c1, l2, c2 = -1, -1, -1, -1
                            else:
                                l1, c1, l2, c2 = S.lineinfo(region)
                            filepath = "%s:%d:%d:%d:%d" % (artifact['uri'], l1, c1, l2, c2)
                            S.msg("REFERENCE: %s: %s\n" % (filepath, message))
                            S.keep_with_context({
                                "sarif_file": args.file, "unique_id": unique_id,
                                "runi": runi, "resi": resi, "codefi": None, "threadi": None,
                                "loci": None, "related_index": relo_index,
                                "artifact_uri": artifact.get('uri', ''),
                                "l1": l1, "c1": c1, "l2": l2, "c2": c2, "line_num": None,
                                "msg_type": "REFERENCE", "message": message,
                                "source_line": "", "rule_id": rule_id
                            })
                            # Skip snippet listing for whole-file regions,
                            # mirroring the guard used for plain locations above.
                            if region != S.WholeFile and args.list_source:
                                lines = S.load_lines(args.list_source, artifact['uri'], l1, l2)
                                for line, line_num in zip(lines, range(l1, l2 + 1)):
                                    S.display_underlined(l1, c1, l2, c2, line, line_num)
                                    S.keep_with_context({
                                        "sarif_file": args.file, "unique_id": unique_id,
                                        "runi": runi, "resi": resi, "codefi": None, "threadi": None,
                                        "loci": None, "related_index": relo_index,
                                        "artifact_uri": artifact.get('uri', ''),
                                        "l1": l1, "c1": c1, "l2": l2, "c2": c2, "line_num": line_num,
                                        "msg_type": "SOURCE", "message": message,
                                        "source_line": line, "rule_id": rule_id
                                    })

        # ---------------- CodeFlows (path problems)
        if 'codeFlows' in result:
            last_codeFlow = None
            for codefi in S.indices(result, 'codeFlows'):
                codeFlow = S.get(result, 'codeFlows', codefi)
                S.msg("PATH %d\n" % codefi)
                S.keep_with_context({
                    "sarif_file": args.file, "unique_id": unique_id,
                    "runi": runi, "resi": resi, "codefi": codefi, "threadi": None,
                    "loci": None, "related_index": None,
                    "artifact_uri": "",
                    "l1": -1, "c1": -1, "l2": -1, "c2": -1, "line_num": None,
                    "msg_type": "PATH", "message": "",
                    "source_line": "", "rule_id": rule_id
                })
                for threadi in S.indices(codeFlow, 'threadFlows'):
                    threadFlow = S.get(codeFlow, 'threadFlows', threadi)
                    if args.endpoints_only:
                        t1 = S.indices(threadFlow, 'locations')
                        location_range = [t1[0], t1[-1]]
                        # Combine identical, successive source/sink pairs.
                        if (last_codeFlow and
                            (S.get(last_codeFlow, 'threadFlows', threadi, 'locations', 0) ==
                             S.get(codeFlow, 'threadFlows', threadi, 'locations', 0)) and
                            (S.get(last_codeFlow, 'threadFlows', threadi, 'locations', -1) ==
                             S.get(codeFlow, 'threadFlows', threadi, 'locations', -1))):
                            continue
                    else:
                        location_range = S.indices(threadFlow, 'locations')
                    for loci in location_range:
                        location = S.get(threadFlow, 'locations', loci, 'location')
                        message, artifact, region = S.get_relatedlocation_message_info(location)
                        if artifact == S.NoFile:
                            S.msg("FLOW STEP %d: %s: %s\n" % (loci, "", message))
                            S.keep_with_context({
                                "sarif_file": args.file, "unique_id": unique_id,
                                "runi": runi, "resi": resi, "codefi": codefi, "threadi": threadi,
                                "loci": loci, "related_index": None,
                                "artifact_uri": "",
                                "l1": -1, "c1": -1, "l2": -1, "c2": -1, "line_num": None,
                                "msg_type": "FLOW_STEP", "message": message,
                                "source_line": "", "rule_id": rule_id
                            })
                        else:
                            if region == S.WholeFile:
                                l1, c1, l2, c2 = -1, -1, -1, -1
                            else:
                                l1, c1, l2, c2 = S.lineinfo(region)
                            filepath = "%s:%d:%d:%d:%d" % (artifact['uri'], l1, c1, l2, c2)
                            S.msg("FLOW STEP %d: %s: %s\n" % (loci, filepath, message))
                            S.keep_with_context({
                                "sarif_file": args.file, "unique_id": unique_id,
                                "runi": runi, "resi": resi, "codefi": codefi, "threadi": threadi,
                                "loci": loci, "related_index": None,
                                "artifact_uri": artifact.get('uri', ''),
                                "l1": l1, "c1": c1, "l2": l2, "c2": c2, "line_num": None,
                                "msg_type": "FLOW_STEP", "message": message,
                                "source_line": "", "rule_id": rule_id
                            })
                            # Skip snippet listing for whole-file regions, as above.
                            if region != S.WholeFile and args.list_source:
                                lines = S.load_lines(args.list_source, artifact['uri'], l1, l2)
                                for line, line_num in zip(lines, range(l1, l2 + 1)):
                                    S.display_underlined(l1, c1, l2, c2, line, line_num)
                                    S.keep_with_context({
                                        "sarif_file": args.file, "unique_id": unique_id,
                                        "runi": runi, "resi": resi, "codefi": codefi, "threadi": threadi,
                                        "loci": loci, "related_index": None,
                                        "artifact_uri": artifact.get('uri', ''),
                                        "l1": l1, "c1": c1, "l2": l2, "c2": c2, "line_num": line_num,
                                        "msg_type": "SOURCE", "message": message,
                                        "source_line": line, "rule_id": rule_id
                                    })
                last_codeFlow = codeFlow
        S.msg("\n")
        S.keep_with_context({
            "sarif_file": args.file, "unique_id": unique_id,
            "runi": runi, "resi": resi, "codefi": None, "threadi": None,
            "loci": None, "related_index": None,
            "artifact_uri": "",
            "l1": -1, "c1": -1, "l2": -1, "c2": -1, "line_num": None,
            "msg_type": "NEWLINE", "message": "",
            "source_line": "", "rule_id": rule_id
        })
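
# --------------------------------------------------------------------
# Example follow-up queries (illustrative)
# --------------------------------------------------------------------
# A minimal sketch of how the populated table can be inspected afterwards.
# The column and msg_type names come from the schema above; the database
# file name "results.sqlite" and the queries themselves are only examples
# and are not executed by this script:
#
#   import sqlite3
#   conn = sqlite3.connect("results.sqlite")
#   # results per rule across all ingested sarif files
#   for rule_id, n in conn.execute(
#           "SELECT rule_id, COUNT(*) FROM sarif_results "
#           "WHERE msg_type = 'RESULT' GROUP BY rule_id"):
#       print(rule_id, n)
#   # flow steps of the first result of the first run, in path order
#   for row in conn.execute(
#           "SELECT codefi, threadi, loci, artifact_uri, l1, message "
#           "FROM sarif_results WHERE msg_type = 'FLOW_STEP' "
#           "AND runi = 0 AND resi = 0 ORDER BY codefi, threadi, loci"):
#       print(row)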