Mirror of https://github.com/hohn/sarif-cli.git, synced 2025-12-16 17:23:03 +01:00
WIP: sarif-extract-scans: back to single sarif file handling, incorporate multi-file libraries
committed by Michael Hohn
parent 675a5a4008
commit b212423907
174
bin/sarif-extract-scans
Executable file
@@ -0,0 +1,174 @@
#!/usr/bin/env python
""" Extract scan data from a single sarif file in table form.
"""
from dataclasses import dataclass
from sarif_cli import signature, signature_single
from sarif_cli import typegraph
from sarif_cli import snowflake_id
import argparse
import dataclasses as dc
import json
import logging
import pandas as pd
import pathlib
import sarif_cli.table_joins as tj
import sarif_cli.derived_joins as derived
import sys

#
# Configure logger
#
logging.basicConfig(format='%(asctime)s %(message)s')

#
# Start processing
#
parser = argparse.ArgumentParser(description='Read a scan-spec file and its sarif file and produce tabular output.')
parser.add_argument('file', metavar='scan-spec.json', type=str,
                    help="json file containing required external scan information.")
parser.add_argument('outdir', metavar='output-dir', type=str, help='output directory')
args = parser.parse_args()
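# Example invocation (illustrative file names, not taken from the repository):
#
#     sarif-extract-scans scan-spec.json scan-tables
#
# scan-spec.json is the external scan information file and scan-tables is the
# directory that will receive the generated csv files.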

# Load meta info
def load(fname):
    with open(fname, 'rb') if fname != '-' else sys.stdin as fp:
        try:
            content = json.load(fp)
        except json.decoder.JSONDecodeError as err:
            logging.error('Error reading from {}: {}: line {}, column {}'
                          .format(fname, err.msg, err.lineno, err.colno))
            sys.exit(1)
    return content

scan_spec = load(args.file)
sarif_struct = load(scan_spec['sarif_file_name'])
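# A minimal scan-spec.json sketch; 'sarif_file_name' is the only key this
# script reads, and the path shown is a placeholder:
#
#     { "sarif_file_name": "path/to/scan.sarif" }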

#
# Preprocess raw SARIF to get smaller signature
#
context = signature.Context(
    {
        "string" : "String",
        "int" : "Int",
        "bool" : "Bool"
    }
)
sarif_struct = signature.fillsig(args, sarif_struct, context)

#
# Use reference type graph (signature) to traverse sarif and attach values to tables
#
tgraph = typegraph.Typegraph(signature_single.struct_graph_2022_02_01)
typegraph.destructure(tgraph, signature_single.start_node_2022_02_01, sarif_struct)
#
# Form output tables
#
typegraph.attach_tables(tgraph)

#
# Dataframe / table collection
#
@dataclass
class BaseTables:
    artifacts : pd.DataFrame
    codeflows : pd.DataFrame
    kind_pathproblem : pd.DataFrame
    kind_problem : pd.DataFrame
    project : pd.DataFrame
    relatedLocations : pd.DataFrame
    rules : pd.DataFrame
    def __init__(self): pass
bt = BaseTables()

@dataclass
class ScanTables:
    # project: External table with project information
    scans : pd.DataFrame
    results : pd.DataFrame
    def __init__(self): pass
scantabs = ScanTables()
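# Note: scantabs stays unpopulated in this WIP version; the derived joins that
# would fill it are still commented out below.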

#
# Add dataframes for base tables
#
sf_2683 = tj.joins_for_sf_2683(tgraph)
af_0350_location = tj.joins_for_af_0350_location(tgraph)
bt.artifacts = tj.joins_for_artifacts(tgraph)
bt.codeflows = tj.joins_for_codeflows(tgraph, sf_2683)
bt.kind_pathproblem = tj.joins_for_path_problem(tgraph, af_0350_location)
bt.kind_problem = tj.joins_for_problem(tgraph, af_0350_location)
bt.project = tj.joins_for_project_single(tgraph)
bt.relatedLocations = tj.joins_for_relatedLocations(tgraph, sf_2683)
bt.rules = tj.joins_for_rules(tgraph)

#
# Form derived query tables
#
# XX
# scantabs.project = derived.joins_for_project(bt)
# scantabs.scans = derived.joins_for_scans(bt)
# scantabs.results = derived.joins_for_results(bt)

#
# Replace the remaining internal ids with snowflake ids
#
flakegen = snowflake_id.Snowflake(0)

columns_to_reindex = {
    # template from {field.name : [''] for field in dc.fields(bt)}
    'artifacts': ['artifacts_id'],
    'codeflows': ['codeflow_id'],
    'kind_pathproblem': ['results_array_id', 'codeFlows_id'],
    'kind_problem': ['results_array_id'],
    'project': ['artifacts', 'results', 'rules'],
    'relatedLocations': ['struct_id'],
    'rules': ['rules_array_id']}

_id_to_flake = {}
def _get_flake(id):
    # Map an internal id to a snowflake id, generating a new one on first use.
    flake = _id_to_flake.get(id, -1)
    if flake == -1:
        flake = flakegen.next()
        _id_to_flake[id] = flake
    return flake
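# Illustration with made-up internal ids: repeated lookups return the same
# snowflake id, distinct ids get fresh ones.
#
#     a = _get_flake(140186974917568)    # generates a new flake
#     b = _get_flake(140186974917568)    # returns the cached flake, a == b
#     c = _get_flake(140186974917824)    # different id, different flake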

#
# Cleaner, but makes far too many copies; keep the loop below
#
# def _reindex(table, colname):
#     newtable = table.astype({ colname : 'uint64'}).reset_index(drop=True)
#     for i in range(0, len(newtable)):
#         newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
#     return newtable
#
# for field in dc.fields(bt):
#     table_name = field.name
#     for colname in columns_to_reindex[table_name]:
#         setattr(bt, field.name, _reindex(getattr(bt, field.name), colname))
#
for field in dc.fields(bt):
    table_name = field.name
    table = getattr(bt, field.name)
    # Turn all snowflake columns into uint64 and reset indexing to 0..len(table)
    newtable = table.astype(
        { colname : 'uint64'
          for colname in columns_to_reindex[table_name]}
    ).reset_index(drop=True)
    # Swap ids for flakes
    for colname in columns_to_reindex[table_name]:
        for i in range(0, len(newtable)):
            newtable.loc[i, colname] = _get_flake(newtable.loc[i, colname])
    # Replace the table
    setattr(bt, field.name, newtable)
#
# Write output
#
p = pathlib.Path(args.outdir)
p.mkdir(exist_ok=True)
def write(path, frame):
    with p.joinpath(path + ".csv").open(mode='wb') as fh:
        frame.to_csv(fh, index=False)
for field in dc.fields(bt):
    table = getattr(bt, field.name)
    write(field.name, table)
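# The loop above writes one csv file per base table into the output directory:
# artifacts.csv, codeflows.csv, kind_pathproblem.csv, kind_problem.csv,
# project.csv, relatedLocations.csv, rules.csv.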