Mirror of https://github.com/github/codeql.git (synced 2025-12-17 01:03:14 +01:00)
Bulk generator: Format file and add a note at the top of the file specifying the formatting requirements.
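The note added at the top of the file refers to the Black Python formatter. Below is a minimal sketch of how that requirement could be verified locally; the filename and the use of a subprocess call are illustrative assumptions, not part of the commit:

# Minimal sketch: check, without rewriting, that the script satisfies the
# Black formatting note added in this commit. Assumes Black is installed
# (pip install black); the filename below is illustrative.
import subprocess

result = subprocess.run(
    ["black", "--check", "bulk_generate_mad.py"],
    capture_output=True,
    text=True,
)
# Black exits with code 0 if the file is already formatted, 1 if it would be reformatted.
print(result.stdout or result.stderr)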
@@ -1,5 +1,7 @@
 """
 Experimental script for bulk generation of MaD models based on a list of projects.
+
+Note: This file must be formatted using the Black Python formatter.
 """
 
 import os.path
@@ -24,6 +26,7 @@ gitroot = (
 )
 build_dir = os.path.join(gitroot, "mad-generation-build")
 
+
 # A project to generate models for
 class Project(TypedDict):
     """
@@ -132,7 +135,9 @@ def clone_projects(projects: List[Project]) -> List[tuple[Project, str]]:
     return project_dirs
 
 
-def build_database(language: str, extractor_options, project: Project, project_dir: str) -> str | None:
+def build_database(
+    language: str, extractor_options, project: Project, project_dir: str
+) -> str | None:
     """
     Build a CodeQL database for a project.
 
@@ -179,6 +184,7 @@ def build_database(language: str, extractor_options, project: Project, project_d
 
     return database_dir
 
+
 def generate_models(args, name: str, database_dir: str) -> None:
     """
     Generate models for a project.
@@ -196,7 +202,10 @@ def generate_models(args, name: str, database_dir: str) -> None:
     generator.setenvironment(database=database_dir, folder=name)
     generator.run()
 
-def build_databases_from_projects(language: str, extractor_options, projects: List[Project]) -> List[tuple[str, str | None]]:
+
+def build_databases_from_projects(
+    language: str, extractor_options, projects: List[Project]
+) -> List[tuple[str, str | None]]:
     """
     Build databases for all projects in parallel.
 
@@ -215,11 +224,15 @@ def build_databases_from_projects(language: str, extractor_options, projects: Li
     # Phase 2: Build databases for all projects
     print("\n=== Phase 2: Building databases ===")
     database_results = [
-        (project["name"], build_database(language, extractor_options, project, project_dir))
+        (
+            project["name"],
+            build_database(language, extractor_options, project, project_dir),
+        )
         for project, project_dir in project_dirs
     ]
     return database_results
 
+
 def github(url: str, pat: str, extra_headers: dict[str, str] = {}) -> dict:
     """
     Download a JSON file from GitHub using a personal access token (PAT).
@@ -238,6 +251,7 @@ def github(url: str, pat: str, extra_headers: dict[str, str] = {}) -> dict:
     else:
         return response.json()
 
+
 def download_artifact(url: str, artifact_name: str, pat: str) -> str:
     """
     Download a GitHub Actions artifact from a given URL.
@@ -262,15 +276,20 @@ def download_artifact(url: str, artifact_name: str, pat: str) -> str:
         print(f"Failed to download file. Status code: {response.status_code}")
         sys.exit(1)
 
+
 def remove_extension(filename: str) -> str:
     while "." in filename:
         filename, _ = os.path.splitext(filename)
     return filename
 
+
 def pretty_name_from_artifact_name(artifact_name: str) -> str:
     return artifact_name.split("___")[1]
 
-def download_dca_databases(experiment_name: str, pat: str, projects) -> List[tuple[str, str | None]]:
+
+def download_dca_databases(
+    experiment_name: str, pat: str, projects
+) -> List[tuple[str, str | None]]:
     """
     Download databases from a DCA experiment.
     Args:
@@ -282,7 +301,10 @@ def download_dca_databases(experiment_name: str, pat: str, projects) -> List[tup
     """
    database_results = []
     print("\n=== Finding projects ===")
-    response = github(f"https://raw.githubusercontent.com/github/codeql-dca-main/data/{experiment_name}/reports/downloads.json", pat)
+    response = github(
+        f"https://raw.githubusercontent.com/github/codeql-dca-main/data/{experiment_name}/reports/downloads.json",
+        pat,
+    )
     targets = response["targets"]
     for target, data in targets.items():
         downloads = data["downloads"]
@@ -297,16 +319,22 @@ def download_dca_databases(experiment_name: str, pat: str, projects) -> List[tup
         repository = analyzed_database["repository"]
         run_id = analyzed_database["run_id"]
         print(f"=== Finding artifact: {artifact_name} ===")
-        response = github(f"https://api.github.com/repos/{repository}/actions/runs/{run_id}/artifacts", pat, { "Accept": "application/vnd.github+json" })
+        response = github(
+            f"https://api.github.com/repos/{repository}/actions/runs/{run_id}/artifacts",
+            pat,
+            {"Accept": "application/vnd.github+json"},
+        )
         artifacts = response["artifacts"]
         artifact_map = {artifact["name"]: artifact for artifact in artifacts}
         print(f"=== Downloading artifact: {artifact_name} ===")
         archive_download_url = artifact_map[artifact_name]["archive_download_url"]
-        artifact_zip_location = download_artifact(archive_download_url, artifact_name, pat)
+        artifact_zip_location = download_artifact(
+            archive_download_url, artifact_name, pat
+        )
         print(f"=== Extracting artifact: {artifact_name} ===")
         # The database is in a zip file, which contains a tar.gz file with the DB
         # First we open the zip file
-        with zipfile.ZipFile(artifact_zip_location, 'r') as zip_ref:
+        with zipfile.ZipFile(artifact_zip_location, "r") as zip_ref:
             artifact_unzipped_location = os.path.join(build_dir, artifact_name)
             # And then we extract it to build_dir/artifact_name
             zip_ref.extractall(artifact_unzipped_location)
@@ -317,23 +345,37 @@ def download_dca_databases(experiment_name: str, pat: str, projects) -> List[tup
                 with tarfile.open(artifact_tar_location, "r:gz") as tar_ref:
                     # And we just untar it to the same directory as the zip file
                     tar_ref.extractall(artifact_unzipped_location)
-                    database_results.append((pretty_name, os.path.join(artifact_unzipped_location, remove_extension(entry))))
+                    database_results.append(
+                        (
+                            pretty_name,
+                            os.path.join(
+                                artifact_unzipped_location, remove_extension(entry)
+                            ),
+                        )
+                    )
     print(f"\n=== Extracted {len(database_results)} databases ===")
 
     def compare(a, b):
-        a_index = next(i for i, project in enumerate(projects) if project["name"] == a[0])
-        b_index = next(i for i, project in enumerate(projects) if project["name"] == b[0])
+        a_index = next(
+            i for i, project in enumerate(projects) if project["name"] == a[0]
+        )
+        b_index = next(
+            i for i, project in enumerate(projects) if project["name"] == b[0]
+        )
         return a_index - b_index
 
     # Sort the database results based on the order in the projects file
     return sorted(database_results, key=cmp_to_key(compare))
 
+
 def get_destination_for_project(config, name: str) -> str:
     return os.path.join(config["destination"], name)
 
+
 def get_strategy(config) -> str:
     return config["strategy"].lower()
 
+
 def main(config, args) -> None:
     """
     Main function to handle the bulk generation of MaD models.
@@ -371,7 +413,9 @@ To avoid loss of data, please commit your changes."""
     match get_strategy(config):
         case "repo":
             extractor_options = config.get("extractor_options", [])
-            database_results = build_databases_from_projects(language, extractor_options, projects)
+            database_results = build_databases_from_projects(
+                language, extractor_options, projects
+            )
         case "dca":
             experiment_name = args.dca
             if experiment_name is None:
@@ -386,9 +430,7 @@ To avoid loss of data, please commit your changes."""
     # Phase 3: Generate models for all projects
     print("\n=== Phase 3: Generating models ===")
 
-    failed_builds = [
-        project for project, db_dir in database_results if db_dir is None
-    ]
+    failed_builds = [project for project, db_dir in database_results if db_dir is None]
     if failed_builds:
         print(
             f"ERROR: {len(failed_builds)} database builds failed: {', '.join(failed_builds)}"
@@ -406,15 +448,36 @@ To avoid loss of data, please commit your changes."""
         if database_dir is not None:
             generate_models(args, project, database_dir)
 
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--config", type=str, help="Path to the configuration file.", required=True)
-    parser.add_argument("--dca", type=str, help="Name of a DCA run that built all the projects", required=False)
-    parser.add_argument("--pat", type=str, help="PAT token to grab DCA databases (the same as the one you use for DCA)", required=False)
-    parser.add_argument("--lang", type=str, help="The language to generate models for", required=True)
-    parser.add_argument("--with-sources", action="store_true", help="Generate sources", required=False)
-    parser.add_argument("--with-sinks", action="store_true", help="Generate sinks", required=False)
-    parser.add_argument("--with-summaries", action="store_true", help="Generate sinks", required=False)
+    parser.add_argument(
+        "--config", type=str, help="Path to the configuration file.", required=True
+    )
+    parser.add_argument(
+        "--dca",
+        type=str,
+        help="Name of a DCA run that built all the projects",
+        required=False,
+    )
+    parser.add_argument(
+        "--pat",
+        type=str,
+        help="PAT token to grab DCA databases (the same as the one you use for DCA)",
+        required=False,
+    )
+    parser.add_argument(
+        "--lang", type=str, help="The language to generate models for", required=True
+    )
+    parser.add_argument(
+        "--with-sources", action="store_true", help="Generate sources", required=False
+    )
+    parser.add_argument(
+        "--with-sinks", action="store_true", help="Generate sinks", required=False
+    )
+    parser.add_argument(
+        "--with-summaries", action="store_true", help="Generate sinks", required=False
+    )
     args = parser.parse_args()
 
     # Load config file