#!/usr/bin/env python3
"""Traverse the org/project.scantables/ directories produced by
./sarif-extract-scans-runner and concatenate the collections of individual
tables (codeflows.csv, results.csv, scans.csv, projects.csv) into 4 large
tables.
"""
from copy import deepcopy
from datetime import datetime
import argparse
import csv
import numpy
import os
import pandas as pd
import sys
from sarif_cli import scan_tables
from sarif_cli import table_joins

#
# TODO: Factor out functionality / structures in common with
# ./sarif-extract-scans-runner
#

#
# Handle arguments
#
parser = argparse.ArgumentParser(
    description='Aggregate the scan tables produced by sarif-extract-scans-runner '
    'over a directory hierarchy')
parser.add_argument('sarif_files', metavar='sarif-files', type=str,
                    help='File containing the list of sarif files; use - for stdin')
parser.add_argument('aggregate_dir', metavar='aggregate-dir', type=str,
                    help='Directory for writing the combined scan tables')
parser.add_argument('-in', '--in-dir', metavar='input-dir', type=str, default="",
                    help='Directory containing the input set of results '
                    '(corresponds to --outdir on the runner, if supplied)')
parser.add_argument('-m', '--max-files', metavar='M', type=int, default=100000,
                    help='Maximum number of files to process.'
                    ' Default: %(default)d')
parser.add_argument('-i', '--update-interval', metavar='N', type=int, default=100,
                    help='Update status after processing N files.'
                    ' Default: %(default)d')
parser.add_argument('--doc', dest='fulldoc', default=False, action='store_true',
                    help='Print full documentation for this script')

# Avoid an argparse error when only --doc is given
if len(sys.argv) == 2 and sys.argv[1] == '--doc':
    print(__doc__)
    sys.exit(0)

args = parser.parse_args()

#
# Utilities
#
# One list of per-scan dataframes per table; filled during traversal below.
_extract_scans_tables = {
    "scans": [],
    "results": [],
    "projects": [],
    "codeflows": [],
}

_table_output_dtypes = {
    "scans": scan_tables.ScanTablesTypes.scans,
    "results": scan_tables.ScanTablesTypes.results,
    "projects": scan_tables.ScanTablesTypes.projects,
    "codeflows": table_joins.BaseTablesTypes.codeflows,
}

# Accommodate special dtype cases for parsing to avoid
#
#     TypeError: the dtype datetime64 is not supported for parsing, pass this
#     column using parse_dates instead
#
_parse_dates = {
    "scans": [],
    "results": [],
    "projects": [],
    "codeflows": [],
}

# Prep for in-place modification; use copies of the original module values
_table_input_dtypes = {key: deepcopy(val)
                       for key, val in _table_output_dtypes.items()}

# Replace datetime64 with str and track the affected columns
for tab_name, tab_dtypes in _table_input_dtypes.items():
    for col_key, col_dtype in tab_dtypes.items():
        # Let pandas parse datetime64 columns as str, then convert to dates
        if col_dtype == numpy.dtype('M'):
            # Note: pd.StringDtype() here will cause parsing failure later
            tab_dtypes[col_key] = str
            _parse_dates[tab_name].append(col_key)
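
# A minimal sketch of the read_csv behavior the block above works around
# (illustrative only; the column name "scan_start_date" and the inline data
# are hypothetical, not taken from the source tables):
#
#     import io
#     buf = io.StringIO("scan_start_date\n2021-01-01\n")
#     # Passing the datetime64 dtype directly raises
#     #     TypeError: the dtype datetime64[ns] is not supported for parsing,
#     #     pass this column using parse_dates instead
#     # so read the column as str and let parse_dates do the conversion:
#     df = pd.read_csv(buf, dtype={"scan_start_date": str},
#                      parse_dates=["scan_start_date"])
#     assert df["scan_start_date"].dtype == numpy.dtype("datetime64[ns]")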

def _all_csv_files_exist(output_dir):
    for file_prefix in _extract_scans_tables.keys():
        csv_fname = os.path.join(output_dir, file_prefix + ".csv")
        if not os.path.exists(csv_fname):
            return False
    return True

#
# Prepare the output directory first; we can't run without it
#
try:
    os.mkdir(args.aggregate_dir, mode=0o755)
except FileExistsError:
    pass

#
# Collect the sarif file information
#
with open(args.sarif_files, 'r') if args.sarif_files != '-' else sys.stdin as fp:
    paths = fp.readlines()

#
# Traverse all possible scantable-containing directories
#
for count, path in enumerate(paths):
    if count >= args.max_files:
        break
    #
    # Paths and components
    #
    path = path.rstrip()
    #
    # Validate the input data directory and content.  os.path.join handles an
    # empty --in-dir, joining to the bare path.
    #
    output_dir = os.path.join(args.in_dir, path + ".scantables")
    if not os.path.exists(output_dir):
        continue
    if not _all_csv_files_exist(output_dir):
        continue
    #
    # Append data for every table
    #
    for file_prefix in _extract_scans_tables.keys():
        csv_fname = os.path.join(output_dir, file_prefix + ".csv")
        data = pd.read_csv(csv_fname,
                           dtype=_table_input_dtypes[file_prefix],
                           parse_dates=_parse_dates[file_prefix])
        _extract_scans_tables[file_prefix].append(data)
    # Some timing information
    if count % args.update_interval == 0:
        print("{:6} {:6}/{:6}".format("COUNT", count, len(paths)))
        print("{:6} {}".format("DATE", datetime.now().isoformat()))
        sys.stdout.flush()

#
# Create and write the combined dataframes
#
for file_prefix in _extract_scans_tables.keys():
    combined = (pd.concat(_extract_scans_tables[file_prefix],
                          ignore_index=True, axis='index')
                .astype(_table_output_dtypes[file_prefix])
                .reset_index(drop=True))
    with open(os.path.join(args.aggregate_dir, file_prefix + ".csv"), 'w') as fh:
        combined.to_csv(fh, index=False, quoting=csv.QUOTE_NONNUMERIC)
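
# Example invocations (a sketch; the script name ./sarif-aggregate-scans and
# all file and directory names are assumptions, not taken from the source):
#
#     ls org/*/*.sarif > sarif-files.txt
#     ./sarif-aggregate-scans sarif-files.txt aggregated-tables --in-dir scan-output
#
# or, streaming the file list on stdin:
#
#     ls org/*/*.sarif | ./sarif-aggregate-scans - aggregated-tables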