Add CSV file with framework and CWE info to be used in RST file

This commit is contained in:
Tamas Vajk
2021-05-05 14:58:57 +02:00
parent 979034a17f
commit beea36191b
3 changed files with 159 additions and 9 deletions

View File

@@ -0,0 +1,3 @@
CWE,Sink identifier,Label
CWE-89,sql,SQL injection
CWE-22,create-file,Path injection
1 CWE Sink identifier Label
2 CWE-89 sql SQL injection
3 CWE-22 create-file Path injection

View File

@@ -0,0 +1,5 @@
Framework name,URL,Package prefix
Hibernate,https://hibernate.org/,org.hibernate
Java Standard Library,,java.*
Google,,com.google.common.*
Apache,,org.apache.*
1 Framework name URL Package prefix
2 Hibernate https://hibernate.org/ org.hibernate
3 Java Standard Library java.*
4 Google com.google.common.*
5 Apache org.apache.*

View File

@@ -32,6 +32,62 @@ def run_codeql_query(query, database, output):
"--format=csv", "--no-titles", "--output", output])
def append_csv_number(list, value):
"""Adds a number to the list or None if the value is not greater than 0."""
if value > 0:
list.append(value)
else:
list.append(None)
def append_csv_dict_item(list, dictionary, key):
"""Adds a dictionary item to the list if the key is in the dictionary."""
if key in dictionary:
list.append(dictionary[key])
else:
list.append(None)
def collect_package_stats(packages, filter):
"""Collects coverage statistics for packages matching the given filter."""
sources = 0
steps = 0
sinks = 0
framework_cwes = {}
processed_packages = set()
for package in packages:
if filter(package):
processed_packages.add(package)
sources += int(packages[package]["kind"].get("source:remote", 0))
steps += int(packages[package]["part"].get("summary", 0))
sinks += int(packages[package]["part"].get("sink", 0))
for cwe in cwes:
sink = "sink:" + cwes[cwe]["sink"]
if sink in packages[package]["kind"]:
if cwe not in framework_cwes:
framework_cwes[cwe] = 0
framework_cwes[cwe] += int(
packages[package]["kind"][sink])
return sources, steps, sinks, framework_cwes, processed_packages
def add_package_stats_to_row(row, sorted_cwes, collect):
""" Adds collected statistic to the row. """
sources, steps, sinks, framework_cwes, processed_packages = collect()
append_csv_number(row, sources)
append_csv_number(row, steps)
append_csv_number(row, sinks)
for cwe in sorted_cwes:
append_csv_dict_item(row, framework_cwes, cwe)
return row, processed_packages
class LanguageConfig:
def __init__(self, lang, ext, ql_path):
self.lang = lang
@@ -61,13 +117,14 @@ for config in configs:
query_path = config.ql_path
db = "empty-" + lang
ql_output = "output-" + lang + ".csv"
create_empty_database(lang, ext, db)
# create_empty_database(lang, ext, db)
run_codeql_query(query_path, db, ql_output)
packages = {}
parts = set()
kinds = set()
# Read the generated CSV file, and collect package statistics.
with open(ql_output) as csvfile:
reader = csv.reader(csvfile)
for row in reader:
@@ -89,6 +146,7 @@ for config in configs:
packages[package]["kind"][kind] = 0
packages[package]["kind"][kind] += int(row[4])
# Write the denormalized package statistics to a CSV file.
with open("csv-flow-model-coverage-" + lang + ".csv", 'w', newline='') as csvfile:
csvwriter = csv.writer(csvfile)
@@ -104,13 +162,97 @@ for config in configs:
for package in sorted(packages):
row = [package]
for part in parts:
if part in packages[package]["part"]:
row.append(packages[package]["part"][part])
else:
row.append(None)
append_csv_dict_item(row, packages[package]["part"], part)
for kind in kinds:
if kind in packages[package]["kind"]:
row.append(packages[package]["kind"][kind])
else:
row.append(None)
append_csv_dict_item(row, packages[package]["kind"], kind)
csvwriter.writerow(row)
# Read the additional framework data, such as URL, friendly name
frameworks = {}
with open(prefix + "misc/scripts/frameworks-" + lang + ".csv") as csvfile:
reader = csv.reader(csvfile)
next(reader)
for row in reader:
framwork = row[0]
if framwork not in frameworks:
frameworks[framwork] = {
"package": row[2],
"url": row[1]
}
# Read the additional CWE data
cwes = {}
with open(prefix + "misc/scripts/cwe-sink-" + lang + ".csv") as csvfile:
reader = csv.reader(csvfile)
next(reader)
for row in reader:
cwe = row[0]
if cwe not in cwes:
cwes[cwe] = {
"sink": row[1],
"label": row[2]
}
with open("rst-csv-flow-model-coverage-" + lang + ".csv", 'w', newline='') as csvfile:
csvwriter = csv.writer(csvfile)
columns = ["Framework / library", "package",
"remote flow sources", "taint & value steps", "sinks (total)"]
for cwe in sorted(cwes):
columns.append("`" + cwe + "` :sub:`" + cwes[cwe]["label"] + "`")
csvwriter.writerow(columns)
processed_packages = set()
for framework in sorted(frameworks):
row = []
# Add the framework name to the row
if not frameworks[framework]["url"]:
row.append(framework)
else:
row.append(
"`" + framework + " <" + frameworks[framework]["url"] + ">`_")
# Add the package name to the row
row.append(frameworks[framework]["package"])
prefix = frameworks[framework]["package"]
# Collect statistics on the current framework
def collect_framework(): return collect_package_stats(
packages,
lambda p: (prefix.endswith("*") and p.startswith(prefix[:-1])) or (not prefix.endswith("*") and prefix == p))
row, f_processed_packages = add_package_stats_to_row(
row, sorted(cwes), collect_framework)
csvwriter.writerow(row)
processed_packages.update(f_processed_packages)
# Collect statistics on all packages that are not part of a framework
row = ["Others", None]
def collect_others(): return collect_package_stats(
packages,
lambda p: p not in processed_packages)
row, _ = add_package_stats_to_row(
row, sorted(cwes), collect_others)
csvwriter.writerow(row)
# Collect statistics on all packages
row = ["Total", None]
def collect_total(): return collect_package_stats(
packages,
lambda p: True)
row, _ = add_package_stats_to_row(
row, sorted(cwes), collect_total)
csvwriter.writerow(row)
# todo: generate rst page referencing the csv files