mirror of
https://github.com/hohn/codeql-for-firefox.git
synced 2025-12-16 15:43:05 +01:00
248 lines
8.3 KiB
Python
Executable File
248 lines
8.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Extract only results from SARIF files while maintaining valid SARIF structure.
|
|
Removes artifacts, conversion, invocations, and other non-essential data.
|
|
"""
|
|
|
|
import json
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
|
|
def create_minimal_sarif(original_sarif: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create a minimal valid SARIF file containing only results and required metadata.

    Every run keeps its results, a trimmed ``tool.driver`` (name, a few
    descriptive fields and the rules) and a small set of run-level sections.
    All other run-level keys (artifacts, invocations, conversion, ...) are
    dropped. Retained values are referenced, not copied, so the returned
    structure shares nested objects with ``original_sarif``.

    Args:
        original_sarif: Complete SARIF data structure

    Returns:
        Minimal valid SARIF with only results
    """
    # Descriptive driver fields worth keeping when the source provides them.
    optional_driver_fields = ("version", "informationUri", "semanticVersion")
    # Run-level sections carried over verbatim when present.
    passthrough_run_keys = (
        "taxonomies",
        "threadFlowLocations",
        "graphs",
        "logicalLocations",
    )

    # Start with minimal required SARIF structure
    minimal_sarif = {
        "version": original_sarif.get("version", "2.1.0"),
        "$schema": original_sarif.get(
            "$schema",
            "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
        ),
        "runs": [],
    }

    for run in original_sarif.get("runs", []):
        # A missing/null "tool" or "driver" is treated as an empty driver so
        # that the fallback name below is applied in every case; an unnamed
        # driver object would not be a valid tool description.
        driver = (run.get("tool") or {}).get("driver") or {}

        minimal_driver = {"name": driver.get("name", "Unknown Tool")}
        for field in optional_driver_fields:
            if field in driver:
                minimal_driver[field] = driver[field]

        # Keep rules if they exist (needed for result rule references)
        if "rules" in driver:
            minimal_driver["rules"] = driver["rules"]

        minimal_run = {
            "tool": {"driver": minimal_driver},
            # Main content: results (empty list when the run has none)
            "results": run.get("results", []),
        }

        for key in passthrough_run_keys:
            if key in run:
                minimal_run[key] = run[key]

        minimal_sarif["runs"].append(minimal_run)

    return minimal_sarif
|
|
|
|
def calculate_size_reduction(original_size: int, minimal_size: int) -> Dict[str, Any]:
    """
    Calculate size reduction metrics.

    Args:
        original_size: Size of original file in bytes
        minimal_size: Size of minimal file in bytes

    Returns:
        Dictionary with size metrics: the two sizes and their difference in
        bytes and in MiB (1024 * 1024 bytes), plus ``reduction_percent``
        (always a float; 0.0 when ``original_size`` is not positive). The
        reduction values are negative when the minimal file is larger.
    """
    bytes_per_mb = 1024 * 1024
    reduction_bytes = original_size - minimal_size

    # Guard the division for empty inputs, and keep the result a float in
    # both branches so callers always receive the same type.
    if original_size > 0:
        reduction_percent = (reduction_bytes / original_size) * 100
    else:
        reduction_percent = 0.0

    return {
        "original_size": original_size,
        "minimal_size": minimal_size,
        "reduction_bytes": reduction_bytes,
        "reduction_percent": reduction_percent,
        "original_size_mb": original_size / bytes_per_mb,
        "minimal_size_mb": minimal_size / bytes_per_mb,
        "reduction_mb": reduction_bytes / bytes_per_mb,
    }
|
|
|
|
def format_size(size_bytes: int) -> str:
    """
    Format size in human-readable format.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    rendered with two decimals and the matching unit (B, KB, MB, GB, then TB
    for anything larger).

    Args:
        size_bytes: Size in bytes; may be negative (e.g. a size difference
            where the output grew)

    Returns:
        The formatted size, such as "1.50 KB" or "-2.00 MB"
    """
    size = float(size_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        # Compare by magnitude: a negative value is always "< 1024", which
        # would otherwise stop the scaling at bytes.
        if abs(size) < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0
    return f"{size:.2f} TB"
|
|
|
|
def process_sarif_file(input_path: Path, output_path: Path = None,
                       verbose: bool = False) -> bool:
    """
    Process a SARIF file to extract only results.

    Reads the input, writes the minimal SARIF document and prints a short
    size report. Problems are reported on stderr instead of being raised, so
    a bad file does not abort a multi-file run.

    Args:
        input_path: Path to input SARIF file
        output_path: Path to output file (optional, auto-generated if not provided)
        verbose: Print detailed information

    Returns:
        True if the minimal file was written, False if an error was reported
    """
    if not input_path.exists():
        print(f"Error: Input file '{input_path}' does not exist.", file=sys.stderr)
        return False

    # Auto-generate output path if not provided
    if output_path is None:
        output_path = input_path.parent / f"{input_path.stem}.minimal.sarif"

    try:
        if verbose:
            print(f"Reading: {input_path}")

        # Measure the input before anything is written: when the output path
        # is the input path (in-place run) the original size would otherwise
        # be read from the already-overwritten file.
        original_size = input_path.stat().st_size

        with open(input_path, 'r', encoding='utf-8') as f:
            original_sarif = json.load(f)

        if not isinstance(original_sarif, dict):
            print(f"Error: '{input_path}' is not a SARIF log "
                  f"(top-level JSON value must be an object).", file=sys.stderr)
            return False

        # Create minimal SARIF
        minimal_sarif = create_minimal_sarif(original_sarif)

        # Count results
        total_results = sum(len(run.get("results", []))
                            for run in minimal_sarif.get("runs", []))

        # Write minimal SARIF
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(minimal_sarif, f, indent=2, ensure_ascii=False)

        # Calculate size reduction
        minimal_size = output_path.stat().st_size
        metrics = calculate_size_reduction(original_size, minimal_size)

        # Report results
        print(f"\n✓ Processed: {input_path.name}")
        print(f" Output: {output_path.name}")
        print(f" Results extracted: {total_results:,}")
        print(f" Original size: {format_size(metrics['original_size'])}")
        print(f" Minimal size: {format_size(metrics['minimal_size'])}")
        print(f" Reduction: {format_size(metrics['reduction_bytes'])} ({metrics['reduction_percent']:.1f}%)")

        if verbose:
            # Show what was removed; only the first run is inspected.
            original_runs = original_sarif.get("runs", [])
            if original_runs:
                run = original_runs[0]
                removed_keys = set(run.keys()) - set(minimal_sarif["runs"][0].keys())
                if removed_keys:
                    print(f" Removed sections: {', '.join(sorted(removed_keys))}")

        return True

    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in '{input_path}': {e}", file=sys.stderr)
    except Exception as e:
        # Deliberate per-file boundary: report and let the caller continue
        # with the remaining files.
        print(f"Error processing '{input_path}': {e}", file=sys.stderr)
    return False


def main():
    """
    Command-line entry point.

    Parses the arguments, processes every input file and exits with status 1
    if at least one file could not be processed.
    """
    parser = argparse.ArgumentParser(
        description="Extract only results from SARIF files while maintaining valid SARIF structure.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Process single file (auto-generates .minimal.sarif output)
%(prog)s input.sarif

# Process with custom output name
%(prog)s input.sarif -o output.sarif

# Process multiple files
%(prog)s file1.sarif file2.sarif file3.sarif

# Process with verbose output
%(prog)s -v large-scan.sarif

# Process all SARIF files in current directory
%(prog)s *.sarif
"""
    )

    parser.add_argument(
        "input_files",
        nargs="+",
        type=Path,
        help="Input SARIF file(s) to process"
    )

    parser.add_argument(
        "-o", "--output",
        type=Path,
        help="Output file path (only valid with single input file)"
    )

    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Show detailed information about processing"
    )

    args = parser.parse_args()

    # Validate arguments
    if args.output and len(args.input_files) > 1:
        parser.error("Cannot specify --output with multiple input files")

    # Process files. After the validation above, args.output can only be set
    # when there is exactly one input file, so it is passed through as is.
    failures = 0
    for input_file in args.input_files:
        if not process_sarif_file(input_file, args.output, args.verbose):
            failures += 1

    # Surface failures to the calling shell/script; previously the exit
    # status was 0 even when every file failed.
    if failures:
        sys.exit(1)
|
|
|
|
# Run the command-line interface only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|