#!/usr/bin/env python
""" Extract scan data from multiple sarif files in table form. """
from dataclasses import dataclass
from sarif_cli import signature, signature_single, signature_single_CLI
from sarif_cli import typegraph
from sarif_cli import snowflake_id
from sarif_cli import status_writer
import argparse
import csv
import dataclasses as dc
import json
import logging
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sarif_cli.table_joins_CLI as tj_CLI
import sarif_cli.scan_tables as st
from sarif_cli import columns
import sys

#
# Configure logger
#
logging.basicConfig(format='%(asctime)s %(message)s')

#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a collection of sarif files and produce tabular output.')
parser.add_argument('file', metavar='scan-spec.json', type=str,
                    help="json file containing required external scan information.")
parser.add_argument('outdir', metavar='output-dir', type=str,
                    help='output directory')
parser.add_argument('csvout', metavar='csv-outfile', type=str,
                    help='processing status csv output file name to use')
parser.add_argument('-r', '--write-raw-tables', action="store_true",
                    help='Write the raw sarif tables to the output directory')
parser.add_argument('-t', '--with-timestamps', action='store_true',
                    help='Read the names of files containing timestamp information '
                    'from the scan-spec.json file.  '
                    'The scan-spec format then changes from, e.g., '
                    '{"scan_id": 15092319597255524458, "sarif_file_name": "sqlidb-0.1.sarif"} '
                    'to '
                    '{"scan_id": 15092319597255524458, "sarif_file_name": "sqlidb-0.1.sarif", '
                    '"timestamp_file_name": "sqlidb-0.1.timestamps"}')
parser.add_argument('-f', '--input-signature', metavar='input-signature', type=str, default="CLI",
                    help='Signature of the sarif file; where the file was generated may affect its signature.\n'
                    'Options: LGTM, CLI\n'
                    'If the currently supported signatures are not sufficient, '
                    'see signature_single.py for how to support further signatures.'
                    ' Default: "%(default)s"')
parser.add_argument("-d", "--debug", action="store_true",
                    help="Enter the pdb post-mortem debugger on uncaught exceptions")
args = parser.parse_args()

import pdb, traceback

def debug_excepthook(type, value, tb):
    traceback.print_exception(type, value, tb)
    print("\nEntering post-mortem debugger...\n")
    pdb.post_mortem(tb)

if args.debug:
    sys.excepthook = debug_excepthook

if args.input_signature not in ["LGTM", "CLI"]:
    print("Unsupported sarif signature requested.")
    print("Use one of [LGTM, CLI].")
    sys.exit(0)

# Set up the csv status/error writer
status_writer.setup_csv_writer(args.csvout)

# Load meta info
def load(fname):
    with (open(fname, 'rb') if fname != '-' else sys.stdin) as fp:
        try:
            content = json.load(fp)
        except json.decoder.JSONDecodeError as err:
            logging.error('Error reading from {}: {}: line {}, column {}'
                          .format(fname, err.msg, err.lineno, err.colno))
            status_writer.file_load_error["sarif_file"] = fname
            status_writer.csv_write(status_writer.file_load_error)
            sys.exit(1)
    return content

scan_spec = load(args.file)
sarif_struct = load(scan_spec['sarif_file_name'])

if args.with_timestamps:
    t1 = load(scan_spec['timestamp_file_name'])
    # TODO Remove this kludge for wrong keywords.
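    # The timestamp file is assumed to carry at least "scan_start" and
    # "scan_stop" (plus, presumably, the "db_create_*" fields spread in via
    # **t1); the kludge below duplicates the two scan keys under the
    # "*_date" names used for the scans table.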
    timestamps = {
        **t1,
        "scan_start_date" : t1["scan_start"],
        "scan_stop_date" : t1["scan_stop"],
    }
else:
    timestamps = {
        "db_create_start" : pd.Timestamp(0.0, unit='s'),
        "db_create_stop" : pd.Timestamp(0.0, unit='s'),
        "scan_start_date" : pd.Timestamp(0.0, unit='s'),
        "scan_stop_date" : pd.Timestamp(0.0, unit='s'),
    }

status_writer.setup_status_filenames(scan_spec['sarif_file_name'])

#
# Preprocess raw SARIF to get a smaller signature
#
context = signature.Context(
    {
        "string" : "String",
        "int" : "Int",
        "bool" : "Bool"
    })
sarif_struct = signature.fillsig(args, sarif_struct, context)

#
# Set up which signature to use
#
if args.input_signature == "LGTM":
    signature_to_use = signature_single.struct_graph_LGTM
    start_node = signature_single.start_node_LGTM
else:
    # signature_to_use = signature_single.struct_graph_CLI
    signature_to_use = signature_single_CLI.struct_graph_CLI
    start_node = signature_single_CLI.start_node_CLI

#
# Use the reference type graph (signature) to traverse the sarif and attach values to tables
#
try:
    tgraph = typegraph.Typegraph(signature_to_use)
    typegraph.destructure(tgraph, start_node, sarif_struct)
except Exception:
    # Write the errors/warnings gathered so far, then pass the exception up
    # to be logged by the runner.
    status_writer.csv_write_warnings()
    raise

#
# Form output tables
#
typegraph.attach_tables(tgraph)

#
# Dataframe / table collection
#
@dataclass
class BaseTables:
    artifacts : pd.DataFrame
    codeflows : pd.DataFrame
    kind_pathproblem : pd.DataFrame
    kind_problem : pd.DataFrame
    project : pd.DataFrame
    relatedLocations : pd.DataFrame
    rules : pd.DataFrame
    columns_to_reindex : dict   # (name -> name list) dict
    def __init__(self):
        # Fields are filled in incrementally below.
        pass

bt = BaseTables()

@dataclass
class ScanTables:
    # project: External table with project information
    scans : pd.DataFrame
    results : pd.DataFrame
    projects : pd.DataFrame
    columns_to_reindex : dict   # (name -> name list) dict
    def __init__(self):
        # Fields are filled in incrementally below.
        pass

scantabs = ScanTables()

@dataclass
class ExternalInfo:
    project_id : pd.UInt64Dtype()
    scan_id : pd.UInt64Dtype()
    sarif_file_name : str

external_info = ExternalInfo(pd.NA, scan_spec["scan_id"], scan_spec["sarif_file_name"])

#
# Add dataframes for base tables
#
# (relies on some specifics of the signature type)
# Select the table-joins module matching the signature.
if args.input_signature == "LGTM":
    tj = tj
else:
    tj = tj_CLI

try:
    location_info = tj.joins_for_location_info(tgraph)
    af_0350_location = tj.joins_for_af_0350_location(tgraph)
    bt.artifacts = tj.joins_for_artifacts(tgraph)
    bt.codeflows = tj.joins_for_codeflows(tgraph, location_info)
    bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
    bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
    bt.project = tj.joins_for_project_single(tgraph)
    bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, location_info)
    bt.rules = tj.joins_for_rules(tgraph)
except Exception:
    # Write any warnings accumulated so far, then re-raise.
    status_writer.csv_write_warnings()
    raise

#
# Set up the rest of the base tables
#
bt.columns_to_reindex = {
    # template from {field.name : [''] for field in dc.fields(bt)}
    'artifacts': ['artifacts_id'],
    'codeflows': ['codeflow_id'],
    'kind_pathproblem': ['results_array_id', 'codeFlows_id'],
    'kind_problem': ['results_array_id'],
    'project': ['artifacts', 'results', 'rules'],
    'relatedLocations': ['struct_id'],
    'rules': ['rules_array_id']}

scantabs.columns_to_reindex = {
    'scans': [],
    'projects' : [],
    'results': ['codeFlow_id'],
}
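# columns_to_reindex maps each table name to the id columns that _replace_ids
# (below) casts to uint64 and rewrites with snowflake ids; a table listed with
# an empty column list is only reindexed (reset_index), not rewritten.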
#
# Form scan tables
#
# joins_for_projects has to happen first, as it backfills the guess about the project_id
scantabs.projects = st.joins_for_projects(bt, external_info)
scantabs.results = st.joins_for_results(bt, external_info)
scantabs.scans = \
    st.joins_for_scans(bt, external_info, scantabs, args.input_signature, timestamps)

#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)
_id_to_flake = {}

def _get_flake(id):
    flake = _id_to_flake.get(id, -1)
    if flake == -1:
        flake = flakegen.next()
        _id_to_flake[id] = flake
    return flake

#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
#     newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
#     for i in range(0, len(newtable)):
#         newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
#     return newtable
#
# for field in dc.fields(bt):
#     table_name = field.name
#     for colname in columns_to_reindex[table_name]:
#         setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#
def _replace_ids(tables_dataclass):
    tdc = tables_dataclass
    for field in dc.fields(tdc):
        if field.type != pd.DataFrame:
            continue
        table_name = field.name
        table = getattr(tdc, field.name)
        # Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
        newtable = table.astype(
            {colname : 'uint64' for colname in tdc.columns_to_reindex[table_name]}
        ).reset_index(drop=True)
        # Swap ids for flakes
        for colname in tdc.columns_to_reindex[table_name]:
            for i in range(0, len(newtable)):
                oid = newtable.loc[i, colname]
                if oid in [0, -1]:
                    # Ignore special values
                    continue
                newtable.loc[i, colname] = _get_flake(oid)
        # Replace the table
        setattr(tdc, field.name, newtable)

# Replace id()s of the base and derived tables
_replace_ids(bt)
_replace_ids(scantabs)

#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)

def write(path, frame):
    with p.joinpath(path + ".csv").open(mode='wb') as fh:
        frame.to_csv(fh, index=False, columns=columns.columns[path],
                     quoting=csv.QUOTE_NONNUMERIC)

def _write_dataframes_of(tables_dataclass):
    for field in dc.fields(tables_dataclass):
        if field.type != pd.DataFrame:
            continue
        table = getattr(tables_dataclass, field.name)
        write(field.name, table)

# Write sarif-based tables
if args.write_raw_tables:
    _write_dataframes_of(bt)

# Write derived tables and codeflows
_write_dataframes_of(scantabs)
write('codeflows', bt.codeflows)

status_writer.warning_set["success"] += 1
status_writer.csv_write_warnings()
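# Output summary: the directory now holds scans.csv, results.csv, projects.csv
# and codeflows.csv, plus (with --write-raw-tables) the raw sarif tables such
# as artifacts.csv and kind_problem.csv, each limited to the columns listed in
# sarif_cli.columns.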