""" Collection of joins for the derived tables
|
|
|
|
"""
|
|
from . import snowflake_id
|
|
|
|
import logging
|
|
import numpy
|
|
import pandas as pd
|
|
import re
|
|
from sarif_cli import hash
|
|
from sarif_cli import status_writer
|
|
|
|
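
# Typical use of this module (a sketch only; the surrounding driver code, the
# base-table layout, and the external_info fields project_id / scan_id /
# sarif_file_name are assumptions, not defined here):
#
#   scantables.projects = joins_for_projects(basetables, external_info)
#   scantables.results  = joins_for_results(basetables, external_info)
#   scantables.scans    = joins_for_scans(basetables, external_info, scantables, sarif_type)
#
# joins_for_scans is listed last because it records scantables.results.shape[0]
# as the scan's results_count.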

class ZeroResults(Exception):
    pass

#
# Column types for scan-related pandas tables
#
class ScanTablesTypes:
    scans = {
        "id"                   : pd.UInt64Dtype(),
        "commit_id"            : pd.StringDtype(),
        "project_id"           : pd.UInt64Dtype(),
        "db_create_start"      : numpy.dtype('M'),
        "db_create_stop"       : numpy.dtype('M'),
        "scan_start_date"      : numpy.dtype('M'),
        "scan_stop_date"       : numpy.dtype('M'),
        "tool_name"            : pd.StringDtype(),
        "tool_version"         : pd.StringDtype(),
        "tool_query_commit_id" : pd.StringDtype(),
        "sarif_file_name"      : pd.StringDtype(),
        "results_count"        : pd.Int64Dtype(),
        "rules_count"          : pd.Int64Dtype(),
    }
    results = {
        'id'               : pd.UInt64Dtype(),
        'scan_id'          : pd.UInt64Dtype(),
        'query_id'         : pd.StringDtype(),
        'query_kind'       : pd.StringDtype(),
        'query_precision'  : pd.StringDtype(),
        'query_severity'   : pd.StringDtype(),
        'query_tags'       : pd.StringDtype(),

        'codeFlow_id'      : pd.UInt64Dtype(),

        'message'          : pd.StringDtype(),
        'message_object'   : numpy.dtype('O'),
        'location'         : pd.StringDtype(),

        'source_location'  : pd.StringDtype(),
        'source_startLine' : pd.Int64Dtype(),
        'source_startCol'  : pd.Int64Dtype(),
        'source_endLine'   : pd.Int64Dtype(),
        'source_endCol'    : pd.Int64Dtype(),

        'sink_location'    : pd.StringDtype(),
        'sink_startLine'   : pd.Int64Dtype(),
        'sink_startCol'    : pd.Int64Dtype(),
        'sink_endLine'     : pd.Int64Dtype(),
        'sink_endCol'      : pd.Int64Dtype(),

        # TODO Find high-level info from query name or tags?
        'source_object'    : numpy.dtype('O'),
        'sink_object'      : numpy.dtype('O'),
    }
    projects = {
        "id"                 : pd.UInt64Dtype(),
        "project_name"       : pd.StringDtype(),
        "creation_date"      : numpy.dtype('M'),
        "repo_url"           : pd.StringDtype(),
        "primary_language"   : pd.StringDtype(),
        "languages_analyzed" : pd.StringDtype(),
    }
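
# Illustrative sketch (not executed; the frame contents are hypothetical): each
# of the maps above is applied with DataFrame.astype to pin the column types of
# a finished table, exactly as the joins below do, e.g.
#
#   frame = pd.DataFrame(data={...})   # columns named as in ScanTablesTypes.projects
#   frame = frame.astype(ScanTablesTypes.projects).reset_index(drop=True)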

#
# Projects table
#
def joins_for_projects(basetables, external_info):
    """
    Form the 'projects' table for the ScanTables dataclass
    """
    b = basetables; e = external_info

    extra = ""
    # if the sarif file has automationDetails
    if "automationDetails" in b.project:
        extra = b.project.automationDetails[0]
    # if the sarif file has versionControlProvenance (i.e., a repositoryUri)
    if "repositoryUri" in b.project:
        repoUri = b.project.repositoryUri[0]
        e.project_id = hash.hash_unique((repoUri+extra).encode())
    else:
        repoUri = "unknown"

    res = pd.DataFrame(data={
        "id" : e.project_id,
        "project_name" : repoUri,
        "creation_date" : pd.Timestamp(0.0, unit='s'), # TODO: external info
        "repo_url" : repoUri,
        "primary_language" : b.project['semmle.sourceLanguage'][0],
        "languages_analyzed" : ",".join(list(b.project['semmle.sourceLanguage']))
    }, index=[0])

    # Force all column types to ensure appropriate formatting
    res1 = res.astype(ScanTablesTypes.projects).reset_index(drop=True)
    return res1
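
# The returned 'projects' frame is a single row; sketch only, the values below
# are hypothetical while the columns and layout follow the code above:
#
#   id          project_name            creation_date        repo_url                primary_language  languages_analyzed
#   1234567890  https://github.com/o/r  1970-01-01 00:00:00  https://github.com/o/r  cpp               cpp,javascript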

#
# Scans table
#
def joins_for_scans(basetables, external_info, scantables, sarif_type):
    """
    Form the `scans` table for the ScanTables dataclass
    """
    b = basetables; e = external_info
    driver_name = b.project.driver_name.unique()
    assert len(driver_name) == 1, "More than one driver name found for single sarif file."
    driver_version = b.project.driver_version.unique()
    assert len(driver_version) == 1, \
        "More than one driver version found for single sarif file."
    # TODO if commit id exists in external info for CLI gen'd sarif, add?
    if sarif_type == "LGTM":
        commit_id = b.project.revisionId[0]
    else:
        commit_id = "unknown"
    res = pd.DataFrame(data={
        "id" : e.scan_id,
        "commit_id" : commit_id,
        "project_id" : e.project_id,
        # TODO extract real date information from somewhere external
        "db_create_start" : pd.Timestamp(0.0, unit='s'),
        "db_create_stop" : pd.Timestamp(0.0, unit='s'),
        "scan_start_date" : pd.Timestamp(0.0, unit='s'),
        "scan_stop_date" : pd.Timestamp(0.0, unit='s'),
        #
        "tool_name" : driver_name[0],
        "tool_version" : driver_version[0],
        "tool_query_commit_id" : pd.NA,
        "sarif_file_name" : e.sarif_file_name,
        "results_count" : scantables.results.shape[0],
        "rules_count" : len(b.rules['id'].unique()),
    }, index=[0])
    # Force all column types to ensure correct writing and type checks on reading.
    res1 = res.astype(ScanTablesTypes.scans).reset_index(drop=True)
    return res1

#
# Results table
#
def joins_for_results(basetables, external_info):
    """
    Form and return the `results` table
    """
    # Get one table per query_kind, then stack them:
    #     problem
    #     path-problem
    #
    # Concatenation with an empty table triggers type conversion to float, so don't
    # include empty tables.
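    #
    # For example (an illustrative sketch of the pandas behaviour being avoided,
    # not code from this module):
    #
    #   pd.concat([pd.DataFrame({'n': [1, 2]}),      # int64 column
    #              pd.DataFrame({'n': []})])['n']    # empty column
    #
    # can come back as float64 (or object, depending on the pandas version)
    # rather than int64.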
    tables = [_results_from_kind_problem(basetables, external_info),
              _results_from_kind_pathproblem(basetables, external_info)]
    stack = [table for table in tables if len(table) > 0]

    # Concatenation fails without at least one table, so avoid that.
    if len(stack) > 0:
        res = pd.concat(stack)
    else:
        # Both tables are empty.
        logging.warning("Zero problem/path_problem results found in sarif "
                        "file but processing anyway.")
        status_writer.csv_write(status_writer.zero_results)
        res = tables[0]

    # Force all column types to ensure appropriate formatting
    res1 = res.astype(ScanTablesTypes.results).reset_index(drop=True)
    return res1

# id as primary key
def _populate_from_rule_table_code_flow_tag_text(basetable, flowtable):
    # Concatenate the rule's tag_text entries for a path-problem row.
    val = flowtable.rule_id.values[0]
    return basetable.rules.query("id == @val")["tag_text"].str.cat(sep='_')

# id as primary key
def _populate_from_rule_table_tag_text(basetable, i):
    # Concatenate the rule's tag_text entries for the i-th kind_problem row.
    val = basetable.kind_problem.rule_id[i]
    return basetable.rules.query("id == @val")["tag_text"].str.cat(sep='_')

# id as primary key
def _populate_from_rule_table(column_name, basetable, i):
    # Fetch one rule column value for the rule of the i-th kind_problem row.
    val = basetable.kind_problem.rule_id[i]
    return basetable.rules.query("id == @val")[column_name].head(1).item()

# id as primary key
def _populate_from_rule_table_code_flow(column_name, basetable, flowtable):
    # Fetch one rule column value for the rule of a path-problem row.
    val = flowtable.rule_id.values[0]
    return basetable.rules.query("id == @val")[column_name].head(1).item()
def _results_from_kind_problem(basetables, external_info):
    b = basetables; e = external_info
    flakegen = snowflake_id.Snowflake(2)
    res = pd.DataFrame(
        data={
            'id': [flakegen.next() for _ in range(len(b.kind_problem))],

            'scan_id' : e.scan_id,
            'query_id' : b.kind_problem.rule_id,
            'query_kind' : "problem",
            'query_precision' : [_populate_from_rule_table("precision", b, i) for i in range(len(b.kind_problem))],
            'query_severity' : [_populate_from_rule_table("problem.severity", b, i) for i in range(len(b.kind_problem))],
            'query_tags' : [_populate_from_rule_table_tag_text(b, i) for i in range(len(b.kind_problem))],
            'codeFlow_id' : 0, # link to codeflows (kind_pathproblem only, NULL here)

            'message': b.kind_problem.message_text,
            'message_object' : pd.NA,
            'location': b.kind_problem.location_uri,

            # for kind_problem, source and sink share the result's own line/column
            # information; the separate source/sink location strings stay NA
            'source_location' : pd.NA,
            'source_startLine' : b.kind_problem.location_startLine,
            'source_startCol' : b.kind_problem.location_startColumn,
            'source_endLine' : b.kind_problem.location_endLine,
            'source_endCol' : b.kind_problem.location_endColumn,

            'sink_location' : pd.NA,
            'sink_startLine' : b.kind_problem.location_startLine,
            'sink_startCol' : b.kind_problem.location_startColumn,
            'sink_endLine' : b.kind_problem.location_endLine,
            'sink_endCol' : b.kind_problem.location_endColumn,

            'source_object' : pd.NA, # TODO: find high-level info from query name or tags?
            'sink_object' : pd.NA,
        })
    # Force column type(s) to avoid floats in output.
    res1 = res.astype({ 'id' : 'uint64', 'scan_id': 'uint64'}).reset_index(drop=True)
    return res1

def _results_from_kind_pathproblem(basetables, external_info):
    #
    # Only get source and sink, no paths. This implies one codeflow_index and one
    # threadflow_index, no repetitions.
    #
    b = basetables; e = external_info
    flakegen = snowflake_id.Snowflake(3)

    # The sarif tables have relatedLocation information, which results in multiple
    # rows for a single codeFlows_id -- the expression
    #     b.kind_pathproblem[b.kind_pathproblem['codeFlows_id'] == cfid0]
    # produces multiple rows.
    #
    # The `result` table has no entry to distinguish these, so we use a simplified
    # version of `kind_pathproblem`.
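    #
    # Sketch of the effect being removed (hypothetical values, illustration
    # only): two rows that differ only in their relatedLocation_* columns
    #
    #   codeFlows_id  rule_id  relatedLocation_uri
    #   4711          js/xss   helper.js
    #   4711          js/xss   other.js
    #
    # collapse to a single row once those columns are dropped and duplicates
    # are removed.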

    reduced_kind_pathp = b.kind_pathproblem.drop(
        columns=[
            'relatedLocation_array_index',
            'relatedLocation_endColumn',
            'relatedLocation_endLine',
            'relatedLocation_id',
            'relatedLocation_index',
            'relatedLocation_message',
            'relatedLocation_startColumn',
            'relatedLocation_startLine',
            'relatedLocation_uri',
            'relatedLocation_uriBaseId',
        ])

    # Per codeflow_id taken from the b.kind_pathproblem table, it should suffice to
    # take one codeflow_index, one threadflow_index, and the first and last
    # location_index from the b.codeflows table.
    #
    # To ensure nothing is missed, collect all the entries and then check for
    # unique rows.
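    #
    # For one codeflow_id / codeflow_index / threadflow_index, the rows in
    # b.codeflows look roughly like this (hypothetical values):
    #
    #   location_index  uri        startLine ...
    #   0               source.js  10            <- min(location_index): source
    #   1               helper.js  42
    #   2               sink.js    99            <- max(location_index): sink
    #
    # and the loop below keeps only the min/max rows as source and sink.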
    cfids = reduced_kind_pathp['codeFlows_id'].unique()

    source_sink_coll = []
    for cfid0 in cfids:
        cfid0t0 = b.codeflows[b.codeflows['codeflow_id'] == cfid0]
        cfid0ppt0 = reduced_kind_pathp[reduced_kind_pathp['codeFlows_id'] ==
                                       cfid0].drop_duplicates()
        assert cfid0ppt0.shape[0] == 1, \
            "Reduced kind_pathproblem table still has multiple entries"
        for cfi0 in range(0, cfid0t0['codeflow_index'].max()+1):
            cf0 = cfid0t0[cfid0t0['codeflow_index'] == cfi0]
            for tfi0 in range(0, cf0['threadflow_index'].max()+1):
                tf0 = cf0[ cf0['threadflow_index'] == tfi0 ]
                loc_first = tf0['location_index'].min()
                loc_last = tf0['location_index'].max()
                source = tf0[tf0['location_index'] == loc_first]
                sink = tf0[tf0['location_index'] == loc_last]
                # Note that we're adding the unique row ids after the full table
                # is done, below.
                res = {
                    'scan_id' : e.scan_id,
                    'query_id' : cfid0ppt0.rule_id.values[0],
                    'query_kind' : "path-problem",
                    'query_precision' : _populate_from_rule_table_code_flow("precision", b, cfid0ppt0),
                    'query_severity' : _populate_from_rule_table_code_flow("problem.severity", b, cfid0ppt0),
                    'query_tags' : _populate_from_rule_table_code_flow_tag_text(b, cfid0ppt0),
                    'codeFlow_id' : cfid0,
                    #
                    'message': cfid0ppt0.message_text.values[0],
                    'message_object' : pd.NA,
                    'location': cfid0ppt0.location_uri.values[0],
                    #
                    'source_location' : source.uri.values[0],
                    'source_startLine' : source.startLine.values[0],
                    'source_startCol' : source.startColumn.values[0],
                    'source_endLine' : source.endLine.values[0],
                    'source_endCol' : source.endColumn.values[0],
                    #
                    'sink_location' : sink.uri.values[0],
                    'sink_startLine' : sink.startLine.values[0],
                    'sink_startCol' : sink.startColumn.values[0],
                    'sink_endLine' : sink.endLine.values[0],
                    'sink_endCol' : sink.endColumn.values[0],
                    #
                    'source_object' : pd.NA, # TODO: find high-level info from
                                             # query name or tags?
                    'sink_object' : pd.NA,
                }
                source_sink_coll.append(res)
    results0 = pd.DataFrame(data=source_sink_coll).drop_duplicates().reset_index(drop=True)

    # Add the snowflake ids
    results0['id'] = [flakegen.next() for _ in range(len(results0))]

    # The 'scan_id' column is needed for astype
    if len(results0) == 0:
        results0['scan_id'] = []

    # Force column type(s) to avoid floats in output.
    results1 = results0.astype({ 'id' : 'uint64', 'scan_id': 'uint64'}).reset_index(drop=True)

    return results1