Add type information

Michael Hohn
2024-11-19 15:24:41 -08:00
committed by Michael Hohn
parent 18333bfdb1
commit dd776e312a
5 changed files with 59 additions and 31 deletions

File 1 of 5

@@ -6,6 +6,11 @@
 """
 import argparse
 import logging
+from argparse import Namespace
+from typing import List
+from pandas import DataFrame
 import qldbtools.utils as utils
 import numpy as np
@@ -41,18 +46,18 @@ parser.add_argument('-l', '--list-name', type=str,
                     help='Name of the repository list',
                     default='mirva-list')
-args = parser.parse_args()
+args: Namespace = parser.parse_args()
 #
 #* Load the information
 #
 import pandas as pd
 import sys
-df0 = pd.read_csv(sys.stdin)
+df0: DataFrame = pd.read_csv(sys.stdin)
 if args.num_entries == None:
     # Use all entries
-    df1 = df0
+    df1: DataFrame = df0
 else:
     # Use num_entries, chosen via pseudo-random numbers
     df1 = df0.sample(n=args.num_entries,
@@ -61,12 +66,12 @@ else:
 #
 #* Form and save structures
 #
-repos = []
+repos: list[str] = []
 for index, row in df1[['owner', 'name', 'CID', 'path']].iterrows():
     owner, name, CID, path = row
     repos.append(utils.form_db_req_name(owner, name, CID))
-repo_list_name = args.list_name
+repo_list_name: str = args.list_name
 vsc = {
     "version": 1,
     "databases": {

File 2 of 5

@@ -2,11 +2,19 @@
 """ Collect information about CodeQL databases from the file system and write out
 a table in CSV format.
 """
+from argparse import ArgumentParser
+from typing import List
+from pandas import DataFrame
+import qldbtools.utils as utils
 import argparse
 import logging
 import sys
 import pandas as pd
+from qldbtools.utils import DBInfo
 #
 #* Configure logger
 #
@@ -15,7 +23,7 @@ logging.basicConfig(format='%(asctime)s %(message)s')
 #
 #* Process command line
 #
-parser = argparse.ArgumentParser(
+parser: ArgumentParser = argparse.ArgumentParser(
     description="""Find all CodeQL DBs in and below starting_dir and export a CSV
 file with relevant data.""")
 parser.add_argument('starting_dir', type=str,
@@ -26,9 +34,9 @@ args = parser.parse_args()
 #* Collect info
 #
 # Get the db information in list of DBInfo form
-db_base = args.starting_dir
-dbs = list(utils.collect_dbs(db_base))
-dbdf = pd.DataFrame([d.__dict__ for d in dbs])
+db_base: str = args.starting_dir
+dbs: list[DBInfo] = list(utils.collect_dbs(db_base))
+dbdf: DataFrame = pd.DataFrame([d.__dict__ for d in dbs])
 #
 #
 #* Write info out
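The dbs: list[DBInfo] annotation works because collect_dbs yields DBInfo instances, and pd.DataFrame accepts the list of their __dict__ mappings as rows. A sketch of that conversion with an illustrative two-field DBInfo (the real class in qldbtools.utils carries more fields, such as size):

    from dataclasses import dataclass
    import pandas as pd
    from pandas import DataFrame

    @dataclass
    class DBInfo:
        # Illustrative fields only; the class in qldbtools.utils has more.
        owner: str = ''
        name: str = ''

    dbs: list[DBInfo] = [DBInfo('octo', 'alpha'), DBInfo('octo', 'beta')]
    # Each instance's __dict__ becomes one row of the frame.
    dbdf: DataFrame = pd.DataFrame([d.__dict__ for d in dbs])

For dataclass instances, dataclasses.asdict(d) is an equivalent spelling of d.__dict__.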

File 3 of 5

@@ -3,6 +3,11 @@
 mc-db-initial-info, and collect more detailed information from the database
 files. Write out an extended table in CSV format.
 """
+from argparse import ArgumentParser
+from typing import List
+from pandas import DataFrame
+import qldbtools.utils as utils
 import argparse
 import logging
@@ -17,7 +22,7 @@ logging.basicConfig(format='%(asctime)s %(message)s')
 #
 #* Process command line
 #
-parser = argparse.ArgumentParser(
+parser: ArgumentParser = argparse.ArgumentParser(
     description="""Read an initial table of CodeQL DB information, produced by
 mc-db-initial-info, and collect more detailed information from the database
 files. Write out an extended table in CSV format. """)
@@ -26,22 +31,24 @@ args = parser.parse_args()
 #
 #* Collect the information
 # This step is time-intensive so we save the results right after.
-d = pd.read_csv(sys.stdin)
-joiners = []
+d: DataFrame = pd.read_csv(sys.stdin)
+joiners: list[DataFrame] = []
 for left_index in range(0, len(d)-1):
     try:
+        metac: object
+        cqlc: object
         cqlc, metac = utils.extract_metadata(d.path[left_index])
     except utils.ExtractNotZipfile:
         continue
     except utils.ExtractNoCQLDB:
         continue
     try:
-        detail_df = utils.metadata_details(left_index, cqlc, metac)
+        detail_df: DataFrame = utils.metadata_details(left_index, cqlc, metac)
     except utils.DetailsMissing:
         continue
     joiners.append(detail_df)
-joiners_df = pd.concat(joiners, axis=0)
-full_df = pd.merge(d, joiners_df, left_index=True, right_on='left_index', how='outer')
+joiners_df: DataFrame = pd.concat(joiners, axis=0)
+full_df: DataFrame = pd.merge(d, joiners_df, left_index=True, right_on='left_index', how='outer')
 #
 #* Save results
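The bare metac: object and cqlc: object declarations exist because Python has no syntax for annotating the targets of a tuple unpacking; a PEP 526 annotation on a preceding line informs the checker without any runtime effect. A minimal reproduction, with load_pair as a hypothetical stand-in for utils.extract_metadata:

    def load_pair() -> tuple[object, object]:
        # Hypothetical stand-in for utils.extract_metadata(path).
        return {'finalised': True}, {'languages': {}}

    # `cqlc, metac: object = load_pair()` would be a syntax error, so the
    # names are annotated on their own lines first, as in the commit.
    cqlc: object
    metac: object
    cqlc, metac = load_pair()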

File 4 of 5

@@ -18,6 +18,10 @@
 """
 import argparse
 import logging
+from argparse import Namespace
+from typing import Any
+from pandas import DataFrame, Series
 #
 #* Configure logger
@@ -39,7 +43,7 @@ parser = argparse.ArgumentParser(
 parser.add_argument('language', type=str,
                     help='The language to be analyzed.')
-args = parser.parse_args()
+args: Namespace = parser.parse_args()
 #
 #* Collect the information and select subset
 #
@@ -47,7 +51,7 @@ import pandas as pd
 import sys
 import qldbtools.utils as utils
-df2 = pd.read_csv(sys.stdin)
+df2: DataFrame = pd.read_csv(sys.stdin)
 #
 #* Add single uniqueness field -- CID (Cumulative ID)
@@ -88,7 +92,7 @@ df2['CID'] = df2.apply(lambda row:
 # | primaryLanguage |
 # | finalised |
-df3 = df2.reindex( columns=['owner', 'name', 'cliVersion', 'creationTime',
+df3: DataFrame = df2.reindex( columns=['owner', 'name', 'cliVersion', 'creationTime',
                             'language', 'sha','CID',
                             'baselineLinesOfCode', 'path', 'db_lang',
                             'db_lang_displayName', 'db_lang_file_count',
@@ -101,14 +105,14 @@ rows = ( df3['cliVersion'].isna() |
         df3['creationTime'].isna() |
         df3['language'].isna() |
         df3['sha'].isna() )
-df4 = df3[~rows]
+df4: DataFrame = df3[~rows]
-# XX: Limit to one language
+# Limit to one language
 df5 = df4[df4['language'] == args.language]
 # Sort and group
-df_sorted = df5.sort_values(by=['owner', 'name', 'CID', 'creationTime'])
-df_unique = df_sorted.groupby(['owner', 'name', 'CID']).first().reset_index()
+df_sorted: DataFrame = df5.sort_values(by=['owner', 'name', 'CID', 'creationTime'])
+df_unique: DataFrame = df_sorted.groupby(['owner', 'name', 'CID']).first().reset_index()
 # Write output
 df_unique.to_csv(sys.stdout, index=False)
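The closing sort_values / groupby(...).first() pair keeps exactly one row per (owner, name, CID) group, namely the row that sorts first by creationTime. A small worked example of the idiom on toy data:

    import pandas as pd
    from pandas import DataFrame

    df5 = pd.DataFrame({
        'owner': ['a', 'a'], 'name': ['r', 'r'],
        'CID': [1, 1], 'creationTime': ['2024-02-01', '2024-01-01'],
    })
    df_sorted: DataFrame = df5.sort_values(by=['owner', 'name', 'CID', 'creationTime'])
    # first() takes the top row of each group: here the 2024-01-01 entry.
    df_unique: DataFrame = df_sorted.groupby(['owner', 'name', 'CID']).first().reset_index()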

File 5 of 5

@@ -9,11 +9,15 @@ import datetime
 import json
 import logging
 import os
+from typing import List, Dict, Any
 import pandas as pd
 import time
 import yaml
 import zipfile
+from pandas import DataFrame
 #* Setup
 logging.basicConfig(
     level=logging.DEBUG,
@@ -30,7 +34,7 @@ def log_and_raise_e(message, exception):
     logging.error(message)
     raise exception(message)
-def traverse_tree(root):
+def traverse_tree(root: str) -> Path:
     root_path = Path(os.path.expanduser(root))
     if not root_path.exists() or not root_path.is_dir():
         log_and_raise(f"The specified root path '{root}' does not exist or "
@@ -51,7 +55,7 @@ class DBInfo:
     size : int = 63083064
-def collect_dbs(db_base):
+def collect_dbs(db_base: str) -> DBInfo:
     for path in traverse_tree(db_base):
         if path.name == "db.zip":
             # For the current repository, we have
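collect_dbs is a generator (the next hunk shows its yield db), so -> DBInfo names the element type rather than what the call returns; a generator function's return value is an iterator. If a stricter annotation is wanted, collections.abc.Iterator expresses that directly, and the same applies to traverse_tree, which collect_dbs iterates over. A sketch under that assumption, with an illustrative one-field DBInfo:

    from collections.abc import Iterator
    from dataclasses import dataclass
    from pathlib import Path

    @dataclass
    class DBInfo:
        path: str = ''  # illustrative field only

    def traverse_tree(root: str) -> Iterator[Path]:
        # A generator function returns an iterator, which is what
        # `for path in traverse_tree(...)` consumes.
        yield from Path(root).rglob('*')

    def collect_dbs(db_base: str) -> Iterator[DBInfo]:
        for path in traverse_tree(db_base):
            if path.name == "db.zip":
                yield DBInfo(path=str(path))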
@@ -69,7 +73,7 @@ def collect_dbs(db_base):
         yield db
-def extract_metadata(zipfile_path):
+def extract_metadata(zipfile_path: str) -> tuple[object,object]:
     """
     extract_metadata(zipfile)
@@ -111,7 +115,7 @@ def extract_metadata(zipfile_path):
 class ExtractNotZipfile(Exception): pass
 class ExtractNoCQLDB(Exception): pass
-def metadata_details(left_index, codeql_content, meta_content):
+def metadata_details(left_index: int, codeql_content: object, meta_content: object) -> pd.DataFrame:
     """
     metadata_details(codeql_content, meta_content)
@@ -143,11 +147,11 @@ def metadata_details(left_index, codeql_content, meta_content):
         'finalised': cqlc.get('finalised', pd.NA),
     }
     f = pd.DataFrame(d, index=[0])
-    joiners = []
+    joiners: list[dict[str, int | Any]] = []
     if not ('languages' in metac):
         log_and_raise_e("Missing 'languages' in metadata", DetailsMissing)
     for lang, lang_cont in metac['languages'].items():
-        d1 = { 'left_index' : left_index,
+        d1: dict[str, int | Any] = { 'left_index' : left_index,
               'db_lang': lang }
         for prop, val in lang_cont.items():
             if prop == 'files':
@@ -157,8 +161,8 @@ def metadata_details(left_index, codeql_content, meta_content):
             elif prop == 'displayName':
                 d1['db_lang_displayName'] = val
         joiners.append(d1)
-    fj = pd.DataFrame(joiners)
-    full_df = pd.merge(f, fj, on='left_index', how='outer')
+    fj: DataFrame = pd.DataFrame(joiners)
+    full_df: DataFrame = pd.merge(f, fj, on='left_index', how='outer')
     return full_df
 class DetailsMissing(Exception): pass
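Inside metadata_details, joiners accumulates one dict per language and only then becomes a frame, which pd.merge joins back to the single-row metadata frame f, fanning that row out once per language. The same shape in isolation, with shortened illustrative columns:

    from typing import Any
    import pandas as pd
    from pandas import DataFrame

    f = pd.DataFrame({'left_index': [0], 'finalised': [True]})
    joiners: list[dict[str, int | Any]] = [
        {'left_index': 0, 'db_lang': 'python'},
        {'left_index': 0, 'db_lang': 'cpp'},
    ]
    fj: DataFrame = pd.DataFrame(joiners)
    # The single metadata row repeats once per language row after the merge.
    full_df: DataFrame = pd.merge(f, fj, on='left_index', how='outer')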
@@ -185,7 +189,7 @@ def form_db_bucket_name(owner, name, CID):
     """
     return f'{owner}${name}ctsj{CID}.zip'
-def form_db_req_name(owner, name, CID):
+def form_db_req_name(owner: str, name: str, CID: str) -> str:
     """
     form_db_req_name(owner, name, CID)
     Return the name to use in mrva requests; this function is trivial and used to