""" Collection of joins for the derived tables
|
|
|
|
"""
|
|
import pandas as pd
|
|
import numpy
|
|
import re
|
|
from . import snowflake_id
|
|
|
|
class ZeroResults(Exception):
|
|
pass
|
|
|
|
#
|
|
# Projects table
|
|
#
|
|
def joins_for_projects(basetables, external_info, scantables):
    """
    Form the 'projects' table for the ScanTables dataclass
    """
    b = basetables; e = external_info

    # For a repository url of the form
    #     (git|https)://*/org/project.*
    # use the org/project part as the project_name.
    #
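    # Illustrative examples only (URLs not taken from any particular sarif run):
    #     https://github.com/someorg/someproject.git -> "someorg/someproject"
    #     git://host.example/someorg/someproject     -> "someorg/someproject"
    # Anything not matching the pattern leaves project_name as pd.NA.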
    repo_url = b.project.repositoryUri[0]
    url_parts = re.match(r'(git|https)://[^/]+/([^/]+)/([^/.]+).*', repo_url)
    if url_parts:
        project_name = f"{url_parts.group(2)}/{url_parts.group(3)}"
    else:
        project_name = pd.NA

    res = pd.DataFrame(data={
        "id" : e.project_id,
        "project_name" : project_name,
        "creation_date" : pd.Timestamp(0.0, unit='s'), # TODO: external info
        "repo_url" : repo_url,
        "primary_language" : b.project['semmle.sourceLanguage'][0], # TODO: external info
        "languages_analyzed" : ",".join(list(b.project['semmle.sourceLanguage']))
    }, index=[0])

    # Force all column types to ensure appropriate formatting
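    # (Illustrative, not specific to this file: with the nullable extension
    # dtypes below, a missing integer stays <NA> instead of demoting the whole
    # column to float64, e.g.
    #     pd.array([1, None], dtype=pd.UInt64Dtype())  ->  [1, <NA>] (UInt64)
    # whereas a plain numpy integer column holding NaN would become float.)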
    res1 = res.astype({
        "id" : pd.UInt64Dtype(),
        "project_name" : pd.StringDtype(),
        "creation_date" : numpy.datetime64(),
        "repo_url" : pd.StringDtype(),
        "primary_language" : pd.StringDtype(),
        "languages_analyzed" : pd.StringDtype(),
    }).reset_index(drop=True)

    return res1

#
# Scans table
#
def joins_for_scans(basetables, external_info, scantables):
    """
    Form the `scans` table for the ScanTables dataclass
    """
    b = basetables; e = external_info
    driver_name = b.project.driver_name.unique()
    assert len(driver_name) == 1, \
        "More than one driver name found for a single sarif file."
    driver_version = b.project.driver_version.unique()
    assert len(driver_version) == 1, \
        "More than one driver version found for a single sarif file."
    res = pd.DataFrame(data={
        "id" : e.scan_id,
        "commit_id" : b.project.revisionId[0],
        "project_id" : e.project_id,
        #
        "db_create_start" : pd.NA,
        "db_create_stop" : pd.NA,
        "scan_start_date" : pd.NA,
        "scan_stop_date" : pd.NA,
        #
        "tool_name" : driver_name[0],
        "tool_version" : driver_version[0],
        "tool_query_commit_id" : pd.NA,
        "sarif_file_name" : e.sarif_file_name,
        "results_count" : scantables.results.shape[0],
        "rules_count" : len(b.rules['id'].unique()),
    }, index=[0])

    # Force all column types to ensure correct writing and type checks on reading.
    res1 = res.astype({
        "id" : pd.UInt64Dtype(),
        "commit_id" : pd.StringDtype(),
        "project_id" : pd.UInt64Dtype(),
        "db_create_start" : numpy.datetime64(),
        "db_create_stop" : numpy.datetime64(),
        "scan_start_date" : numpy.datetime64(),
        "scan_stop_date" : numpy.datetime64(),
        "tool_name" : pd.StringDtype(),
        "tool_version" : pd.StringDtype(),
        "tool_query_commit_id" : pd.StringDtype(),
        "sarif_file_name" : pd.StringDtype(),
        "results_count" : pd.Int64Dtype(),
        "rules_count" : pd.Int64Dtype(),
    }).reset_index(drop=True)

    return res1

#
# Results table
#
def joins_for_results(basetables, external_info):
    """
    Form and return the `results` table
    """
    # Get one table per result_type, then stack them:
    #     kind_problem
    #     kind_pathproblem
    #
    # Concatenation with an empty table triggers type conversion to float, so don't
    # include empty tables.
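    # (Illustration of the effect being avoided, assuming pandas' usual upcasting
    # rules; the exact outcome varies with the pandas version:
    #     pd.concat([pd.DataFrame({'n': [1, 2]}),
    #                pd.DataFrame({'n': pd.Series([], dtype='float64')})])
    # yields a float64 'n' column rather than int64.)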
    tables = [_results_from_kind_problem(basetables, external_info),
              _results_from_kind_pathproblem(basetables, external_info)]
    stack = [table for table in tables if len(table) > 0]

    # Concatenation fails without at least one table, so avoid that.
    if len(stack) == 0:
        # Sanity check: The case of zero results must be handled at
        # sarif read time and should never reach here.
        raise ZeroResults("Zero problem/path_problem results found in sarif "
                          "file; refusing to process. Internal error.")
    res = pd.concat(stack)

    # Force all column types to ensure appropriate formatting
    res1 = res.astype({
        'id' : pd.UInt64Dtype(),
        'scan_id' : pd.UInt64Dtype(),
        'query_id' : pd.StringDtype(),

        'result_type' : pd.StringDtype(),
        'codeFlow_id' : pd.UInt64Dtype(),

        'message' : pd.StringDtype(),
        'message_object' : numpy.dtype('O'),
        'location' : pd.StringDtype(),

        'source_startLine' : pd.Int64Dtype(),
        'source_startCol' : pd.Int64Dtype(),
        'source_endLine' : pd.Int64Dtype(),
        'source_endCol' : pd.Int64Dtype(),

        'sink_startLine' : pd.Int64Dtype(),
        'sink_startCol' : pd.Int64Dtype(),
        'sink_endLine' : pd.Int64Dtype(),
        'sink_endCol' : pd.Int64Dtype(),

        # TODO: Find high-level info from query name or tags?
        'source_object' : numpy.dtype('O'),
        'sink_object' : numpy.dtype('O'),
    }).reset_index(drop=True)

    return res1


def _results_from_kind_problem(basetables, external_info):
    b = basetables; e = external_info
    flakegen = snowflake_id.Snowflake(2)
    res = pd.DataFrame(
        data={
            'id': [flakegen.next() for _ in range(len(b.kind_problem))],

            'scan_id' : e.scan_id,
            'query_id' : e.ql_query_id,

            'result_type' : "kind_problem",
            'codeFlow_id' : 0, # link to codeflows (kind_pathproblem only, NULL here)

            'message': b.kind_problem.message_text,
            'message_object' : pd.NA,
            'location': b.kind_problem.location_uri,

            # for kind_problem, use the same location for source and sink
            'source_startLine' : b.kind_problem.location_startLine,
            'source_startCol' : b.kind_problem.location_startColumn,
            'source_endLine' : b.kind_problem.location_endLine,
            'source_endCol' : b.kind_problem.location_endColumn,

            'sink_startLine' : b.kind_problem.location_startLine,
            'sink_startCol' : b.kind_problem.location_startColumn,
            'sink_endLine' : b.kind_problem.location_endLine,
            'sink_endCol' : b.kind_problem.location_endColumn,

            'source_object' : pd.NA, # TODO: find high-level info from query name or tags?
            'sink_object' : pd.NA,
        })
    # Force column type(s) to avoid floats in output.
    res1 = res.astype({'id': 'uint64', 'scan_id': 'uint64'}).reset_index(drop=True)
    return res1


def _results_from_kind_pathproblem(basetables, external_info):
    #
    # Only get source and sink, no paths.  This implies one codeflow_index and one
    # threadflow_index, no repetitions.
    #
    b = basetables; e = external_info
    flakegen = snowflake_id.Snowflake(3)

    # The sarif tables carry relatedLocation information, which results in multiple
    # rows for a single codeFlows_id -- the expression
    #     b.kind_pathproblem[b.kind_pathproblem['codeFlows_id'] == cfid0]
    # produces multiple rows.
    #
    # The `results` table has no entry to distinguish these, so we use a simplified
    # version of `kind_pathproblem`.
    reduced_kind_pathp = b.kind_pathproblem.drop(
        columns=[
            'relatedLocation_array_index',
            'relatedLocation_endColumn',
            'relatedLocation_endLine',
            'relatedLocation_id',
            'relatedLocation_index',
            'relatedLocation_message',
            'relatedLocation_startColumn',
            'relatedLocation_startLine',
            'relatedLocation_uri',
            'relatedLocation_uriBaseId',
        ])

    # For each codeflow_id taken from the b.kind_pathproblem table, it should
    # suffice to take one codeflow_index, one threadflow_index, and the first and
    # last location_index from the b.codeflows table.
    #
    # To ensure nothing is missed, collect all the entries and then check for
    # unique rows.
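    #
    # (Illustrative sketch with made-up values: for one codeflow_id, a threadflow
    # whose location_index runs 0..4 contributes location_index 0 as the source
    # and location_index 4 as the sink; the intermediate path steps are dropped.)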
    cfids = reduced_kind_pathp['codeFlows_id'].unique()

    source_sink_coll = []
    for cfid0 in cfids:
        cfid0t0 = b.codeflows[b.codeflows['codeflow_id'] == cfid0]
        cfid0ppt0 = reduced_kind_pathp[reduced_kind_pathp['codeFlows_id'] ==
                                       cfid0].drop_duplicates()
        assert cfid0ppt0.shape[0] == 1, \
            "Reduced kind_pathproblem table still has multiple entries"
        for cfi0 in range(0, cfid0t0['codeflow_index'].max()+1):
            cf0 = cfid0t0[cfid0t0['codeflow_index'] == cfi0]
            for tfi0 in range(0, cf0['threadflow_index'].max()+1):
                tf0 = cf0[cf0['threadflow_index'] == tfi0]
                loc_first = tf0['location_index'].min()
                loc_last = tf0['location_index'].max()
                source = tf0[tf0['location_index'] == loc_first]
                sink = tf0[tf0['location_index'] == loc_last]
                # Note that we're adding the unique row ids after the full table
                # is done, below.
                res = {
                    'scan_id' : e.scan_id,
                    'query_id' : e.ql_query_id,
                    #
                    'result_type' : "kind_pathproblem",
                    'codeFlow_id' : cfid0,
                    #
                    'message': cfid0ppt0.message_text.values[0],
                    'message_object' : pd.NA,
                    'location': cfid0ppt0.location_uri.values[0],
                    #
                    'source_location' : source.uri.values[0],
                    'source_startLine' : source.startLine.values[0],
                    'source_startCol' : source.startColumn.values[0],
                    'source_endLine' : source.endLine.values[0],
                    'source_endCol' : source.endColumn.values[0],
                    #
                    'sink_location' : sink.uri.values[0],
                    'sink_startLine' : sink.startLine.values[0],
                    'sink_startCol' : sink.startColumn.values[0],
                    'sink_endLine' : sink.endLine.values[0],
                    'sink_endCol' : sink.endColumn.values[0],
                    #
                    'source_object' : pd.NA, # TODO: find high-level info from
                                             # query name or tags?
                    'sink_object' : pd.NA,
                }
                source_sink_coll.append(res)
    results0 = pd.DataFrame(data=source_sink_coll).drop_duplicates().reset_index(drop=True)

    # Add the snowflake ids
    results0['id'] = [flakegen.next() for _ in range(len(results0))]

    # The 'scan_id' column is needed for astype
    if len(results0) == 0:
        results0['scan_id'] = []

    # Force column type(s) to avoid floats in output.
    results1 = results0.astype({'id': 'uint64', 'scan_id': 'uint64'}).reset_index(drop=True)

    return results1
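
# A rough usage sketch (illustrative only; the exact fields of `basetables` and
# `external_info` are whatever the sarif-cli drivers provide, and ScanTables is
# assumed to be the dataclass referenced in the docstrings above):
#
#     scantables.results  = joins_for_results(basetables, external_info)
#     scantables.scans    = joins_for_scans(basetables, external_info, scantables)
#     scantables.projects = joins_for_projects(basetables, external_info, scantables)
#
# `joins_for_results` has to run first, because `joins_for_scans` reads
# scantables.results.shape[0] for its results_count column.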