#!/usr/bin/env python
""" Extract scan data from multiple sarif files in table form.
"""
from dataclasses import dataclass
from sarif_cli import signature, signature_single
from sarif_cli import typegraph
from sarif_cli import snowflake_id
import argparse
import dataclasses as dc
import json
import logging
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sarif_cli.scan_tables as st
import sys

#
# Configure logger
#
logging.basicConfig(format='%(asctime)s %(message)s')

#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a collection of sarif files and produce tabular output.')
parser.add_argument('file', metavar='scan-spec.json', type=str,
                    help='json file containing required external scan information.')
parser.add_argument('outdir', metavar='output-dir', type=str,
                    help='output directory')
args = parser.parse_args()

# Load meta info
def load(fname):
    """ Load a json file, reading from stdin when fname is '-'. """
    with (open(fname, 'rb') if fname != '-' else sys.stdin) as fp:
        try:
            content = json.load(fp)
        except json.decoder.JSONDecodeError as err:
            logging.error('Error reading from {}: {}: line {}, column {}'
                          .format(fname, err.msg, err.lineno, err.colno))
            sys.exit(1)
    return content

scan_spec = load(args.file)
sarif_struct = load(scan_spec['sarif_file_name'])

#
# Preprocess raw SARIF to get smaller signature
#
context = signature.Context(
    {
        "string" : "String",
        "int"    : "Int",
        "bool"   : "Bool"
    })
sarif_struct = signature.fillsig(args, sarif_struct, context)

#
# Use reference type graph (signature) to traverse sarif and attach values to tables
#
tgraph = typegraph.Typegraph(signature_single.struct_graph_2022_02_01)
typegraph.destructure(tgraph, signature_single.start_node_2022_02_01, sarif_struct)

#
# Form output tables
#
typegraph.attach_tables(tgraph)

#
# Dataframe / table collection
#
@dataclass
class BaseTables:
    artifacts        : pd.DataFrame
    codeflows        : pd.DataFrame
    kind_pathproblem : pd.DataFrame
    kind_problem     : pd.DataFrame
    project          : pd.DataFrame
    relatedLocations : pd.DataFrame
    rules            : pd.DataFrame
    # Fields are filled in one join at a time below, so bypass the
    # generated __init__.
    def __init__(self):
        pass

bt = BaseTables()

@dataclass
class ScanTables:
    # project: External table with project information
    scans   : pd.DataFrame
    results : pd.DataFrame
    def __init__(self):
        pass

scantabs = ScanTables()

@dataclass
class ExternalInfo:
    scan_id     : int
    ql_query_id : str

external_info = ExternalInfo(
    scan_spec['scan_id'],
    'deadbeef00',               # TODO: Take ql_query_id from where?
)
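# For reference, a minimal scan-spec.json covering only the two fields this
# script reads ('sarif_file_name' and 'scan_id').  This is a sketch inferred
# from the accesses above; the real spec file may carry additional fields
# consumed elsewhere in sarif_cli:
#
#     {
#         "sarif_file_name": "path/to/results.sarif",
#         "scan_id": 1
#     }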
#
# Add dataframes for base tables
#
sf_2683 = tj.joins_for_sf_2683(tgraph)
af_0350_location = tj.joins_for_af_0350_location(tgraph)

bt.artifacts = tj.joins_for_artifacts(tgraph)
bt.codeflows = tj.joins_for_codeflows(tgraph, sf_2683)
bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
bt.project = tj.joins_for_project_single(tgraph)
bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, sf_2683)
bt.rules = tj.joins_for_rules(tgraph)

#
# Form scan tables
#
scantabs.results = st.joins_for_results(bt, external_info)
scantabs.scans = st.joins_for_scans(bt, external_info)

#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)

columns_to_reindex = {
    # template from {field.name : [''] for field in dc.fields(bt)}
    'artifacts'        : ['artifacts_id'],
    'codeflows'        : ['codeflow_id'],
    'kind_pathproblem' : ['results_array_id', 'codeFlows_id'],
    'kind_problem'     : ['results_array_id'],
    'project'          : ['artifacts', 'results', 'rules'],
    'relatedLocations' : ['struct_id'],
    'rules'            : ['rules_array_id'],
}

_id_to_flake = {}
def _get_flake(id):
    """ Return the snowflake id for an internal id, generating and memoizing
    one on first use so repeated internal ids map to the same flake. """
    flake = _id_to_flake.get(id, -1)
    if flake == -1:
        flake = flakegen.next()
        _id_to_flake[id] = flake
    return flake

#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
#     newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
#     for i in range(0, len(newtable)):
#         newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
#     return newtable
#
# for field in dc.fields(bt):
#     table_name = field.name
#     for colname in columns_to_reindex[table_name]:
#         setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#
for field in dc.fields(bt):
    table_name = field.name
    table = getattr(bt, field.name)
    # Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
    newtable = table.astype(
        { colname : 'uint64' for colname in columns_to_reindex[table_name]}
    ).reset_index(drop=True)
    # Swap ids for flakes
    for colname in columns_to_reindex[table_name]:
        for i in range(0, len(newtable)):
            newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
    # Replace the table
    setattr(bt, field.name, newtable)

#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)

def write(path, frame):
    """ Write frame to <outdir>/<path>.csv """
    with p.joinpath(path + ".csv").open(mode='wb') as fh:
        frame.to_csv(fh, index=False)

for field in dc.fields(bt):
    table = getattr(bt, field.name)
    write(field.name, table)
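# Note: scantabs.scans and scantabs.results are formed above but not written
# by the loop over dc.fields(bt); whether they belong on disk is not settled
# here.  If they are wanted as csv files too, a minimal sketch reusing the
# same one-csv-per-table layout would be:
#
#     for field in dc.fields(scantabs):
#         write(field.name, getattr(scantabs, field.name))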