Files
sarif-cli/bin/sarif-extract-multi

155 lines
4.7 KiB
Python
Executable File

#!/usr/bin/env python
""" Extract data from multiple sarif files in table form.
"""
from dataclasses import dataclass
from sarif_cli import signature, signature_multi
from sarif_cli import typegraph
from sarif_cli import snowflake_id
import argparse
import dataclasses as dc
import json
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sys
#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a collection of sarif files and produce tabular output.')
parser.add_argument('file', metavar='sarif-files.json', type=str,
help="json file containing the metadata array. Use - for stdin. ")
parser.add_argument('outdir', metavar='output-dir', type=str, help='output directory')
parser.add_argument('-c', '--combine-only', action="store_true",
help='Read the referenced input file(s) and write the combined structure to stdout')
args = parser.parse_args()
# Load meta info
with open(args.file, 'r') if args.file != '-' else sys.stdin as fp:
meta_struct = json.load(fp)
# Attach referenced files
def load(fname):
with open(fname, 'rb') as fp:
content = json.load(fp)
return content
for sarif_meta in meta_struct:
sarif_meta['sarif_content'] = load(sarif_meta['sarif_file_name'])
# Only output composite?
if args.combine_only:
json.dump(meta_struct, sys.stdout, indent=4)
sys.exit(0)
#
# Preprocess raw SARIF to get smaller signature
#
context = signature.Context(
{
"string" : "String",
"int" : "Int",
"bool" : "Bool"
}
)
meta_struct = signature.fillsig(args, meta_struct, context)
#
# Use reference type graph (signature) to traverse sarif and attach values to tables
#
tgraph = typegraph.Typegraph(signature_multi.struct_graph_2022_03_08)
typegraph.destructure(tgraph, signature_multi.start_node_2022_03_08, meta_struct)
#
# Form output tables
#
typegraph.attach_tables(tgraph)
#
# Dataframe / table collection
#
@dataclass
class BaseTables:
artifacts : pd.DataFrame
codeflows : pd.DataFrame
kind_pathproblem : pd.DataFrame
kind_problem : pd.DataFrame
project : pd.DataFrame
relatedLocations : pd.DataFrame
rules : pd.DataFrame
def __init__(self): pass
bt = BaseTables()
#
# Add dataframes
#
sf_2683 = tj.joins_for_sf_2683(tgraph)
af_0350_location = tj.joins_for_af_0350_location(tgraph)
bt.artifacts = tj.joins_for_artifacts(tgraph)
bt.codeflows = tj.joins_for_codeflows(tgraph, sf_2683)
bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
bt.project = tj.joins_for_project(tgraph) # multi-sarif only
bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, sf_2683)
bt.rules = tj.joins_for_rules(tgraph)
#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)
columns_to_reindex = {
# template from {field.name : [''] for field in dc.fields(bt)}
'artifacts': ['artifacts_id'],
'codeflows': ['codeflow_id'],
'kind_pathproblem': ['results_array_id', 'codeFlows_id'],
'kind_problem': ['results_array_id'],
'project': ['artifacts', 'results', 'rules'],
'relatedLocations': ['struct_id'],
'rules': ['rules_array_id']}
_id_to_flake = {}
def _get_flake(id):
flake = _id_to_flake.get(id, -1)
if flake == -1:
flake = flakegen.next()
_id_to_flake[id] = flake
return flake
#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
# newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
# for i in range(0, len(newtable)):
# newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
# return newtable
#
# for field in dc.fields(bt):
# table_name = field.name
# for colname in columns_to_reindex[table_name]:
# setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#
for field in dc.fields(bt):
table_name = field.name
table = getattr(bt, field.name)
# Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
newtable = table.astype(
{ colname : 'uint64'
for colname in columns_to_reindex[table_name]}
).reset_index(drop=True)
# Swap ids for flakes
for colname in columns_to_reindex[table_name]:
for i in range(0, len(newtable)):
newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
# Replace the table
setattr(bt, field.name, newtable)
#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)
def write(path, frame):
with p.joinpath(path + ".csv").open(mode='wb') as fh:
frame.to_csv(fh, index=False)
for field in dc.fields(bt):
table = getattr(bt, field.name)
write(field.name, table)