Files
mrvahepc/bin/host-hepc-init
michael hohn cee1cd4da6 Switch DB identity to true content addressing
- Introduce content_hash as authoritative identifier (hash of full db.zip)
- Demote previous CID to build_cid (build context only)
- Store full content_hash in metadata; use short hash only for filenames
- Keep build metadata (cliVersion, creationTime, language, source SHA) as fields
- Update schema to make content_hash the primary key
- Filenames now include both build_cid and content hash for human inspection

This separates byte identity from provenance, enabling
deduplication, reproducibility checks, and scale-safe indexing.
2025-12-24 15:11:03 -08:00

286 lines
8.7 KiB
Python
Executable File

#!/usr/bin/env -S uv run python
# -*- python -*-
import hashlib
import json
import os
import queue
import sqlite3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import yaml
from plumbum import cli, local
from plumbum.cmd import find, mkdir, ln, rm, mktemp, unzip, date, env
# ------------------------------------------------------------
# Logging
# ------------------------------------------------------------
def log(level, message):
    """Print a timestamped, color-coded log line to stderr.

    Parameters:
        level: one of "INFO", "WARN", "ERROR" (selects the ANSI color).
        message: text to display.
    """
    colors = {
        "INFO": "\033[1;34m",
        "WARN": "\033[1;33m",
        "ERROR": "\033[1;31m",
        "RESET": "\033[0m",
    }
    # Use the stdlib clock instead of forking a `date` subprocess per call
    # (the original shelled out via plumbum's date command).
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{colors[level]}[{timestamp}] [{level}] {message}{colors['RESET']}",
          file=sys.stderr)
# ------------------------------------------------------------
# Identifiers
# ------------------------------------------------------------
def generate_build_cid(cli_version, creation_time, primary_language, source_sha):
    """Derive a short identifier for the build *context* (provenance).

    This fingerprints how the database was produced — CLI version,
    creation time, language, and source commit. It is NOT content
    addressing; byte identity is handled by hash_file().
    """
    fields = (cli_version, creation_time, primary_language, source_sha)
    fingerprint = " ".join(str(field) for field in fields)
    return hashlib.sha256(fingerprint.encode()).hexdigest()[:10]
def hash_file(path, algo="sha256", chunk_size=1024 * 1024):
    """Return the hex digest of the file at *path* — the true content hash.

    Streams the file in *chunk_size* pieces so arbitrarily large DB
    artifacts are hashed without loading them fully into memory.
    """
    digest = hashlib.new(algo)
    with open(path, "rb") as fh:
        while True:
            block = fh.read(chunk_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
# ------------------------------------------------------------
# Paths
# ------------------------------------------------------------
def expand_path(path):
    """Expand environment variables in *path* via plumbum's environment."""
    expanded = local.env.expand(path)
    return expanded
# ------------------------------------------------------------
# SQLite metadata DB
# ------------------------------------------------------------
def init_metadata_db(db_path):
    """Create the SQLite metadata table at *db_path* if it is absent.

    Schema note: content_hash (hash of the full db.zip) is the primary
    key — byte identity is authoritative. build_cid is retained only as
    build-context provenance.
    """
    connection = sqlite3.connect(db_path)
    connection.cursor().execute('''
        CREATE TABLE IF NOT EXISTS metadata (
            content_hash TEXT PRIMARY KEY,
            build_cid TEXT,
            git_branch TEXT,
            git_commit_id TEXT,
            git_owner TEXT,
            git_repo TEXT,
            ingestion_datetime_utc TEXT,
            primary_language TEXT,
            result_url TEXT,
            tool_name TEXT,
            tool_version TEXT,
            projname TEXT,
            db_file_size INTEGER
        )
    ''')
    connection.commit()
    connection.close()
# ------------------------------------------------------------
# DB processing
# ------------------------------------------------------------
def process_db_file(zip_path, db_collection_dir, result_queue):
    """Process one db.zip: compute identities, queue metadata, link artifact.

    Extracts only codeql-database.yml from *zip_path*, derives a build_cid
    (provenance hash) and a content_hash (sha256 of the whole zip), puts a
    metadata dict on *result_queue*, and symlinks the zip into
    *db_collection_dir* under a name embedding both identifiers.
    Any failure is logged as WARN (best-effort); the temp dir is always
    removed in the finally block.
    """
    temp_dir = mktemp("-d").strip()
    try:
        # Pull only the embedded YAML manifest, not the entire archive.
        unzip("-o", "-q", zip_path, "*codeql-database.yml", "-d", temp_dir)
        yaml_files = list(local.path(temp_dir).walk(
            filter=lambda p: p.name == "codeql-database.yml"))
        if not yaml_files:
            log("WARN", f"No codeql-database.yml found in {zip_path}")
            return
        # If the archive contains several manifests, the first found wins.
        yaml_path = yaml_files[0]
        with yaml_path.open("r") as f:
            yaml_data = yaml.safe_load(f)
        # Build provenance fields from the CodeQL manifest.
        primary_language = yaml_data["primaryLanguage"]
        creation_metadata = yaml_data["creationMetadata"]
        source_sha = creation_metadata["sha"]
        cli_version = creation_metadata["cliVersion"]
        creation_time = creation_metadata["creationTime"]
        # owner/repo are recovered from the last two path components of
        # the source checkout location, e.g. .../<owner>/<repo>.
        source_location_prefix = local.path(
            yaml_data["sourceLocationPrefix"])
        repo = source_location_prefix.name
        owner = source_location_prefix.parent.name
        # ---- identities ----
        # build_cid: hash of build context (NOT content addressing).
        build_cid = generate_build_cid(
            cli_version, creation_time, primary_language, source_sha)
        # content_hash: authoritative byte identity of the full zip.
        content_hash = hash_file(zip_path)
        # short form only for filenames / URLs; full hash goes in metadata.
        content_hash_short = content_hash[:16]
        new_db_fname = (
            f"{owner}-{repo}-ctsj-{build_cid}-{content_hash_short}.zip"
        )
        hepc_endpoint = os.getenv("MRVA_HEPC_ENDPOINT")
        if hepc_endpoint is None:
            log("WARN",
                "Environment variable 'MRVA_HEPC_ENDPOINT' is not set. "
                "Using default 'http://hepc'.")
            hepc_endpoint = "http://hepc"
        result_url = f"{hepc_endpoint}/db/{db_collection_dir}/{new_db_fname}"
        file_size = local.path(zip_path).stat().st_size
        # Keys mirror the columns of the metadata table in init_metadata_db.
        metadata = {
            "content_hash" : content_hash,
            "build_cid" : build_cid,
            "git_branch" : "HEAD",
            "git_commit_id" : source_sha,
            "git_owner" : owner,
            "git_repo" : repo,
            "ingestion_datetime_utc" : str(creation_time),
            "primary_language" : primary_language,
            "result_url" : result_url,
            "tool_name" : f"codeql-{primary_language}",
            "tool_version" : cli_version,
            "projname" : f"{owner}/{repo}",
            "db_file_size" : file_size,
        }
        result_queue.put(metadata)
        # Symlink (not copy) the original zip into the collection dir;
        # skipped when a file with this content-addressed name exists.
        copy_path = local.path(db_collection_dir) / new_db_fname
        if not copy_path.exists():
            ln("-s", zip_path, copy_path)
    except Exception as e:
        # Best-effort per-file processing: log and continue with others.
        log("WARN", f"Error processing {zip_path}: {e}")
    finally:
        rm("-rf", temp_dir)
# ------------------------------------------------------------
# Batch DB write
# ------------------------------------------------------------
def write_metadata_to_db(result_queue, db_collection_dir):
    """Drain *result_queue* and batch-insert the records into metadata.sql.

    Runs after all workers have finished, so the queue drain is
    single-threaded. INSERT OR IGNORE deduplicates on content_hash
    (the primary key).
    """
    metadata_db = local.path(db_collection_dir) / "metadata.sql"
    init_metadata_db(str(metadata_db))

    records = []
    while True:
        try:
            records.append(result_queue.get_nowait())
        except queue.Empty:
            break

    if not records:
        log("INFO", "No metadata to write to database")
        return

    # Column order must match the INSERT statement below.
    columns = (
        "content_hash", "build_cid",
        "git_branch", "git_commit_id", "git_owner", "git_repo",
        "ingestion_datetime_utc", "primary_language",
        "result_url", "tool_name", "tool_version",
        "projname", "db_file_size",
    )
    rows = [tuple(record[col] for col in columns) for record in records]

    conn = sqlite3.connect(str(metadata_db))
    conn.cursor().executemany('''
        INSERT OR IGNORE INTO metadata (
            content_hash, build_cid,
            git_branch, git_commit_id, git_owner, git_repo,
            ingestion_datetime_utc, primary_language,
            result_url, tool_name, tool_version,
            projname, db_file_size
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', rows)
    conn.commit()
    conn.close()
    log("INFO",
        f"Successfully wrote {len(records)} metadata records to database")
# ------------------------------------------------------------
# CLI
# ------------------------------------------------------------
class HEPC(cli.Application):
    """
    HEPC processes db.zip files, creates content-addressed artifacts,
    and records metadata in metadata.sql.
    """
    db_collection_dir = cli.SwitchAttr(
        "--db_collection_dir", str, mandatory=True)
    starting_path = cli.SwitchAttr(
        "--starting_path", str, mandatory=True)
    max_dbs = cli.SwitchAttr(
        "--max_dbs", int, default=100)
    max_workers = cli.SwitchAttr(
        "--max_workers", int, default=4)

    def main(self):
        # Resolve environment variables in user-supplied paths up front.
        collection_dir = expand_path(self.db_collection_dir)
        search_root = expand_path(self.starting_path)
        mkdir("-p", collection_dir)

        log("INFO", f"Searching for db.zip files in {search_root}")
        # Non-empty (-size +0c) db.zip files anywhere under the root.
        candidates = find(
            search_root, "-type", "f", "-name", "db.zip", "-size", "+0c"
        ).splitlines()
        if not candidates:
            log("WARN", "No db.zip files found.")
            return

        # Honor the --max_dbs cap before fanning work out to threads.
        candidates = candidates[:self.max_dbs]
        log("INFO",
            f"Processing {len(candidates)} db.zip files "
            f"with {self.max_workers} workers")

        results = queue.Queue()
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            pending = {}
            for zip_file in candidates:
                future = pool.submit(
                    process_db_file, zip_file, collection_dir, results)
                pending[future] = zip_file
            # Surface worker exceptions without aborting the batch.
            for done in as_completed(pending):
                try:
                    done.result()
                except Exception as exc:
                    log("ERROR", f"Worker failure: {exc}")

        # Single-threaded batch write after all workers complete.
        write_metadata_to_db(results, collection_dir)
        log("INFO", "Processing completed.")
if __name__ == "__main__":
    # Entry point: hand control to the plumbum CLI application.
    HEPC.run()