Code quality improvements on coverage report generator script

This commit is contained in:
Tamas Vajk
2021-05-06 11:03:05 +02:00
parent d0a46eb7b7
commit 2adb3e992a

View File

@@ -6,7 +6,9 @@ import os
"""
This script runs the CSV coverage report QL query, and transforms it to a more readable format.
"""
There are two main outputs: (i) a CSV file containing the coverage data, and (ii) an RST page containing the coverage
data.
"""
def subprocess_run(cmd):
@@ -48,8 +50,20 @@ def append_csv_dict_item(list, dictionary, key):
list.append(None)
def collect_package_stats(packages, filter):
"""Collects coverage statistics for packages matching the given filter."""
def increment_dict_item(value, dictionary, key):
"""Increments the value of the dictionary[key] by value."""
if key not in dictionary:
dictionary[key] = 0
dictionary[key] += int(value)
def collect_package_stats(packages, cwes, filter):
"""
Collects coverage statistics for packages matching the given filter. `filter` is a `lambda` that for example (i) matches
packages to frameworks, or (2) matches packages that were previously not processed.
The returned statistics are used to generate a single row in a CSV file.
"""
sources = 0
steps = 0
sinks = 0
@@ -75,7 +89,11 @@ def collect_package_stats(packages, filter):
def add_package_stats_to_row(row, sorted_cwes, collect):
""" Adds collected statistic to the row. """
"""
Adds collected statistic to the row. `collect` is a `lambda` that returns the statistics for example for (i) individual
frameworks, (ii) leftout frameworks summarized in the 'Others' row, or (iii) all frameworks summarized in the 'Totals'
row.
"""
sources, steps, sinks, framework_cwes, processed_packages = collect()
append_csv_number(row, sources)
@@ -112,11 +130,19 @@ configs = [
"java", "Java", ".java", prefix + "java/ql/src/meta/frameworks/Coverage.ql")
]
with open("flow-model-coverage.rst", 'w') as rst_file:
# The names of input and output files. The placeholder {language} is replaced with the language name.
output_rst = "flow-model-coverage.rst"
output_rst_csv = "rst-csv-flow-model-coverage-{language}.csv"
output_ql_csv = "output-{language}.csv"
output_csv = "csv-flow-model-coverage-{language}.csv"
input_framework_csv = prefix + "misc/scripts/frameworks-{language}.csv"
input_cwe_sink_csv = prefix + "misc/scripts/cwe-sink-{language}.csv"
with open(output_rst, 'w') as rst_file:
for config in configs:
lang = config.lang
db = "empty-" + lang
ql_output = "output-" + lang + ".csv"
ql_output = output_ql_csv.format(language=lang)
create_empty_database(lang, config.ext, db)
run_codeql_query(config.ql_path, db, ql_output)
@@ -128,36 +154,37 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
with open(ql_output) as csvfile:
reader = csv.reader(csvfile)
for row in reader:
# row: "android.util",1,"remote","source",16
package = row[0]
if package not in packages:
packages[package] = {
"count": row[1],
# part: "summary", "sink", or "source"
"part": {},
# kind: "source:remote", "sink:create-file", ...
"kind": {}
}
part = row[3]
parts.add(part)
if part not in packages[package]["part"]:
packages[package]["part"][part] = 0
packages[package]["part"][part] += int(row[4])
increment_dict_item(row[4], packages[package]["part"], part)
kind = part + ":" + row[2]
kinds.add(kind)
if kind not in packages[package]["kind"]:
packages[package]["kind"][kind] = 0
packages[package]["kind"][kind] += int(row[4])
increment_dict_item(row[4], packages[package]["kind"], kind)
parts = sorted(parts)
kinds = sorted(kinds)
# Write the denormalized package statistics to a CSV file.
with open("csv-flow-model-coverage-" + lang + ".csv", 'w', newline='') as csvfile:
with open(output_csv.format(language=lang), 'w', newline='') as csvfile:
csvwriter = csv.writer(csvfile)
parts = sorted(parts)
kinds = sorted(kinds)
headers = ["package"]
headers.extend(parts)
headers.extend(kinds)
columns = ["package"]
columns.extend(parts)
columns.extend(kinds)
csvwriter.writerow(columns)
csvwriter.writerow(headers)
for package in sorted(packages):
row = [package]
@@ -170,10 +197,11 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
# Read the additional framework data, such as URL, friendly name
frameworks = {}
with open(prefix + "misc/scripts/frameworks-" + lang + ".csv") as csvfile:
with open(input_framework_csv.format(language=lang)) as csvfile:
reader = csv.reader(csvfile)
next(reader)
for row in reader:
# row: Hibernate,https://hibernate.org/,org.hibernate
framwork = row[0]
if framwork not in frameworks:
frameworks[framwork] = {
@@ -184,10 +212,11 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
# Read the additional CWE data
cwes = {}
with open(prefix + "misc/scripts/cwe-sink-" + lang + ".csv") as csvfile:
with open(input_cwe_sink_csv.format(language=lang)) as csvfile:
reader = csv.reader(csvfile)
next(reader)
for row in reader:
# row: CWE-89,sql,SQL injection
cwe = row[0]
if cwe not in cwes:
cwes[cwe] = {
@@ -195,7 +224,9 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
"label": row[2]
}
file_name = "rst-csv-flow-model-coverage-" + lang + ".csv"
sorted_cwes = sorted(cwes)
file_name = output_rst_csv.format(language=lang)
rst_file.write(
config.capitalized_lang + " framework & library support\n")
@@ -210,17 +241,23 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
with open(file_name, 'w', newline='') as csvfile:
csvwriter = csv.writer(csvfile)
columns = ["Framework / library", "package",
"remote flow sources", "taint & value steps", "sinks (total)"]
for cwe in sorted(cwes):
columns.append("`" + cwe + "` :sub:`" +
cwes[cwe]["label"] + "`")
csvwriter.writerow(columns)
# Write CSV header.
headers = ["Framework / library",
"Package",
"Remote flow sources",
"Taint & value steps",
"Sinks (total)"]
for cwe in sorted_cwes:
headers.append(
"`{0}` :sub:`{1}`".format(cwe, cwes[cwe]["label"]))
csvwriter.writerow(headers)
processed_packages = set()
# Write a row for each framework.
for framework in sorted(frameworks):
row = []
# Add the framework name to the row
if not frameworks[framework]["url"]:
row.append(framework)
@@ -234,12 +271,12 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
prefix = frameworks[framework]["package"]
# Collect statistics on the current framework
# package name is either full name, such as "org.hibernate", or a prefix, such as "java.*"
def collect_framework(): return collect_package_stats(
packages,
lambda p: (prefix.endswith("*") and p.startswith(prefix[:-1])) or (not prefix.endswith("*") and prefix == p))
packages, cwes, lambda p: (prefix.endswith("*") and p.startswith(prefix[:-1])) or (not prefix.endswith("*") and prefix == p))
row, f_processed_packages = add_package_stats_to_row(
row, sorted(cwes), collect_framework)
row, sorted_cwes, collect_framework)
csvwriter.writerow(row)
processed_packages.update(f_processed_packages)
@@ -248,11 +285,10 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
row = ["Others", None]
def collect_others(): return collect_package_stats(
packages,
lambda p: p not in processed_packages)
packages, cwes, lambda p: p not in processed_packages)
row, other_packages = add_package_stats_to_row(
row, sorted(cwes), collect_others)
row, sorted_cwes, collect_others)
row[1] = ", ".join("``{0}``".format(p)
for p in sorted(other_packages))
@@ -262,11 +298,9 @@ with open("flow-model-coverage.rst", 'w') as rst_file:
# Collect statistics on all packages
row = ["Totals", None]
def collect_total(): return collect_package_stats(
packages,
lambda p: True)
def collect_total(): return collect_package_stats(packages, cwes, lambda p: True)
row, _ = add_package_stats_to_row(
row, sorted(cwes), collect_total)
row, sorted_cwes, collect_total)
csvwriter.writerow(row)