- Introduce content_hash as the authoritative identifier (hash of the full db.zip).
- Demote the previous CID to build_cid (build-context provenance only).
- Store the full content_hash in metadata; use the short hash only for filenames.
- Keep build metadata (cliVersion, creationTime, language, source SHA) as fields.
- Update the schema to make content_hash the primary key.
- Filenames now include both build_cid and a short content hash for human inspection.

This separates byte identity from provenance, enabling deduplication, reproducibility checks, and scale-safe indexing.
286 lines
8.7 KiB
Python
Executable File
286 lines
8.7 KiB
Python
Executable File
#!/usr/bin/env -S uv run python
|
|
# -*- python -*-
|
|
import hashlib
import json
import os
import queue
import sqlite3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import yaml
from plumbum import cli, local
from plumbum.cmd import find, mkdir, ln, rm, mktemp, unzip, date, env
|
|
|
|
# ------------------------------------------------------------
|
|
# Logging
|
|
# ------------------------------------------------------------
|
|
|
|
def log(level, message):
    """Print a timestamped, ANSI-colorized log line to stderr.

    ``level`` must be one of "INFO", "WARN" or "ERROR"; an unknown
    level raises KeyError (unchanged from the original contract).
    """
    colors = {
        "INFO": "\033[1;34m",
        "WARN": "\033[1;33m",
        "ERROR": "\033[1;31m",
        "RESET": "\033[0m",
    }
    # BUGFIX: the original forked an external `date` process for every
    # log call; use the stdlib clock with the same local-time format.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{colors[level]}[{timestamp}] [{level}] {message}{colors['RESET']}",
          file=sys.stderr)
|
|
|
# ------------------------------------------------------------
|
|
# Identifiers
|
|
# ------------------------------------------------------------
|
|
|
|
def generate_build_cid(cli_version, creation_time, primary_language, source_sha):
    """
    Derive a short build-context identifier from provenance fields.

    This is NOT content addressing: two byte-identical databases built
    in different contexts will get different build CIDs.
    """
    fields = (cli_version, creation_time, primary_language, source_sha)
    fingerprint = " ".join(str(field) for field in fields)
    return hashlib.sha256(fingerprint.encode()).hexdigest()[:10]
|
|
|
|
|
def hash_file(path, algo="sha256", chunk_size=1024 * 1024):
    """
    Compute the true content hash of the artifact at *path*.

    Streams the file in ``chunk_size`` blocks so large DB zips never
    need to fit in memory.
    """
    digest = hashlib.new(algo)
    with open(path, "rb") as handle:
        while True:
            block = handle.read(chunk_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
# ------------------------------------------------------------
|
|
# Paths
|
|
# ------------------------------------------------------------
|
|
|
|
def expand_path(path):
    """Expand environment variables and user shortcuts in *path*.

    NOTE(review): delegates to plumbum's ``local.env.expand``; presumed to
    behave like ``os.path.expandvars`` + ``expanduser`` — confirm against
    the plumbum documentation.
    """
    return local.env.expand(path)
|
|
|
# ------------------------------------------------------------
|
|
# SQLite metadata DB
|
|
# ------------------------------------------------------------
|
|
|
|
def init_metadata_db(db_path):
    """Create the ``metadata`` table in the SQLite DB at *db_path* if absent.

    ``content_hash`` (byte identity of the db.zip) is the primary key;
    ``build_cid`` carries build provenance only.
    """
    schema = '''
        CREATE TABLE IF NOT EXISTS metadata (
            content_hash TEXT PRIMARY KEY,
            build_cid TEXT,
            git_branch TEXT,
            git_commit_id TEXT,
            git_owner TEXT,
            git_repo TEXT,
            ingestion_datetime_utc TEXT,
            primary_language TEXT,
            result_url TEXT,
            tool_name TEXT,
            tool_version TEXT,
            projname TEXT,
            db_file_size INTEGER
        )
    '''
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(schema)
        conn.commit()
    finally:
        # BUGFIX: the original leaked the connection if execute() raised.
        conn.close()
|
|
|
# ------------------------------------------------------------
|
|
# DB processing
|
|
# ------------------------------------------------------------
|
|
|
|
def process_db_file(zip_path, db_collection_dir, result_queue):
    """
    Ingest a single db.zip: read its CodeQL manifest, compute both
    identities (build_cid + content_hash), queue a metadata record, and
    symlink the artifact into the collection directory.

    Failures are logged as warnings; the scratch directory is always
    removed.
    """
    scratch_dir = mktemp("-d").strip()
    try:
        # Extract only the CodeQL manifest from the archive.
        unzip("-o", "-q", zip_path, "*codeql-database.yml", "-d", scratch_dir)

        manifests = list(local.path(scratch_dir).walk(
            filter=lambda p: p.name == "codeql-database.yml"))
        if not manifests:
            log("WARN", f"No codeql-database.yml found in {zip_path}")
            return

        with manifests[0].open("r") as handle:
            manifest = yaml.safe_load(handle)

        primary_language = manifest["primaryLanguage"]
        creation = manifest["creationMetadata"]
        source_sha = creation["sha"]
        cli_version = creation["cliVersion"]
        creation_time = creation["creationTime"]

        src_prefix = local.path(manifest["sourceLocationPrefix"])
        repo = src_prefix.name
        owner = src_prefix.parent.name

        # ---- identities ----
        # build_cid = provenance fingerprint; content_hash = byte identity.
        build_cid = generate_build_cid(
            cli_version, creation_time, primary_language, source_sha)
        content_hash = hash_file(zip_path)

        # Short form only for filenames / URLs.
        short_hash = content_hash[:16]
        new_db_fname = f"{owner}-{repo}-ctsj-{build_cid}-{short_hash}.zip"

        hepc_endpoint = os.getenv("MRVA_HEPC_ENDPOINT")
        if hepc_endpoint is None:
            log("WARN",
                "Environment variable 'MRVA_HEPC_ENDPOINT' is not set. "
                "Using default 'http://hepc'.")
            hepc_endpoint = "http://hepc"

        result_url = f"{hepc_endpoint}/db/{db_collection_dir}/{new_db_fname}"
        file_size = local.path(zip_path).stat().st_size

        result_queue.put({
            "content_hash": content_hash,
            "build_cid": build_cid,
            "git_branch": "HEAD",
            "git_commit_id": source_sha,
            "git_owner": owner,
            "git_repo": repo,
            "ingestion_datetime_utc": str(creation_time),
            "primary_language": primary_language,
            "result_url": result_url,
            "tool_name": f"codeql-{primary_language}",
            "tool_version": cli_version,
            "projname": f"{owner}/{repo}",
            "db_file_size": file_size,
        })

        # Expose the artifact under its content-addressed name.
        link_path = local.path(db_collection_dir) / new_db_fname
        if not link_path.exists():
            ln("-s", zip_path, link_path)

    except Exception as exc:
        log("WARN", f"Error processing {zip_path}: {exc}")
    finally:
        rm("-rf", scratch_dir)
|
|
|
# ------------------------------------------------------------
|
|
# Batch DB write
|
|
# ------------------------------------------------------------
|
|
|
|
def write_metadata_to_db(result_queue, db_collection_dir):
    """Drain *result_queue* and persist all metadata records to SQLite.

    Creates ``<db_collection_dir>/metadata.sql`` if needed and inserts one
    row per record. ``INSERT OR IGNORE`` makes re-ingestion of an already
    known content_hash a no-op (content_hash is the primary key).
    """
    metadata_db = local.path(db_collection_dir) / "metadata.sql"

    init_metadata_db(str(metadata_db))

    # Drain the queue. All workers have completed before this runs, so
    # empty() cannot race with concurrent puts.
    all_metadata = []
    while not result_queue.empty():
        try:
            all_metadata.append(result_queue.get_nowait())
        except queue.Empty:
            break

    if not all_metadata:
        log("INFO", "No metadata to write to database")
        return

    # Single source of truth for column order: keeps the INSERT column
    # list and the per-row value tuples in lockstep.
    columns = (
        "content_hash", "build_cid",
        "git_branch", "git_commit_id", "git_owner", "git_repo",
        "ingestion_datetime_utc", "primary_language",
        "result_url", "tool_name", "tool_version",
        "projname", "db_file_size",
    )
    rows = [tuple(m[col] for col in columns) for m in all_metadata]
    placeholders = ", ".join("?" * len(columns))
    sql = (
        f"INSERT OR IGNORE INTO metadata ({', '.join(columns)}) "
        f"VALUES ({placeholders})"
    )

    conn = sqlite3.connect(str(metadata_db))
    try:
        conn.executemany(sql, rows)
        conn.commit()
    finally:
        # BUGFIX: the original leaked the connection if executemany raised.
        conn.close()

    log("INFO",
        f"Successfully wrote {len(all_metadata)} metadata records to database")
|
|
|
# ------------------------------------------------------------
|
|
# CLI
|
|
# ------------------------------------------------------------
|
|
|
|
class HEPC(cli.Application):
    """
    HEPC processes db.zip files, creates content-addressed artifacts,
    and records metadata in metadata.sql.
    """

    # Destination for symlinked artifacts and metadata.sql.
    db_collection_dir = cli.SwitchAttr(
        "--db_collection_dir", str, mandatory=True)

    # Root directory searched recursively for db.zip files.
    starting_path = cli.SwitchAttr(
        "--starting_path", str, mandatory=True)

    # Cap on how many archives are processed per run.
    max_dbs = cli.SwitchAttr(
        "--max_dbs", int, default=100)

    # Thread-pool size for concurrent archive processing.
    max_workers = cli.SwitchAttr(
        "--max_workers", int, default=4)

    def main(self):
        collection_dir = expand_path(self.db_collection_dir)
        search_root = expand_path(self.starting_path)

        mkdir("-p", collection_dir)
        log("INFO", f"Searching for db.zip files in {search_root}")

        # Non-empty db.zip files only ("-size +0c").
        candidates = find(
            search_root, "-type", "f", "-name", "db.zip", "-size", "+0c"
        ).splitlines()

        if not candidates:
            log("WARN", "No db.zip files found.")
            return

        candidates = candidates[:self.max_dbs]
        log("INFO",
            f"Processing {len(candidates)} db.zip files "
            f"with {self.max_workers} workers")

        results = queue.Queue()

        # Fan out one task per archive; workers push metadata onto
        # `results` and failures are surfaced via future.result().
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            pending = [
                pool.submit(process_db_file, zip_file, collection_dir, results)
                for zip_file in candidates
            ]
            for done in as_completed(pending):
                try:
                    done.result()
                except Exception as exc:
                    log("ERROR", f"Worker failure: {exc}")

        write_metadata_to_db(results, collection_dir)
        log("INFO", "Processing completed.")
|
|
|
# Entry point: parse CLI switches and run the application.
if __name__ == "__main__":
    HEPC.run()