initial mrvahepc commit

This commit is contained in:
Michael Hohn
2024-12-17 21:29:27 -08:00
committed by =Michael Hohn
commit fadc669586
8 changed files with 560 additions and 0 deletions

120
bin/mc-hepc-init Executable file
View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
import json
import hashlib
import yaml
import sys
from plumbum import cli, local
from plumbum.cmd import find, mkdir, cp, rm, mktemp, unzip, date, env
# Logging function
def log(level, message):
    """
    Print a colorized, timestamped log line to stderr.

    Args:
        level: Severity label. "INFO", "WARN" and "ERROR" have a color;
            any other label is printed without a leading color code
            instead of raising KeyError.
        message: Text to log.
    """
    # Local import keeps this function self-contained.
    from datetime import datetime

    colors = {
        "INFO": "\033[1;34m",
        "WARN": "\033[1;33m",
        "ERROR": "\033[1;31m",
        "RESET": "\033[0m",
    }
    # Format the local time in-process instead of spawning the external
    # `date` command for every log line.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    color = colors.get(level, "")
    print(f"{color}[{timestamp}] [{level}] {message}{colors['RESET']}", file=sys.stderr)
# Generate a CID (cumulative id)
def generate_cid(cli_version, creation_time, primary_language, sha):
    """
    Return a 6-character hex id derived from the given database attributes.

    The four values are formatted, joined with single spaces, and hashed
    with SHA-256; the first six hex digits of the digest are returned.
    """
    fields = (cli_version, creation_time, primary_language, sha)
    digest = hashlib.sha256()
    digest.update(" ".join(f"{field}" for field in fields).encode())
    return digest.hexdigest()[:6]
# Expand environment variables in paths
def expand_path(path):
    """Return *path* after expansion by plumbum's local environment."""
    environment = local.env
    return environment.expand(path)
# Process a single db.zip file
def process_db_file(zip_path, db_collection_dir):
    """
    Process a single db.zip file.

    Extracts codeql-database.yml from the archive into a temporary
    directory, derives a new archive name of the form
    "<owner>-<repo>-ctsj-<cid>.zip" from its contents, copies the archive
    under that name into db_collection_dir (unless it is already there),
    and records one JSON line describing it in metadata.json in the same
    directory.

    Any error is logged as a warning and not propagated, so one bad
    archive does not abort the caller's loop. The temporary directory is
    always removed.

    Args:
        zip_path: Path of the db.zip archive to process.
        db_collection_dir: Directory receiving the renamed copy and
            metadata.json.
    """
    temp_dir = mktemp("-d").strip()
    try:
        unzip("-o", "-q", zip_path, "*codeql-database.yml", "-d", temp_dir)

        # Locate the YAML file regardless of its depth
        yaml_files = list(local.path(temp_dir).walk(
            filter=lambda p: p.name == "codeql-database.yml"))
        if not yaml_files:
            log("WARN", f"No codeql-database.yml found in {zip_path}")
            return

        yaml_path = yaml_files[0]
        with yaml_path.open("r") as f:
            yaml_data = yaml.safe_load(f)

        primary_language = yaml_data["primaryLanguage"]
        creation_metadata = yaml_data["creationMetadata"]
        sha = creation_metadata["sha"]
        cli_version = creation_metadata["cliVersion"]
        creation_time = creation_metadata["creationTime"]

        # The last two components of sourceLocationPrefix are used as
        # owner and repository name.
        source_location_prefix = local.path(yaml_data["sourceLocationPrefix"])
        repo = source_location_prefix.name
        owner = source_location_prefix.parent.name

        cid = generate_cid(cli_version, creation_time, primary_language, sha)
        new_db_fname = f"{owner}-{repo}-ctsj-{cid}.zip"
        result_url = f"http://hepc/{db_collection_dir}/{new_db_fname}"

        metadata = {
            "git_branch" : "HEAD",
            "git_commit_id" : sha,
            "git_repo" : repo,
            "ingestion_datetime_utc" : str(creation_time),
            "result_url" : result_url,
            "tool_id" : "9f2f9642-febb-4435-9204-fb50bbd43de4",
            "tool_name" : f"codeql-{primary_language}",
            "tool_version" : cli_version,
            "projname" : f"{owner}/{repo}",
        }
        record = json.dumps(metadata)

        collection = local.path(db_collection_dir)

        # Copy the archive before recording it: if the copy fails, no
        # metadata entry is left pointing at a file that does not exist.
        copy_path = collection / new_db_fname
        if not copy_path.exists():
            cp(zip_path, copy_path)

        # Re-running over the same archives must not pile up duplicate
        # records, so an identical line is only written once.
        metadata_file = collection / "metadata.json"
        already_recorded = False
        if metadata_file.exists():
            with metadata_file.open("r") as f:
                already_recorded = any(line.rstrip("\n") == record for line in f)
        if not already_recorded:
            with metadata_file.open("a") as f:
                f.write(record + "\n")
    except Exception as e:
        # Deliberate best-effort: report and continue with the next archive.
        log("WARN", f"Error processing {zip_path}: {e}")
    finally:
        rm("-rf", temp_dir)
# Main application class
class DBProcessor(cli.Application):
    """
    DBProcessor processes db.zip files found in a starting directory,
    copies updated names in a collection directory,
    and adds a metadata information file "metadata.json" to the directory.
    """

    # Destination directory for renamed archives and metadata.json.
    db_collection_dir = cli.SwitchAttr(
        "--db_collection_dir",
        str,
        mandatory=True,
        help="Specify the database collection directory",
    )
    # Root of the directory tree searched for db.zip files.
    starting_path = cli.SwitchAttr(
        "--starting_path",
        str,
        mandatory=True,
        help="Specify the starting path",
    )

    def main(self):
        # Expand both switches, then make sure the destination exists.
        collection_dir = expand_path(self.db_collection_dir)
        search_root = expand_path(self.starting_path)
        mkdir("-p", collection_dir)

        log("INFO", f"Searching for db.zip files in {search_root}")
        # Only regular, non-empty files named db.zip are considered.
        find_output = find(
            search_root, "-type", "f", "-name", "db.zip", "-size", "+0c"
        )
        archives = find_output.splitlines()
        if len(archives) == 0:
            log("WARN", "No db.zip files found in the specified starting path.")
            return

        for archive in archives:
            process_db_file(archive, collection_dir)
        log("INFO", "Processing completed.")
# Script entry point: run the DBProcessor command-line application.
if __name__ == "__main__":
    DBProcessor.run()

88
bin/mc-hepc-serve Executable file
View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
import logging
from pathlib import Path
from plumbum import cli
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
import uvicorn
# Logging configuration: INFO and above, timestamped, emitted through a
# StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# FastAPI application; the routes and middleware below are registered on it.
app = FastAPI()
db_dir = None # Set by MRVAHepc.main() before the server is started
# The fixed metadata routes are registered BEFORE the catch-all file route:
# FastAPI matches routes in registration order, so a catch-all declared first
# would shadow "/index" and "/api/v1/latest_results/codeql-all".
@app.get("/index")
@app.get("/api/v1/latest_results/codeql-all")
def serve_metadata_json():
    """
    Serve the metadata.json file for multiple routes.

    Raises:
        HTTPException: 404 when the database directory is not configured
            or metadata.json does not exist in it.
    """
    if db_dir is None:
        logger.error("Database directory is not configured.")
        raise HTTPException(status_code=404, detail="metadata.json not found")
    metadata_path = Path(db_dir) / "metadata.json"
    logger.info(f"Requested metadata.json at: {metadata_path}")
    if not metadata_path.exists():
        logger.error("metadata.json not found.")
        raise HTTPException(status_code=404, detail="metadata.json not found")
    logger.info(f"Serving metadata.json from: {metadata_path}")
    return FileResponse(metadata_path)


def _resolve_served_file(file_path):
    """
    Return the resolved path of a regular file inside db_dir, or None.

    The requested path is tried as given (relative to the working
    directory or absolute, as before) and relative to db_dir. A candidate
    is accepted only if, after resolving symlinks and "..", it is a
    regular file located inside db_dir.
    """
    if db_dir is None:
        return None
    try:
        root = Path(db_dir).resolve(strict=True)
    except OSError:
        return None
    for candidate in (Path(file_path), root / file_path):
        try:
            # strict=True raises if the file does not exist.
            resolved = candidate.resolve(strict=True)
            # relative_to raises ValueError when resolved is outside root.
            resolved.relative_to(root)
        except (OSError, ValueError, RuntimeError):
            continue
        if resolved.is_file():
            return resolved
    return None


@app.get("/{file_path:path}")
def serve_file(file_path: str):
    """
    Serve files from the database directory, such as .zip files or metadata.json.

    Args:
        file_path: Path portion of the request URL.

    Raises:
        HTTPException: 404 when the path does not name an existing regular
            file inside the database directory.
    """
    logger.info(f"Requested file: {file_path}")
    resolved_path = _resolve_served_file(file_path)
    if resolved_path is None:
        # Report the requested path rather than a filesystem location.
        logger.error(f"File not found: {file_path}")
        raise HTTPException(status_code=404, detail=f"{file_path} not found")
    logger.info(f"file resolved to: {resolved_path}")
    return FileResponse(resolved_path)
@app.middleware("http")
async def log_request(request, call_next):
    """Log the method and URL of each request, then pass it on unchanged."""
    method, url = request.method, request.url
    logger.info(f"Incoming request: {method} {url}")
    return await call_next(request)
class MRVAHepc(cli.Application):
    """
    MRVAHepc serves:
    1. CodeQL database .zip files found in the --codeql-db-dir
    2. Metadata for those zip files, contained in metadata.json in the same
    directory.
    The HTTP endpoints are:
    1. /{filename}
    2. /index
    3. /api/v1/latest_results/codeql-all
    """

    codeql_db_dir = cli.SwitchAttr(
        "--codeql-db-dir",
        str,
        mandatory=True,
        help="Directory containing CodeQL database files",
    )
    host = cli.SwitchAttr(
        "--host",
        str,
        default="127.0.0.1",
        help="Host address for the HTTP server",
    )
    port = cli.SwitchAttr(
        "--port",
        int,
        default=8070,
        help="Port for the HTTP server",
    )

    def main(self):
        # Publish the directory through the module-level db_dir so the
        # route handlers can read it.
        global db_dir
        db_dir = Path(self.codeql_db_dir)

        if db_dir.is_dir():
            logger.info(f"Starting server at {self.host}:{self.port}")
            logger.info(f"Serving files from directory: {db_dir}")
            # Run the FastAPI server using Uvicorn
            uvicorn.run(app, host=self.host, port=self.port)
            return None

        # Not a directory: report it and return 1.
        logger.error(f"Invalid directory: {db_dir}")
        return 1
# Script entry point: run the MRVAHepc command-line application.
if __name__ == "__main__":
    MRVAHepc.run()