#!/usr/bin/env python
""" Extract data from multiple sarif files in table form.
"""
from dataclasses import dataclass
from sarif_cli import signature, signature_multi
from sarif_cli import typegraph
from sarif_cli import snowflake_id
import argparse
import csv
import dataclasses as dc
import json
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sys

#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a collection of sarif files and produce tabular output.')
parser.add_argument('file', metavar='sarif-files.json', type=str,
                    help="JSON file containing the metadata array; use - for stdin.")
parser.add_argument('outdir', metavar='output-dir', type=str,
                    help='output directory')
parser.add_argument('-c', '--combine-only', action="store_true",
                    help='Read the referenced input file(s) and write the combined structure to stdout')
args = parser.parse_args()

# Load meta info
with open(args.file, 'r') if args.file != '-' else sys.stdin as fp:
    meta_struct = json.load(fp)

# Attach the content of every referenced sarif file
def load(fname):
    with open(fname, 'rb') as fp:
        content = json.load(fp)
    return content

for sarif_meta in meta_struct:
    sarif_meta['sarif_content'] = load(sarif_meta['sarif_file_name'])

# Only output the combined structure?
if args.combine_only:
    json.dump(meta_struct, sys.stdout, indent=4)
    sys.exit(0)

#
# Preprocess raw SARIF to get a smaller signature
#
context = signature.Context(
    {
        "string": "String",
        "int": "Int",
        "bool": "Bool",
    }
)
meta_struct = signature.fillsig(args, meta_struct, context)

#
# Use the reference type graph (signature) to traverse the sarif structure and
# attach values to tables
#
tgraph = typegraph.Typegraph(signature_multi.struct_graph_2022_03_08)
typegraph.destructure(tgraph, signature_multi.start_node_2022_03_08, meta_struct)

#
# Form output tables
#
typegraph.attach_tables(tgraph)

#
# Dataframe / table collection
#
@dataclass
class BaseTables:
    artifacts: pd.DataFrame
    codeflows: pd.DataFrame
    kind_pathproblem: pd.DataFrame
    kind_problem: pd.DataFrame
    project: pd.DataFrame
    relatedLocations: pd.DataFrame
    rules: pd.DataFrame

    def __init__(self):
        # Suppress the dataclass-generated __init__; the fields are filled in
        # one by one below.
        pass

bt = BaseTables()

#
# Add dataframes
#
sf_2683 = tj.joins_for_location_info(tgraph)
af_0350_location = tj.joins_for_af_0350_location(tgraph)
bt.artifacts = tj.joins_for_artifacts(tgraph)
bt.codeflows = tj.joins_for_codeflows(tgraph, sf_2683)
bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
bt.project = tj.joins_for_project(tgraph)  # multi-sarif only
bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, sf_2683)
bt.rules = tj.joins_for_rules(tgraph)

#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)

columns_to_reindex = {
    # template from {field.name : [''] for field in dc.fields(bt)}
    'artifacts': ['artifacts_id'],
    'codeflows': ['codeflow_id'],
    'kind_pathproblem': ['results_array_id', 'codeFlows_id'],
    'kind_problem': ['results_array_id'],
    'project': ['artifacts', 'results', 'rules'],
    'relatedLocations': ['struct_id'],
    'rules': ['rules_array_id'],
}

_id_to_flake = {}

def _get_flake(id):
    flake = _id_to_flake.get(id, -1)
    if flake == -1:
        flake = flakegen.next()
        _id_to_flake[id] = flake
    return flake
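# _get_flake memoizes the internal-id -> snowflake mapping, so every occurrence
# of the same internal id, in any table, is rewritten to the same flake.  A
# small illustrative sketch (the ids and call order are made up, not taken from
# a real run):
#
#     a = _get_flake(4001)            # unseen id: draws a fresh flake from flakegen
#     b = _get_flake(4002)            # another unseen id: draws the next flake
#     assert _get_flake(4001) == a    # repeated id: returns the recorded flake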
#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
#     newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
#     for i in range(0, len(newtable)):
#         newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
#     return newtable
#
# for field in dc.fields(bt):
#     table_name = field.name
#     for colname in columns_to_reindex[table_name]:
#         setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#
for field in dc.fields(bt):
    table_name = field.name
    table = getattr(bt, field.name)
    # Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
    newtable = table.astype(
        {colname: 'uint64' for colname in columns_to_reindex[table_name]}
    ).reset_index(drop=True)
    # Swap ids for flakes
    for colname in columns_to_reindex[table_name]:
        for i in range(0, len(newtable)):
            newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
    # Replace the table
    setattr(bt, field.name, newtable)

#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)

def write(path, frame):
    with p.joinpath(path + ".csv").open(mode='wb') as fh:
        frame.to_csv(fh, index=False, quoting=csv.QUOTE_NONNUMERIC)

for field in dc.fields(bt):
    table = getattr(bt, field.name)
    write(field.name, table)
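#
# Usage sketch (file and script names here are hypothetical, not taken from the
# repository).  The input metadata file is a JSON array whose entries each
# provide at least a "sarif_file_name" key pointing at a SARIF file:
#
#     [ { "sarif_file_name": "scan-1.sarif", ... },
#       { "sarif_file_name": "scan-2.sarif", ... } ]
#
# Running the script as
#
#     python extract-multi.py sarif-files.json tables
#
# writes tables/artifacts.csv, tables/codeflows.csv, tables/kind_pathproblem.csv,
# tables/kind_problem.csv, tables/project.csv, tables/relatedLocations.csv, and
# tables/rules.csv, one CSV per BaseTables field; each can be reloaded with
# pandas.read_csv().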