mirror of
https://github.com/hohn/sarif-cli.git
synced 2025-12-16 17:23:03 +01:00
tested simple pull extractor. fail.
This commit is contained in:
123
bin/sarif-pull
123
bin/sarif-pull
@@ -1,84 +1,69 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pull-style SARIF to SQLite converter.
|
||||
Example: sarif-pull foo.sarif foo.db
|
||||
Pull-style SARIF → SQLite importer.
|
||||
Populates runs, results, alerts, referenced_source_regions.
|
||||
"""
|
||||
import sqlite3, sys, os, uuid
|
||||
|
||||
import json, fnmatch, hashlib, datetime
|
||||
|
||||
def load_json(path):
    """Read the file at *path* as UTF-8 text and return the parsed JSON value.

    Errors from ``open`` (e.g. a missing file) and from ``json.load``
    (malformed JSON) propagate to the caller unchanged.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
|
||||
|
||||
def flatten_json(obj, prefix="", sep="/"):
    """Yield ``(path, value)`` pairs for every scalar leaf of a nested structure.

    Dicts contribute their keys and lists their indices to the path; path
    components are joined with *sep*.  A non-container *obj* yields a single
    ``(prefix, obj)`` pair.

    Notes:
        * Empty dicts and empty lists yield nothing, so they do not appear
          in the output at all.
        * A falsy *prefix* (the default ``""``) means "no parent path": the
          first component is emitted without a leading *sep*.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            child = f"{prefix}{sep}{k}" if prefix else k
            yield from flatten_json(v, child, sep)
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            # Indices are stringified so a top-level index 0 is not mistaken
            # for an empty prefix on the next level down.
            child = f"{prefix}{sep}{i}" if prefix else str(i)
            yield from flatten_json(v, child, sep)
    else:
        yield prefix, obj
|
||||
|
||||
def hash_snippet(text):
    """Return the hexadecimal SHA-1 digest of *text* encoded as UTF-8.

    Characters that cannot be encoded (e.g. lone surrogates) are dropped
    rather than raising, because the encode step uses ``errors='ignore'``.
    """
    encoded = text.encode("utf-8", "ignore")
    return hashlib.sha1(encoded).hexdigest()
|
||||
|
||||
def now_timestamp():
    """Return the current UTC time as ``'YYYY-MM-DD HH:MM:SS'``.

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the time is
    taken as an aware UTC datetime and the tzinfo is then stripped; this
    keeps the output free of a ``+00:00`` suffix, exactly as before.
    """
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return utc_now.isoformat(sep=' ', timespec='seconds')
|
||||
|
||||
import sqlite3, sys, os
|
||||
from sarif_util import load_json, hash_snippet, now_timestamp
|
||||
import subprocess
|
||||
|
||||
def ensure_schema(db):
    """Run the external ``sarif-make-schema`` command on the database *db*.

    The command is invoked with *db* as its only argument.  Because
    ``check=True`` is passed, a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    # Kept function-local so this definition does not depend on a
    # module-level import being present.
    import subprocess

    # Argument list (no shell), so *db* is passed through verbatim.
    subprocess.run(["sarif-make-schema", db], check=True)
|
||||
|
||||
def _tool_info(run):
    """Return ``(name, semanticVersion)`` of the run's tool driver.

    Missing keys give ``""``; a ``tool`` or ``driver`` that is absent or
    ``null`` is treated as an empty object instead of raising.
    """
    driver = (run.get("tool") or {}).get("driver") or {}
    return driver.get("name", ""), driver.get("semanticVersion", "")


def _iter_result_locations(run):
    """Yield one flat tuple per (result, location) pair found in *run*.

    Each tuple is ``(rule_id, severity, message, file_path, start_line,
    end_line, start_column, end_column)``.  A result with no locations
    yields nothing.  Absent or ``null`` intermediate objects are treated as
    empty, so text fields fall back to ``""`` and region numbers to ``None``.
    """
    for res in run.get("results") or []:
        msg = (res.get("message") or {}).get("text", "")
        rule_id = res.get("ruleId", "")
        sev = (res.get("properties") or {}).get("problem.severity", "")
        for loc in res.get("locations") or []:
            # `or {}` also covers an explicit null physicalLocation, which a
            # plain .get(key, {}) default would let through as None.
            ploc = (loc or {}).get("physicalLocation") or {}
            file_path = (ploc.get("artifactLocation") or {}).get("uri", "")
            region = ploc.get("region") or {}
            yield (rule_id, sev, msg, file_path,
                   region.get("startLine"), region.get("endLine"),
                   region.get("startColumn"), region.get("endColumn"))


def extract_results(run_id, run):
    """Collect one row dict per result location of a SARIF run.

    Args:
        run_id: identifier stored in every row's ``run_id`` field.
        run: one element of the SARIF ``runs`` array (a dict).

    Returns:
        ``(results, tool, version)`` where *results* is a list of dicts with
        keys ``run_id, rule_id, severity, message, file_path, line_start,
        line_end, column_start, column_end`` and *tool* / *version* are the
        driver name and semantic version.
    """
    tool, version = _tool_info(run)
    results = [
        {
            "run_id": run_id,
            "rule_id": rule_id,
            "severity": sev,
            "message": msg,
            "file_path": file_path,
            "line_start": ls,
            "line_end": le,
            "column_start": cs,
            "column_end": ce,
        }
        for rule_id, sev, msg, file_path, ls, le, cs, ce
        in _iter_result_locations(run)
    ]
    return results, tool, version


def extract_all(run_id, run):
    """Collect result, alert and source-region rows for a SARIF run.

    For every result location three parallel rows are produced:

    * a *results* row (same shape as ``extract_results`` produces),
    * an *alerts* row whose ``alert_id`` is the ``hash_snippet`` digest of
      ``run_id|rule_id|file_path|start_line|end_line|start_col|end_col``
      and whose ``kind`` is always ``"result"``,
    * a *regions* row whose ``region_id`` is the digest of the location
      alone (no run or rule), linked back through ``result_id`` (equal to
      the alert id); ``snippet`` and ``source_hash`` are left as ``None``.

    Returns:
        ``(results, alerts, regions, tool, version)``.
    """
    results, alerts, regions = [], [], []
    tool, version = _tool_info(run)
    for rule_id, sev, msg, file_path, ls, le, cs, ce in _iter_result_locations(run):
        span_key = f"{file_path}|{ls}|{le}|{cs}|{ce}"
        rid = hash_snippet(f"{run_id}|{rule_id}|{span_key}")
        results.append(dict(run_id=run_id, rule_id=rule_id, severity=sev,
                            message=msg, file_path=file_path,
                            line_start=ls, line_end=le,
                            column_start=cs, column_end=ce))
        alerts.append(dict(alert_id=rid, run_id=run_id, rule_id=rule_id,
                           kind="result", file_path=file_path,
                           message=msg, severity=sev))
        regions.append(dict(region_id=hash_snippet(span_key),
                            result_id=rid, file_path=file_path,
                            start_line=ls, end_line=le,
                            start_column=cs, end_column=ce,
                            snippet=None, source_hash=None))
    return results, alerts, regions, tool, version
|
||||
|
||||
def main():
    """Command-line entry point: import a SARIF file into a SQLite database.

    Usage: ``sarif-pull input.sarif output.db``

    Prints the usage line and exits with status 1 when fewer than two
    arguments are given.  Otherwise the schema is ensured via
    ``ensure_schema``, the SARIF file is loaded, and for every run the
    ``runs``, ``results``, ``alerts`` and ``referenced_source_regions``
    tables are populated with ``INSERT OR REPLACE``.  A summary with the
    totals across all runs is printed at the end.
    """
    if len(sys.argv) < 3:
        print("Usage: sarif-pull input.sarif output.db")
        sys.exit(1)
    sarif_file, dbfile = sys.argv[1:3]

    ensure_schema(dbfile)
    sarif = load_json(sarif_file)

    # Totals are accumulated across runs so the summary is correct for
    # multi-run files and is still defined when the file has no runs.
    n_results = n_alerts = n_regions = 0

    con = sqlite3.connect(dbfile)
    try:
        cur = con.cursor()
        for i, run in enumerate(sarif.get("runs") or []):
            # Run ids are "<sarif basename>#<index of the run in the file>".
            run_id = f"{os.path.basename(sarif_file)}#{i}"
            results, alerts, regions, tool, version = extract_all(run_id, run)
            # NOTE(review): the positional VALUES lists rely on the column
            # order created by sarif-make-schema -- confirm against it.
            cur.execute("INSERT OR REPLACE INTO runs VALUES (?,?,?,?,?)",
                        (run_id, now_timestamp(), tool, version, 0))
            cur.executemany("""INSERT OR REPLACE INTO results VALUES
                (:run_id,:rule_id,:severity,:message,:file_path,
                 :line_start,:line_end,:column_start,:column_end)""", results)
            cur.executemany("""INSERT OR REPLACE INTO alerts VALUES
                (:alert_id,:run_id,:rule_id,:kind,:file_path,:message,:severity)""", alerts)
            cur.executemany("""INSERT OR REPLACE INTO referenced_source_regions VALUES
                (:region_id,:result_id,:file_path,:start_line,:end_line,
                 :start_column,:end_column,:snippet,:source_hash)""", regions)
            n_results += len(results)
            n_alerts += len(alerts)
            n_regions += len(regions)
        con.commit()
    finally:
        # Close even when an insert fails; uncommitted work is discarded.
        con.close()

    print(f"Inserted {n_results} results, {n_alerts} alerts, "
          f"{n_regions} regions into {dbfile}")


if __name__ == "__main__":
    main()
|
||||
|
||||
Reference in New Issue
Block a user