Files
sarif-cli/bin/sarif-pull
michael hohn 25a6538946 wip: add simple pull parser
Works, but is incomplete:
sqlite> SELECT COUNT(*) FROM results;
SELECT COUNT(*) FROM runs;
SELECT COUNT(*) FROM alerts;
SELECT COUNT(*) FROM referenced_source_regions;
3139
1
0
0
2025-10-20 00:16:21 -07:00

85 lines
3.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Pull-style SARIF to SQLite converter.
Example: sarif-pull foo.sarif foo.db
"""
import sqlite3, sys, os, uuid
import json, fnmatch, hashlib, datetime
def load_json(path):
    """Read the UTF-8 encoded file at *path* and return its parsed JSON value."""
    with open(path, encoding='utf-8') as handle:
        text = handle.read()
    return json.loads(text)
def flatten_json(obj, prefix="", sep="/"):
    """Yield (path, value) pairs for every scalar leaf of nested dicts/lists.

    Path components are dict keys and list indices joined by *sep*.
    A scalar passed directly is yielded under *prefix* itself.
    Empty dicts and empty lists yield nothing.
    """
    if isinstance(obj, dict):
        children = obj.items()
    elif isinstance(obj, list):
        children = ((str(idx), item) for idx, item in enumerate(obj))
    else:
        # Leaf: anything that is neither a dict nor a list.
        yield prefix, obj
        return

    for key, child in children:
        # A falsy prefix means "no parent path yet", so the key stands alone.
        child_path = f"{prefix}{sep}{key}" if prefix else key
        yield from flatten_json(child, child_path, sep)
def hash_snippet(text):
    """Return the hexadecimal SHA-1 digest of *text* encoded as UTF-8.

    Characters that cannot be encoded (e.g. lone surrogates) are dropped
    before hashing.
    """
    encoded = text.encode('utf-8', 'ignore')
    digest = hashlib.sha1(encoded)
    return digest.hexdigest()
def now_timestamp():
    """Return the current UTC time as 'YYYY-MM-DD HH:MM:SS'.

    The string carries no UTC offset suffix and has one-second resolution.
    """
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # time instead and drop tzinfo so the output has no '+00:00' suffix.
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    return utc_now.replace(tzinfo=None).isoformat(sep=' ', timespec='seconds')
def ensure_schema(db):
    """Run the external ``sarif-make-schema`` command on the database *db*.

    Presumably this creates the tables the inserts rely on; that behavior
    lives in the external tool and should be confirmed there.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    from subprocess import run

    command = ["sarif-make-schema", db]
    run(command, check=True)
def extract_results(run_id, run):
    """Flatten one SARIF run into per-location result rows.

    Args:
        run_id: Identifier stored in every row's "run_id" field.
        run: One element of the SARIF "runs" array, as a dict.

    Returns:
        A tuple (results, tool, version):
        results is a list of dicts, one per (result, location) pair, with
        keys run_id, rule_id, severity, message, file_path, line_start,
        line_end, column_start and column_end.
        tool and version are the driver's "name" and "semanticVersion",
        or "" when absent.

    Any intermediate object that is missing or JSON null (tool, driver,
    message, properties, locations, physicalLocation, artifactLocation,
    region) is treated as empty instead of raising.

    NOTE: a result with no locations contributes no rows.
    """
    # Guard with `or {}` because .get(key, {}) still returns None when the
    # key is present with a null value.
    driver = (run.get("tool") or {}).get("driver") or {}
    tool = driver.get("name", "")
    version = driver.get("semanticVersion", "")

    results = []
    for res in run.get("results") or []:
        msg = (res.get("message") or {}).get("text", "")
        rule_id = res.get("ruleId", "")
        sev = (res.get("properties") or {}).get("problem.severity", "")
        for loc in res.get("locations") or []:
            ploc = (loc or {}).get("physicalLocation") or {}
            file_path = (ploc.get("artifactLocation") or {}).get("uri", "")
            region = ploc.get("region") or {}
            results.append({
                "run_id": run_id,
                "rule_id": rule_id,
                "severity": sev,
                "message": msg,
                "file_path": file_path,
                "line_start": region.get("startLine"),
                "line_end": region.get("endLine"),
                "column_start": region.get("startColumn"),
                "column_end": region.get("endColumn"),
            })
    return results, tool, version
def main():
    """Command-line entry point: load a SARIF file into an SQLite database.

    Expects ``sys.argv[1]`` to be the SARIF input path and ``sys.argv[2]``
    the database path. Prints a usage line and exits with status 1 when
    fewer than two arguments are given.

    For each run in the SARIF file, one row is written to ``runs`` and one
    row per extracted result location to ``results``. All writes are
    committed once after the last run.
    """
    if len(sys.argv) < 3:
        print("Usage: sarif-pull input.sarif output.db")
        sys.exit(1)
    sarif_file, dbfile = sys.argv[1], sys.argv[2]

    ensure_schema(dbfile)
    sarif = load_json(sarif_file)

    # Count across all runs: reporting only the last run's length is wrong
    # for multi-run files and raises NameError when there are no runs.
    total = 0
    con = sqlite3.connect(dbfile)
    try:
        cur = con.cursor()
        # `or []` also covers a "runs" key that is present but null.
        for i, run in enumerate(sarif.get("runs") or []):
            run_id = f"{os.path.basename(sarif_file)}#{i}"
            results, tool, version = extract_results(run_id, run)
            # NOTE(review): the meaning of the trailing 0 depends on the
            # runs table layout created by sarif-make-schema; confirm there.
            cur.execute("INSERT OR REPLACE INTO runs VALUES (?, ?, ?, ?, ?)",
                        (run_id, now_timestamp(), tool, version, 0))
            cur.executemany("""INSERT OR REPLACE INTO results VALUES
(:run_id, :rule_id, :severity, :message, :file_path,
:line_start, :line_end, :column_start, :column_end)""", results)
            total += len(results)
        con.commit()
    finally:
        # Close the connection even when an insert fails.
        con.close()
    print(f"Inserted {total} results into {dbfile}")
# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()