#!/usr/bin/env python
""" Extract scan data from multiple sarif files in table form.
"""

from dataclasses import dataclass
from sarif_cli import signature, signature_single, signature_single_CLI
from sarif_cli import typegraph
from sarif_cli import snowflake_id
from sarif_cli import status_writer
import argparse
import csv
import dataclasses as dc
import json
import logging
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sarif_cli.table_joins_CLI as tj_CLI
import sarif_cli.scan_tables as st
import sys

#
# Configure logger
#
logging.basicConfig(format='%(asctime)s %(message)s')

#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a collection of sarif files and produce tabular output.')
parser.add_argument('file', metavar='scan-spec.json', type=str,
                    help="json file containing required external scan information.")
parser.add_argument('outdir', metavar='output-dir', type=str, help='output directory')
parser.add_argument('csvout', metavar='csv-outfile', type=str,
                    help='processing status csv output file name to use')
parser.add_argument('-r', '--write-raw-tables', action="store_true",
                    help='Write the raw sarif tables to the output directory')
parser.add_argument('-f', '--input-signature', metavar='input-signature', type=str, default="LGTM",
                    help='Signature of the sarif input; where it was generated may affect the signature. '
                    'Options: LGTM, CLI. '
                    'If the currently supported signatures are not sufficient, see signature_single.py '
                    'for how to support further signatures. '
                    'Default: "%(default)s"')
args = parser.parse_args()

if args.input_signature not in ["LGTM", "CLI"]:
    print("Unsupported sarif signature requested.")
    print("Use one of [LGTM, CLI].")
    sys.exit(1)

# Set up the csv error writer
status_writer.setup_csv_writer(args.csvout)

# Load meta info
def load(fname):
    with open(fname, 'rb') if fname != '-' else sys.stdin as fp:
        try:
            content = json.load(fp)
        except json.decoder.JSONDecodeError as err:
            logging.error('Error reading from {}: {}: line {}, column {}'
                          .format(fname, err.msg, err.lineno, err.colno))
            status_writer.file_load_error["sarif_file"] = fname
            status_writer.csv_write(status_writer.file_load_error)
            sys.exit(1)
    return content

scan_spec = load(args.file)
sarif_struct = load(scan_spec['sarif_file_name'])
status_writer.setup_status_filenames(scan_spec['sarif_file_name'])
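
# A scan-spec.json is assumed to look like the following; these three
# keys are the ones read here and by ExternalInfo below, and the values
# are hypothetical:
#
#   {
#       "sarif_file_name": "results.sarif",
#       "project_id": 17,
#       "scan_id": 1
#   }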

#
# Preprocess raw SARIF to get smaller signature
#
context = signature.Context(
    {
        "string" : "String",
        "int" : "Int",
        "bool" : "Bool"
    }
)
sarif_struct = signature.fillsig(args, sarif_struct, context)
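
# (The context above maps leaf type names to signature type tags: a JSON
# leaf such as "main.c" would presumably be tagged "String", 42 "Int",
# and true "Bool".  The sample values are illustrative, not taken from a
# real scan.)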

#
# Setup which signature to use
#
if args.input_signature == "LGTM":
    signature_to_use = signature_single.struct_graph_LGTM
    start_node = signature_single.start_node_LGTM
else:
    # signature_to_use = signature_single.struct_graph_CLI
    signature_to_use = signature_single_CLI.struct_graph_CLI
    start_node = signature_single_CLI.start_node_CLI
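
# (As the -f help text notes, supporting another signature would mean
# adding a struct_graph_* / start_node_* pair along the lines of
# signature_single.py and extending the choice above.)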

#
# Use reference type graph (signature) to traverse sarif and attach values to tables
#
try:
    tgraph = typegraph.Typegraph(signature_to_use)
    typegraph.destructure(tgraph, start_node, sarif_struct)
except Exception:
    # Errors/warnings have been gathered; write them out first.
    status_writer.csv_write_warnings()
    # Re-raise the original exception so the runner can put it into the log.
    raise

#
# Form output tables
#
typegraph.attach_tables(tgraph)

#
# Dataframe / table collection
#
@dataclass
class BaseTables:
    artifacts : pd.DataFrame
    codeflows : pd.DataFrame
    kind_pathproblem : pd.DataFrame
    kind_problem : pd.DataFrame
    project : pd.DataFrame
    relatedLocations : pd.DataFrame
    rules : pd.DataFrame
    columns_to_reindex : dict   # (name -> name list) dict
    def __init__(self): pass
bt = BaseTables()
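
# (The explicit no-op __init__ suppresses the dataclass-generated one, so
# a BaseTables instance starts empty and the fields are attached as they
# are computed; dc.fields() still reports the annotations, which the
# reindex and write loops below rely on.)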

@dataclass
class ScanTables:
    # project: External table with project information
    scans : pd.DataFrame
    results : pd.DataFrame
    projects : pd.DataFrame
    columns_to_reindex : dict   # (name -> name list) dict
    def __init__(self): pass
scantabs = ScanTables()

@dataclass
class ExternalInfo:
    project_id : int
    scan_id : pd.UInt64Dtype()
    sarif_file_name : str
    ql_query_id : str

external_info = ExternalInfo(
    scan_spec["project_id"],
    scan_spec["scan_id"],
    scan_spec["sarif_file_name"],
    # TODO: Take ql_query_id from where? (git commit id of the ql query set)
    'deadbeef00',
)

#
# Add dataframes for base tables
#
# (relies on some specifics of the signature type)
if args.input_signature == "LGTM":
    tj = tj
else:
    tj = tj_CLI

try:
    location_info = tj.joins_for_location_info(tgraph)
    af_0350_location = tj.joins_for_af_0350_location(tgraph)
    bt.artifacts = tj.joins_for_artifacts(tgraph)
    bt.codeflows = tj.joins_for_codeflows(tgraph, location_info)
    bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
    bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
    bt.project = tj.joins_for_project_single(tgraph)
    bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, location_info)
    bt.rules = tj.joins_for_rules(tgraph)
except Exception:
    # Warnings may have accumulated; write them out, then re-raise.
    status_writer.csv_write_warnings()
    raise

#
# Setup rest of basetables
#
bt.columns_to_reindex = {
    # template from {field.name : [''] for field in dc.fields(bt)}
    'artifacts': ['artifacts_id'],
    'codeflows': ['codeflow_id'],
    'kind_pathproblem': ['results_array_id', 'codeFlows_id'],
    'kind_problem': ['results_array_id'],
    'project': ['artifacts', 'results', 'rules'],
    'relatedLocations': ['struct_id'],
    'rules': ['rules_array_id']}

scantabs.columns_to_reindex = {
    'scans': [],
    'projects' : [],
    'results': ['codeFlow_id'],
}

#
# Form scan tables
#
# joins_for_projects has to happen first, as it backfills the guessed project_id
scantabs.projects = st.joins_for_projects(bt, external_info)
scantabs.results = st.joins_for_results(bt, external_info)
scantabs.scans = st.joins_for_scans(bt, external_info, scantabs, args.input_signature)

#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)

_id_to_flake = {}
def _get_flake(id):
    flake = _id_to_flake.get(id, -1)
    if flake == -1:
        flake = flakegen.next()
        _id_to_flake[id] = flake
    return flake
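
# (The -1 sentinel assumes the generator never yields -1.  An equivalent
# memoization sketch without the sentinel:
#
#     def _get_flake(id):
#         if id not in _id_to_flake:
#             _id_to_flake[id] = flakegen.next()
#         return _id_to_flake[id]
# )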

#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
#     newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
#     for i in range(0, len(newtable)):
#         newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
#     return newtable
#
# for field in dc.fields(bt):
#     table_name = field.name
#     for colname in columns_to_reindex[table_name]:
#         setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#

def _replace_ids(tables_dataclass):
    tdc = tables_dataclass
    for field in dc.fields(tdc):
        if field.type != pd.DataFrame:
            continue
        table_name = field.name
        table = getattr(tdc, field.name)
        # Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
        newtable = table.astype(
            { colname : 'uint64'
              for colname in tdc.columns_to_reindex[table_name]}
        ).reset_index(drop=True)
        # Swap ids for flakes
        for colname in tdc.columns_to_reindex[table_name]:
            for i in range(0, len(newtable)):
                oid = newtable.loc[i, colname]
                if oid in [0, -1]:
                    # Ignore special values
                    continue
                newtable.loc[i, colname] = _get_flake(oid)
        # Replace the table
        setattr(tdc, field.name, newtable)
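
# (Since _id_to_flake is shared module state, a given internal id maps to
# the same snowflake id in every table, so cross-table references stay
# consistent after the rewrite; the special values 0 and -1 pass through
# untouched.)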

# Replace id()s of the base and derived tables
_replace_ids(bt)
_replace_ids(scantabs)

#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)

def write(path, frame):
    with p.joinpath(path + ".csv").open(mode='wb') as fh:
        frame.to_csv(fh, index=False, quoting=csv.QUOTE_NONNUMERIC)

def _write_dataframes_of(tables_dataclass):
    for field in dc.fields(tables_dataclass):
        if field.type != pd.DataFrame:
            continue
        table = getattr(tables_dataclass, field.name)
        write(field.name, table)

# Write sarif-based tables
if args.write_raw_tables:
    _write_dataframes_of(bt)

# Write derived tables and codeflows
_write_dataframes_of(scantabs)

write('codeflows', bt.codeflows)
status_writer.warning_set["success"] += 1
status_writer.csv_write_warnings()
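
# (With the calls above, output-dir ends up with scans.csv, results.csv,
# projects.csv and codeflows.csv; the -r flag additionally writes the raw
# base tables: artifacts.csv, kind_pathproblem.csv, kind_problem.csv,
# project.csv, relatedLocations.csv and rules.csv.)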