#!/usr/bin/env python3
"""Collect CodeQL ``db.zip`` archives into one directory.

Every non-empty ``db.zip`` found below a starting path is symlinked into a
collection directory under a name derived from its ``codeql-database.yml``,
and one JSON record per archive is appended to ``metadata.json`` there.
"""
import hashlib
import json
import sys
import tempfile
from datetime import datetime

import yaml
from plumbum import cli, local
# rm, mktemp, date and env are not used below; they stay imported so the
# script's import-time requirements are the same as before.
from plumbum.cmd import find, mkdir, ln, rm, mktemp, unzip, date, env

# ANSI colour escapes keyed by log level.
_LOG_COLORS = {
    "INFO": "\033[1;34m",
    "WARN": "\033[1;33m",
    "ERROR": "\033[1;31m",
    "RESET": "\033[0m",
}


# Logging function
def log(level, message):
    """Write a coloured, timestamped message to stderr.

    Args:
        level: Label printed in brackets; "INFO", "WARN" and "ERROR" have a
            colour assigned, any other label is printed without one.
        message: Text of the log entry.
    """
    # Formatting the local time in-process yields the same text the external
    # `date "+%Y-%m-%d %H:%M:%S"` call did, without one subprocess per message.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    color = _LOG_COLORS.get(level, "")
    print(
        f"{color}[{timestamp}] [{level}] {message}{_LOG_COLORS['RESET']}",
        file=sys.stderr,
    )


# Generate a CID (cumulative id)
def generate_cid(cli_version, creation_time, primary_language, sha):
    """Return a 6-character hex id derived from the four given values.

    The values are joined with single spaces, hashed with SHA-256, and the
    first six hex digits of the digest are returned.
    """
    hash_input = f"{cli_version} {creation_time} {primary_language} {sha}".encode()
    return hashlib.sha256(hash_input).hexdigest()[:6]


# Expand environment variables in paths
def expand_path(path):
    """Return *path* after expansion by plumbum's ``local.env.expand``."""
    return local.env.expand(path)


def _build_metadata(yaml_data, db_collection_dir):
    """Derive the collection file name and metadata record for one database.

    Args:
        yaml_data: Parsed ``codeql-database.yml`` content; must provide
            ``primaryLanguage``, ``sourceLocationPrefix`` and a
            ``creationMetadata`` mapping with ``sha``, ``cliVersion`` and
            ``creationTime``.
        db_collection_dir: Collection directory, embedded in ``result_url``.

    Returns:
        A ``(new_db_fname, metadata)`` tuple.

    Raises:
        KeyError, TypeError: If *yaml_data* lacks the expected structure.
    """
    primary_language = yaml_data["primaryLanguage"]
    creation_metadata = yaml_data["creationMetadata"]
    sha = creation_metadata["sha"]
    cli_version = creation_metadata["cliVersion"]
    creation_time = creation_metadata["creationTime"]

    # The last two components of the source prefix are used as owner and repo.
    source_location_prefix = local.path(yaml_data["sourceLocationPrefix"])
    repo = source_location_prefix.name
    owner = source_location_prefix.parent.name

    cid = generate_cid(cli_version, creation_time, primary_language, sha)
    new_db_fname = f"{owner}-{repo}-ctsj-{cid}.zip"
    result_url = f"http://hepc/{db_collection_dir}/{new_db_fname}"

    metadata = {
        "git_branch": "HEAD",
        "git_commit_id": sha,
        "git_repo": repo,
        "ingestion_datetime_utc": str(creation_time),
        "result_url": result_url,
        "tool_id": "9f2f9642-febb-4435-9204-fb50bbd43de4",
        "tool_name": f"codeql-{primary_language}",
        "tool_version": cli_version,
        "projname": f"{owner}/{repo}",
    }
    return new_db_fname, metadata


# Process a single db.zip file
def process_db_file(zip_path, db_collection_dir):
    """Register one ``db.zip`` in the collection directory.

    Extracts ``codeql-database.yml`` from the archive, appends a metadata
    record to ``<db_collection_dir>/metadata.json`` and creates a symlink to
    the archive under its derived name if no such entry exists yet.

    Any failure is logged as a warning and swallowed so that one bad archive
    does not stop the run.

    Args:
        zip_path: Path of the archive, as reported by ``find``.
        db_collection_dir: Existing directory receiving links and metadata.
    """
    try:
        # The temporary directory is removed when the `with` block exits,
        # including on errors.
        with tempfile.TemporaryDirectory() as temp_dir:
            unzip("-o", "-q", zip_path, "*codeql-database.yml", "-d", temp_dir)

            # Locate the YAML file regardless of its depth
            yaml_path = next(
                iter(
                    local.path(temp_dir).walk(
                        filter=lambda p: p.name == "codeql-database.yml"
                    )
                ),
                None,
            )
            if yaml_path is None:
                log("WARN", f"No codeql-database.yml found in {zip_path}")
                return

            with yaml_path.open("r") as f:
                yaml_data = yaml.safe_load(f)

        new_db_fname, metadata = _build_metadata(yaml_data, db_collection_dir)
        collection = local.path(db_collection_dir)

        # NOTE(review): the record is appended on every run, even when the
        # link below already exists, so re-running duplicates entries in
        # metadata.json -- confirm whether that is intended.
        with (collection / "metadata.json").open("a") as f:
            json.dump(metadata, f)
            f.write("\n")

        link_path = collection / new_db_fname
        if not link_path.exists():
            # `find` echoes a relative starting path back in its results, and
            # a relative symlink target is resolved against the link's own
            # directory; local.path() makes the target absolute so the link
            # stays valid from inside the collection directory.
            ln("-sf", local.path(zip_path), link_path)

    except Exception as e:
        log("WARN", f"Error processing {zip_path}: {e}")


# Main application class
class DBProcessor(cli.Application):
    """
    DBProcessor processes db.zip files found in a starting directory,
    symlinks updated names in a collection directory,
    and adds a metadata information file "metadata.json" to the directory.
    """

    db_collection_dir = cli.SwitchAttr(
        "--db_collection_dir", str, mandatory=True,
        help="Specify the database collection directory"
    )
    starting_path = cli.SwitchAttr(
        "--starting_path", str, mandatory=True,
        help="Specify the starting path"
    )

    def main(self):
        """Find every non-empty db.zip below the starting path and process it."""
        db_collection_dir = expand_path(self.db_collection_dir)
        starting_path = expand_path(self.starting_path)

        mkdir("-p", db_collection_dir)
        log("INFO", f"Searching for db.zip files in {starting_path}")

        # "-size +0c" skips zero-byte archives.
        db_files = find(
            starting_path, "-type", "f", "-name", "db.zip", "-size", "+0c"
        ).splitlines()

        if not db_files:
            log("WARN", "No db.zip files found in the specified starting path.")
            return

        for zip_path in db_files:
            process_db_file(zip_path, db_collection_dir)

        log("INFO", "Processing completed.")


if __name__ == "__main__":
    DBProcessor.run()