Add error-handling CSV writer

The writer generates a status CSV per SARIF file.
This commit is contained in:
Kristen Newbury
2022-11-14 13:02:36 -05:00
parent ae4f71e804
commit 066fcb8248
5 changed files with 129 additions and 4 deletions

View File

@@ -7,8 +7,8 @@ import logging
import numpy
import pandas as pd
import re
import sys
from sarif_cli import hash
from sarif_cli import status_writer
class ZeroResults(Exception):
pass
@@ -168,6 +168,7 @@ def joins_for_results(basetables, external_info):
# TODO(knewbury): add error handling
logging.warning("Zero problem/path_problem results found in sarif "
"file but processing anyway.")
status_writer.csv_write(status_writer.zero_results)
res = tables[0]
# Force all column types to ensure appropriate formatting

View File

@@ -0,0 +1,98 @@
# csv status reporting
import csv
# Column names for every status row; dicts written through csv.DictWriter
# may omit "extra_info" (DictWriter fills missing fields with restval '').
fieldnames = ['sarif_file', 'level', 'message', "extra_info"]
# Counters for warning types that can occur many times per sarif file;
# csv_write_warnings() flushes one summary row per non-zero counter.
warning_set = {
"success" : 0,
"zero_results" : 0,
"input_sarif_missing" : 0
}
#
# Setup csv status writer
#
def setup_csv_writer(filename):
    """Create ``<filename>.csv``, write the header row, and remember the
    base name so later csv_write* calls can append to the same file."""
    with open(filename + '.csv', 'w', newline='') as out:
        # module-level global acts as a singleton for the output file name
        global global_filename
        global_filename = filename
        csv.DictWriter(out, fieldnames).writeheader()
#
# csv status write - one line for errors
#
def csv_write(data):
    """Append a single status row (one of the module's status dicts) to the
    CSV previously created by setup_csv_writer."""
    with open(f"{global_filename}.csv", 'a', newline='') as out:
        csv.DictWriter(out, fieldnames).writerow(data)
#
# csv status write - all at once for type of warnings that can
# happen multiple times
# and want success message last
#
def csv_write_warnings():
    """Flush accumulated warning rows, then the success row last.

    Warnings that may occur several times per file are counted in
    warning_set and emitted as one summary row here.
    """
    with open(f"{global_filename}.csv", 'a', newline='') as out:
        writer = csv.DictWriter(out, fieldnames)
        if warning_set["input_sarif_missing"]:
            writer.writerow(input_sarif_missing)
            # reset in case later different types of warnings can be accumulated
            input_sarif_missing["extra_info"] = "Missing: "
            warning_set["input_sarif_missing"] = 0
        # NOTE(review): warning_set["success"] is intentionally not reset here
        # in the original — confirm whether that is desired for reuse.
        if warning_set["success"]:
            writer.writerow(success)
def setup_status_filenames(sarif_file_name):
    """Stamp the sarif file name into every per-sarif status dict.

    file_load_error is deliberately excluded: per its comment it may refer
    to a different file (e.g. scan-spec.json), so its caller fills it in.
    """
    per_sarif_statuses = (
        success,
        zero_results,
        input_sarif_extra,
        input_sarif_missing,
        unknown_sarif_parsing_shape,
        unknown,
    )
    for status in per_sarif_statuses:
        status["sarif_file"] = sarif_file_name
# Canonical status rows written via csv_write / csv_write_warnings.
# Keys must be a subset of `fieldnames`; csv.DictWriter raises ValueError
# for any key outside fieldnames, and fills absent fields with ''.
success = {
    "sarif_file": "",
    "level": "SUCCESS",
    "message": "File successfully processed."
}
zero_results = {
    "sarif_file": "",
    "level": "WARNING",
    "message": "Zero results seen in sarif file."
}
input_sarif_missing = {
    "sarif_file": "",
    "level": "WARNING",
    "message": "Input sarif is missing necessary properties.",
    # accumulated list of missing properties; reset by csv_write_warnings
    "extra_info": "Missing: "
}
# file load error can happen on either sarif file or scan-spec.json
# (caller fills in "sarif_file" with whichever path failed to load)
file_load_error = {
    # was "file": "" — that key is not in fieldnames, so DictWriter.writerow
    # would raise ValueError; use the shared "sarif_file" column instead
    "sarif_file": "",
    "level": "ERROR",
    "message": "Could not load file."
}
input_sarif_extra = {
    "sarif_file": "",
    "level": "ERROR",
    "message": "Input sarif contains extra unnecessary properties."
}
unknown_sarif_parsing_shape = {
    "sarif_file": "",
    "level": "ERROR",
    "message": "Error matching expected sarif format to actual input sarif shape.",
    # filled with the mismatching type/tree fields at raise time
    "extra_info": ""
}
unknown = {
    "sarif_file": "",
    "level": "ERROR",
    "message": "Error details currently undiagnosed. Assess log file for more information."
}

View File

@@ -11,6 +11,7 @@ from dataclasses import dataclass
import logging
from typing import Any, Dict, List, Tuple, Union
import pandas as pd
from sarif_cli import status_writer
#
# Utility classes
@@ -112,6 +113,7 @@ def destructure(typegraph: Typegraph, node: NodeId, tree: Tree):
elif t in [str, int, bool]:
pass
else:
# TODO knewbury error handling
raise Exception("Unhandled type: %s" % t)
def _destructure_dict_1(typegraph, node, tree):
@@ -137,6 +139,7 @@ def _destructure_dict_1(typegraph, node, tree):
# Sanity check
sig = typegraph.signature_graph[node]
if type(sig) != tuple:
# TODO knewbury error handling
raise SignatureMismatch()
# Destructure this dictionary
@@ -157,7 +160,7 @@ def _destructure_dict(typegraph: Typegraph, node, tree):
type_fields = typegraph.fields[node]
if tree_fields == type_fields:
_destructure_dict_1(typegraph, node, tree)
# TODO knewbury error handling here
elif set(tree_fields).issuperset(set(type_fields)):
# Log a warning
# log.warning("XX: Tree has unrecognized fields")
@@ -165,9 +168,15 @@ def _destructure_dict(typegraph: Typegraph, node, tree):
'known entries: {}'.format(tree))
logging.warning('tree fields: {}'.format(sorted(tree_fields)))
logging.warning('type fields: {}'.format(sorted(type_fields)))
status_writer.csv_write(status_writer.input_sarif_extra)
_destructure_dict_1(typegraph, node, tree)
elif set(tree_fields).issubset(set(type_fields)):
# create a string list of the missing expected properties from the sarif
specific_missing = f"{set(type_fields) - set(tree_fields)}, "
if specific_missing not in status_writer.input_sarif_missing["extra_info"]:
status_writer.input_sarif_missing["extra_info"] += specific_missing
status_writer.warning_set["input_sarif_missing"]+=1
raise MissingFieldException(
f"(Sub)tree is missing fields required by typedef.\n"
f"Expected {type_fields}, found {tree_fields}.\n"
@@ -177,6 +186,9 @@ def _destructure_dict(typegraph: Typegraph, node, tree):
)
else:
# TODO knewbury error handling
status_writer.unknown_sarif_parsing_shape["extra_info"] = "type fields {} do not match tree fields {}.".format(type_fields, tree_fields)
status_writer.csv_write(status_writer.unknown_sarif_parsing_shape)
raise Exception("typegraph: unhandled case reached: cannot match type "
"fields {} to tree fields {}. Data is invalid."
.format(type_fields, tree_fields))
@@ -243,6 +255,7 @@ def _destructure_list(typegraph, node: str, tree: List):
id(value)))
# Next `value` on success
break
# status reporting under this handled already in each case
except MissingFieldException:
# Re-raise if last available signature failed, otherwise try
# next `signature`