sarif-cli/bin/sarif-extract-scans
Michael Hohn ee11214aee Add support for external timestamps
This allows external files containing

    timestamps = {
        "db_create_start"      : pd.Timestamp(0.0, unit='s'),
        "db_create_stop"       : pd.Timestamp(0.0, unit='s'),
        "scan_start_date"      : pd.Timestamp(0.0, unit='s'),
        "scan_stop_date"       : pd.Timestamp(0.0, unit='s'),
    }

to be used to provide those values instead of the defaults above.

This patch changes the top-level scripts
        bin/sarif-extract-scans
        bin/sarif-extract-scans-runner
and provides
        scripts/test-timestamps.sh
for verification.

The following keys are also accepted (an illustrative file is shown below):
    {
      "db_create_start": ...,
      "db_create_stop": ...,
      "scan_start": ...,
      "scan_stop": ...
    }
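
For illustration only: the file is read with json.load, so it is plain JSON,
and the timestamp values below are placeholders, not a required format.

    {
      "db_create_start": "2023-08-18 16:40:00",
      "db_create_stop": "2023-08-18 16:45:12",
      "scan_start": "2023-08-18 16:45:12",
      "scan_stop": "2023-08-18 17:02:30"
    }
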
2023-08-18 17:06:58 -07:00

305 lines · 9.7 KiB · Python · Executable File

#!/usr/bin/env python
""" Extract scan data from multiple sarif files in table form.
"""
from dataclasses import dataclass
from sarif_cli import signature, signature_single, signature_single_CLI
from sarif_cli import typegraph
from sarif_cli import snowflake_id
from sarif_cli import status_writer
import argparse
import csv
import dataclasses as dc
import json
import logging
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sarif_cli.table_joins_CLI as tj_CLI
import sarif_cli.scan_tables as st
from sarif_cli import columns
import sys
#
# Configure logger
#
logging.basicConfig(format='%(asctime)s %(message)s')
#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a collection of sarif files and produce tabular output.')
parser.add_argument('file', metavar='scan-spec.json', type=str,
                    help="json file containing required external scan information.")
parser.add_argument('outdir', metavar='output-dir', type=str, help='output directory')
parser.add_argument('csvout', metavar='csv-outfile', type=str, help='processing status csv output file name to use')
parser.add_argument('-r', '--write-raw-tables', action="store_true",
                    help='Write the raw sarif tables to the output directory')
parser.add_argument('-t', '--with-timestamps', action='store_true',
                    help='Read the name of the file containing timestamp information '
                         'from the scan-spec.json file. '
                         'The scan-spec.json format changes from, e.g., '
                         '{"scan_id": 15092319597255524458, "sarif_file_name": "sqlidb-0.1.sarif"} '
                         'to '
                         '{"scan_id": 15092319597255524458, "sarif_file_name": "sqlidb-0.1.sarif", '
                         '"timestamp_file_name": "sqlidb-0.1.timestamps"}')
parser.add_argument('-f', '--input-signature', metavar='input-signature', type=str, default="CLI",
                    help='Signature of the sarif file; where it was generated may affect the signature.\n'
                         'Options: LGTM, CLI\n'
                         'If the currently supported signatures are not sufficient, see signature_single.py '
                         'for how to support further signatures.'
                         ' Default: "%(default)s"')
args = parser.parse_args()
if args.input_signature not in ["LGTM", "CLI"]:
    print("Unsupported sarif signature requested.")
    print("Use one of [LGTM, CLI].")
    sys.exit(0)
# Setup csv error writer
status_writer.setup_csv_writer(args.csvout)
# Load meta info
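# load() reads JSON from fname, or from stdin when fname is '-'; on a parse error
# it records a file-load failure in the status CSV and exits.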
def load(fname):
    with open(fname, 'rb') if fname != '-' else sys.stdin as fp:
        try:
            content = json.load(fp)
        except json.decoder.JSONDecodeError as err:
            logging.error('Error reading from {}: {}: line {}, column {}'
                          .format(fname, err.msg, err.lineno, err.colno))
            status_writer.file_load_error["sarif_file"] = fname
            status_writer.csv_write(status_writer.file_load_error)
            sys.exit(1)
    return content
scan_spec = load(args.file)
sarif_struct = load(scan_spec['sarif_file_name'])
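# Timestamps come either from an external file named in the scan spec (with -t)
# or from the epoch-zero placeholders below.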
if args.with_timestamps:
    t1 = load(scan_spec['timestamp_file_name'])
    # TODO Remove this kludge for wrong keywords.
    timestamps = {
        **t1,
        "scan_start_date" : t1["scan_start"],
        "scan_stop_date"  : t1["scan_stop"],
    }
else:
    timestamps = {
        "db_create_start" : pd.Timestamp(0.0, unit='s'),
        "db_create_stop"  : pd.Timestamp(0.0, unit='s'),
        "scan_start_date" : pd.Timestamp(0.0, unit='s'),
        "scan_stop_date"  : pd.Timestamp(0.0, unit='s'),
    }
status_writer.setup_status_filenames(scan_spec['sarif_file_name'])
#
# Preprocess raw SARIF to get smaller signature
#
context = signature.Context(
    {
        "string" : "String",
        "int"    : "Int",
        "bool"   : "Bool"
    }
)
sarif_struct = signature.fillsig(args, sarif_struct, context)
#
# Setup which signature to use
if args.input_signature == "LGTM":
    signature_to_use = signature_single.struct_graph_LGTM
    start_node = signature_single.start_node_LGTM
else:
    # signature_to_use = signature_single.struct_graph_CLI
    signature_to_use = signature_single_CLI.struct_graph_CLI
    start_node = signature_single_CLI.start_node_CLI
#
# Use reference type graph (signature) to traverse sarif and attach values to tables
try:
    tgraph = typegraph.Typegraph(signature_to_use)
    typegraph.destructure(tgraph, start_node, sarif_struct)
except Exception:
    # Errors/warnings will have been gathered by now
    status_writer.csv_write_warnings()
    # Pass the exception up to be put into the log by the runner
    raise
#
# Form output tables
#
typegraph.attach_tables(tgraph)
#
# Dataframe / table collection
#
@dataclass
class BaseTables:
    artifacts          : pd.DataFrame
    codeflows          : pd.DataFrame
    kind_pathproblem   : pd.DataFrame
    kind_problem       : pd.DataFrame
    project            : pd.DataFrame
    relatedLocations   : pd.DataFrame
    rules              : pd.DataFrame
    columns_to_reindex : dict  # (name -> name list) dict
    def __init__(self): pass
bt = BaseTables()
@dataclass
class ScanTables:
    # project: External table with project information
    scans              : pd.DataFrame
    results            : pd.DataFrame
    projects           : pd.DataFrame
    columns_to_reindex : dict  # (name -> name list) dict
    def __init__(self): pass
scantabs = ScanTables()
@dataclass
class ExternalInfo:
    project_id      : pd.UInt64Dtype()
    scan_id         : pd.UInt64Dtype()
    sarif_file_name : str
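# project_id is not known yet; st.joins_for_projects below backfills the guess.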
external_info = ExternalInfo(
    pd.NA,
    scan_spec["scan_id"],
    scan_spec["sarif_file_name"]
)
#
# Add dataframes for base tables
#
# (relies on some specifics of the signature type)
if args.input_signature == "LGTM":
    tj = tj
else:
    tj = tj_CLI
try:
    location_info = tj.joins_for_location_info(tgraph)
    af_0350_location = tj.joins_for_af_0350_location(tgraph)
    bt.artifacts = tj.joins_for_artifacts(tgraph)
    bt.codeflows = tj.joins_for_codeflows(tgraph, location_info)
    bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
    bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
    bt.project = tj.joins_for_project_single(tgraph)
    bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, location_info)
    bt.rules = tj.joins_for_rules(tgraph)
except Exception:
    # Possible warnings accumulated
    status_writer.csv_write_warnings()
    raise
#
# Setup rest of basetables
#
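# columns_to_reindex lists, per table, the id columns that are rewritten to
# snowflake ids further below.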
bt.columns_to_reindex = {
    # template from {field.name : [''] for field in dc.fields(bt)}
    'artifacts': ['artifacts_id'],
    'codeflows': ['codeflow_id'],
    'kind_pathproblem': ['results_array_id', 'codeFlows_id'],
    'kind_problem': ['results_array_id'],
    'project': ['artifacts', 'results', 'rules'],
    'relatedLocations': ['struct_id'],
    'rules': ['rules_array_id']}
scantabs.columns_to_reindex = {
    'scans': [],
    'projects': [],
    'results': ['codeFlow_id'],
}
#
# Form scan tables
#
# joins_for_projects has to happen first, as it backfills the guess about the project_id
scantabs.projects = st.joins_for_projects(bt, external_info)
scantabs.results = st.joins_for_results(bt, external_info)
scantabs.scans = \
    st.joins_for_scans(bt, external_info, scantabs,
                       args.input_signature, timestamps)
#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)
_id_to_flake = {}
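# Map each original id to a snowflake exactly once, so repeated ids get the same flake.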
def _get_flake(id):
    flake = _id_to_flake.get(id, -1)
    if flake == -1:
        flake = flakegen.next()
        _id_to_flake[id] = flake
    return flake
#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
#     newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
#     for i in range(0, len(newtable)):
#         newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
#     return newtable
#
# for field in dc.fields(bt):
#     table_name = field.name
#     for colname in columns_to_reindex[table_name]:
#         setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#
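# For every DataFrame field of the dataclass, cast its reindex columns to uint64
# and replace each original id with a snowflake, leaving the sentinels 0 and -1 as-is.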
def _replace_ids(tables_dataclass):
    tdc = tables_dataclass
    for field in dc.fields(tdc):
        if field.type != pd.DataFrame:
            continue
        table_name = field.name
        table = getattr(tdc, field.name)
        # Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
        newtable = table.astype(
            { colname : 'uint64'
              for colname in tdc.columns_to_reindex[table_name]}
        ).reset_index(drop=True)
        # Swap ids for flakes
        for colname in tdc.columns_to_reindex[table_name]:
            for i in range(0, len(newtable)):
                oid = newtable.loc[i, colname]
                if oid in [0, -1]:
                    # Ignore special values
                    continue
                newtable.loc[i, colname] = _get_flake(oid)
        # Replace the table
        setattr(tdc, field.name, newtable)
# Replace id()s of the base and derived tables
_replace_ids(bt)
_replace_ids(scantabs)
#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)
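# Write one frame as <outdir>/<name>.csv, restricted to the columns configured
# for that table in sarif_cli.columns.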
def write(path, frame):
    with p.joinpath(path + ".csv").open(mode='wb') as fh:
        frame.to_csv(fh, index=False, columns=columns.columns[path], quoting=csv.QUOTE_NONNUMERIC)
def _write_dataframes_of(tables_dataclass):
    for field in dc.fields(tables_dataclass):
        if field.type != pd.DataFrame:
            continue
        table = getattr(tables_dataclass, field.name)
        write(field.name, table)
# Write sarif-based tables
if args.write_raw_tables:
    _write_dataframes_of(bt)
# Write derived tables and codeflows
_write_dataframes_of(scantabs)
write('codeflows', bt.codeflows)
status_writer.warning_set["success"] += 1
status_writer.csv_write_warnings()