From e0cbc01d213817f0b572dc0e7dc786d2a73ae163 Mon Sep 17 00:00:00 2001 From: Nicolas Will Date: Mon, 17 Jun 2024 13:16:24 +0200 Subject: [PATCH] Fully implement local and container MRVA --- .env.container | 12 + .gitignore | 2 + cmd/agent/Dockerfile | 10 +- cmd/agent/main.go | 165 +---- cmd/server/Dockerfile | 76 ++- cmd/server/main.go | 132 ++-- config/mcc/system.go | 6 +- docker-compose.yml | 72 +- pkg/agent/agent.go | 237 +++++-- pkg/agent/interfaces.go | 11 +- pkg/artifactstore/common.go | 23 + pkg/artifactstore/interfaces.go | 20 + pkg/artifactstore/store_memory.go | 94 +++ pkg/artifactstore/store_minio.go | 117 ++++ pkg/common/extapi.go | 19 +- pkg/common/interfaces.go | 4 - pkg/common/jobspec.go | 29 + pkg/common/minio.go | 38 ++ pkg/common/types.go | 22 +- pkg/deploy/init.go | 96 +++ pkg/logger/interfaces.go | 4 - pkg/logger/types.go | 14 - pkg/qldbstore/interfaces.go | 42 +- pkg/qldbstore/qldbstore_local.go | 66 ++ pkg/qldbstore/qldbstore_minio.go | 98 +++ pkg/qpstore/interfaces.go | 5 - pkg/qpstore/qpstore.go | 16 - pkg/queue/interfaces.go | 13 +- pkg/queue/queue.go | 31 - pkg/queue/{rabbitmq.go => queue_rabbitmq.go} | 33 +- pkg/queue/queue_single.go | 29 + pkg/queue/types.go | 38 +- pkg/server/interfaces.go | 4 +- pkg/server/server.go | 652 ++++++++++++------- pkg/server/types.go | 48 +- pkg/state/interfaces.go | 41 ++ pkg/state/state_local.go | 126 ++++ pkg/storage/interfaces.go | 10 - pkg/storage/storage.go | 244 ------- pkg/storage/types.go | 66 -- populate-dbstore.sh | 15 +- test/storage_json_test.go | 29 - utils/archive.go | 28 + 43 files changed, 1700 insertions(+), 1137 deletions(-) create mode 100644 .env.container create mode 100644 pkg/artifactstore/common.go create mode 100644 pkg/artifactstore/interfaces.go create mode 100644 pkg/artifactstore/store_memory.go create mode 100644 pkg/artifactstore/store_minio.go delete mode 100644 pkg/common/interfaces.go create mode 100644 pkg/common/jobspec.go create mode 100644 pkg/common/minio.go create 
mode 100644 pkg/deploy/init.go delete mode 100644 pkg/logger/interfaces.go delete mode 100644 pkg/logger/types.go create mode 100644 pkg/qldbstore/qldbstore_local.go create mode 100644 pkg/qldbstore/qldbstore_minio.go delete mode 100644 pkg/qpstore/interfaces.go delete mode 100644 pkg/qpstore/qpstore.go delete mode 100644 pkg/queue/queue.go rename pkg/queue/{rabbitmq.go => queue_rabbitmq.go} (85%) create mode 100644 pkg/queue/queue_single.go create mode 100644 pkg/state/interfaces.go create mode 100644 pkg/state/state_local.go delete mode 100644 pkg/storage/interfaces.go delete mode 100644 pkg/storage/storage.go delete mode 100644 pkg/storage/types.go mode change 100644 => 100755 populate-dbstore.sh delete mode 100644 test/storage_json_test.go diff --git a/.env.container b/.env.container new file mode 100644 index 0000000..ecab58c --- /dev/null +++ b/.env.container @@ -0,0 +1,12 @@ +MRVA_RABBITMQ_HOST="rabbitmq" +MRVA_RABBITMQ_PORT="5672" +MRVA_RABBITMQ_USER="user" +MRVA_RABBITMQ_PASSWORD="password" +MINIO_ROOT_USER="user" +MINIO_ROOT_PASSWORD="mmusty8432" +ARTIFACT_MINIO_ENDPOINT="artifactstore:9000" +ARTIFACT_MINIO_ID=${MINIO_ROOT_USER} +ARTIFACT_MINIO_SECRET=${MINIO_ROOT_PASSWORD} +QLDB_MINIO_ENDPOINT="dbstore:9000" +QLDB_MINIO_ID=${MINIO_ROOT_USER} +QLDB_MINIO_SECRET=${MINIO_ROOT_PASSWORD} diff --git a/.gitignore b/.gitignore index 37010d6..a3553d5 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,5 @@ go.work.sum # env file .env +/artifactstore-data/.minio.sys +/qldbminio/qldb diff --git a/cmd/agent/Dockerfile b/cmd/agent/Dockerfile index 6b506f7..b38d796 100644 --- a/cmd/agent/Dockerfile +++ b/cmd/agent/Dockerfile @@ -23,7 +23,8 @@ ARG CODEQL_VERSION=latest RUN apt-get update && apt-get install --no-install-recommends --assume-yes \ unzip \ curl \ - ca-certificates + ca-certificates \ + default-jdk # If the version is 'latest', lsget the latest release version from GitHub, unzip the bundle into /opt, and delete the archive RUN if [ "$CODEQL_VERSION" = 
"latest" ]; then \ @@ -32,14 +33,15 @@ RUN if [ "$CODEQL_VERSION" = "latest" ]; then \ echo "Using CodeQL version $CODEQL_VERSION" && \ curl -L "https://github.com/github/codeql-cli-binaries/releases/download/$CODEQL_VERSION/codeql-linux64.zip" -o /tmp/codeql.zip && \ unzip /tmp/codeql.zip -d /opt && \ - rm /tmp/codeql.zip + rm /tmp/codeql.zip && \ + chmod -R +x /opt/codeql # Set environment variables for CodeQL -ENV CODEQL_CLI_PATH=/opt/codeql +ENV CODEQL_CLI_PATH=/opt/codeql/codeql # Set environment variable for CodeQL for `codeql database analyze` support on ARM # This env var has no functional effect on CodeQL when running on x86_64 linux -ENV CODEQL_JAVA_HOME=/usr/ +ENV CODEQL_JAVA_HOME=/usr # Copy built agent binary from the builder stage WORKDIR /app diff --git a/cmd/agent/main.go b/cmd/agent/main.go index c99ce2e..99bbb42 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -3,170 +3,71 @@ package main import ( "context" "flag" + "log" + "log/slog" "os" "os/signal" - "runtime" - "strconv" "sync" "syscall" - "time" - - "github.com/elastic/go-sysinfo" - "golang.org/x/exp/slog" "mrvacommander/pkg/agent" - "mrvacommander/pkg/queue" + "mrvacommander/pkg/deploy" ) -const ( - workerMemoryMB = 2048 // 2 GB - monitorIntervalSec = 10 // Monitor every 10 seconds -) - -func calculateWorkers() int { - host, err := sysinfo.Host() - if err != nil { - slog.Error("failed to get host info", "error", err) - os.Exit(1) - } - - memInfo, err := host.Memory() - if err != nil { - slog.Error("failed to get memory info", "error", err) - os.Exit(1) - } - - // Get available memory in MB - totalMemoryMB := memInfo.Available / (1024 * 1024) - - // Ensure we have at least one worker - workers := int(totalMemoryMB / workerMemoryMB) - if workers < 1 { - workers = 1 - } - - // Limit the number of workers to the number of CPUs - cpuCount := runtime.NumCPU() - if workers > cpuCount { - workers = max(cpuCount, 1) - } - - return workers -} - -func startAndMonitorWorkers(ctx 
context.Context, queue queue.Queue, desiredWorkerCount int, wg *sync.WaitGroup) { - currentWorkerCount := 0 - stopChans := make([]chan struct{}, 0) - - if desiredWorkerCount != 0 { - slog.Info("Starting workers", slog.Int("count", desiredWorkerCount)) - for i := 0; i < desiredWorkerCount; i++ { - stopChan := make(chan struct{}) - stopChans = append(stopChans, stopChan) - wg.Add(1) - go agent.RunWorker(ctx, stopChan, queue, wg) - } - return - } - - slog.Info("Worker count not specified, managing based on available memory and CPU") - - for { - select { - case <-ctx.Done(): - // signal all workers to stop - for _, stopChan := range stopChans { - close(stopChan) - } - return - default: - newWorkerCount := calculateWorkers() - - if newWorkerCount != currentWorkerCount { - slog.Info( - "Modifying worker count", - slog.Int("current", currentWorkerCount), - slog.Int("new", newWorkerCount)) - } - - if newWorkerCount > currentWorkerCount { - for i := currentWorkerCount; i < newWorkerCount; i++ { - stopChan := make(chan struct{}) - stopChans = append(stopChans, stopChan) - wg.Add(1) - go agent.RunWorker(ctx, stopChan, queue, wg) - } - } else if newWorkerCount < currentWorkerCount { - for i := newWorkerCount; i < currentWorkerCount; i++ { - close(stopChans[i]) - } - stopChans = stopChans[:newWorkerCount] - } - currentWorkerCount = newWorkerCount - - time.Sleep(monitorIntervalSec * time.Second) - } - } -} - func main() { slog.Info("Starting agent") - workerCount := flag.Int("workers", 0, "number of workers") + logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") flag.Parse() - requiredEnvVars := []string{ - "MRVA_RABBITMQ_HOST", - "MRVA_RABBITMQ_PORT", - "MRVA_RABBITMQ_USER", - "MRVA_RABBITMQ_PASSWORD", - "CODEQL_JAVA_HOME", - "CODEQL_CLI_PATH", - } - - for _, envVar := range requiredEnvVars { - if _, ok := os.LookupEnv(envVar); !ok { - slog.Error("Missing required environment variable", "key", envVar) - os.Exit(1) - } - } - - rmqHost := 
os.Getenv("MRVA_RABBITMQ_HOST") - rmqPort := os.Getenv("MRVA_RABBITMQ_PORT") - rmqUser := os.Getenv("MRVA_RABBITMQ_USER") - rmqPass := os.Getenv("MRVA_RABBITMQ_PASSWORD") - - rmqPortAsInt, err := strconv.ParseInt(rmqPort, 10, 16) - if err != nil { - slog.Error("Failed to parse RabbitMQ port", slog.Any("error", err)) + // Apply 'loglevel' flag + switch *logLevel { + case "debug": + slog.SetLogLoggerLevel(slog.LevelDebug) + case "info": + slog.SetLogLoggerLevel(slog.LevelInfo) + case "warn": + slog.SetLogLoggerLevel(slog.LevelWarn) + case "error": + slog.SetLogLoggerLevel(slog.LevelError) + default: + log.Printf("Invalid logging verbosity level: %s", *logLevel) os.Exit(1) } - slog.Info("Initializing RabbitMQ queue") + isAgent := true - rabbitMQQueue, err := queue.InitializeRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, false) + rabbitMQQueue, err := deploy.InitRabbitMQ(isAgent) if err != nil { - slog.Error("failed to initialize RabbitMQ", slog.Any("error", err)) + slog.Error("Failed to initialize RabbitMQ", slog.Any("error", err)) os.Exit(1) } defer rabbitMQQueue.Close() + artifacts, err := deploy.InitMinIOArtifactStore() + if err != nil { + slog.Error("Failed to initialize artifact store", slog.Any("error", err)) + os.Exit(1) + } + + databases, err := deploy.InitMinIOCodeQLDatabaseStore() + if err != nil { + slog.Error("Failed to initialize database store", slog.Any("error", err)) + os.Exit(1) + } + var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) - - go startAndMonitorWorkers(ctx, rabbitMQQueue, *workerCount, &wg) - + go agent.StartAndMonitorWorkers(ctx, artifacts, databases, rabbitMQQueue, *workerCount, &wg) slog.Info("Agent started") // Gracefully exit on SIGINT/SIGTERM sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan - slog.Info("Shutting down agent") - // TODO: fix this to gracefully terminate agent workers during jobs + slog.Info("Shutting down agent") cancel() 
wg.Wait() - slog.Info("Agent shutdown complete") } diff --git a/cmd/server/Dockerfile b/cmd/server/Dockerfile index 23ce81a..0a07a15 100644 --- a/cmd/server/Dockerfile +++ b/cmd/server/Dockerfile @@ -1,38 +1,56 @@ -# Use the ubuntu 22.04 base image -FROM ubuntu:24.10 +FROM golang:1.22 AS builder -# Set architecture to arm64 -ARG ARCH=arm64 -ARG AARCH=aarch64 +# Copy the entire project +WORKDIR /app +COPY . . -# Set environment variables +# Download dependencies +RUN go mod download + +# Set the working directory to the cmd/server subproject +WORKDIR /app/cmd/server + +# Build the server +RUN go build -o /bin/mrva_server ./main.go + +FROM ubuntu:24.10 as runner ENV DEBIAN_FRONTEND=noninteractive -ENV CODEQL_VERSION=codeql-bundle-v2.17.5 -ENV CODEQL_DOWNLOAD_URL=https://github.com/github/codeql-action/releases/download/${CODEQL_VERSION}/codeql-bundle-linux64.tar.gz -ENV JDK_VERSION=22.0.1 -ENV JDK_DOWNLOAD_URL=https://download.oracle.com/java/21/latest/jdk-${JDK_VERSION}_linux-${AARCH}_bin.tar.gz -ENV JDK_DOWNLOAD_URL=https://download.java.net/java/GA/jdk${JDK_VERSION}/c7ec1332f7bb44aeba2eb341ae18aca4/8/GPL/openjdk-${JDK_VERSION}_linux-${AARCH}_bin.tar.gz -ENV CODEQL_JAVA_HOME=/usr/local/jdk-${JDK_VERSION} +# Build argument for CodeQL version, defaulting to the latest release +ARG CODEQL_VERSION=latest -# Install necessary tools -RUN apt-get update && \ - apt-get install -y curl tar && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* +# Install packages +RUN apt-get update && apt-get install --no-install-recommends --assume-yes \ + unzip \ + curl \ + ca-certificates \ + default-jdk -# Add and extract the CodeQL bundle -RUN curl -L $CODEQL_DOWNLOAD_URL -o /tmp/${CODEQL_VERSION}.tar.gz && \ - tar -xzf /tmp/${CODEQL_VERSION}.tar.gz -C /opt && \ - rm /tmp/${CODEQL_VERSION}.tar.gz +# If the version is 'latest', lsget the latest release version from GitHub, unzip the bundle into /opt, and delete the archive +RUN if [ "$CODEQL_VERSION" = "latest" ]; then \ + 
CODEQL_VERSION=$(curl -s https://api.github.com/repos/github/codeql-cli-binaries/releases/latest | grep '"tag_name"' | sed -E 's/.*"([^"]+)".*/\1/'); \ + fi && \ + echo "Using CodeQL version $CODEQL_VERSION" && \ + curl -L "https://github.com/github/codeql-cli-binaries/releases/download/$CODEQL_VERSION/codeql-linux64.zip" -o /tmp/codeql.zip && \ + unzip /tmp/codeql.zip -d /opt && \ + rm /tmp/codeql.zip && \ + chmod -R +x /opt/codeql -# Add and extract the JDK -RUN curl -L $JDK_DOWNLOAD_URL -o /tmp/jdk-${JDK_VERSION}.tar.gz && \ - tar -xzf /tmp/jdk-${JDK_VERSION}.tar.gz -C /usr/local && \ - rm /tmp/jdk-${JDK_VERSION}.tar.gz +# Set environment variables for CodeQL +ENV CODEQL_CLI_PATH=/opt/codeql/codeql -# Set PATH -ENV PATH=/opt/codeql:"$PATH" +# Set environment variable for CodeQL for `codeql database analyze` support on ARM +# This env var has no functional effect on CodeQL when running on x86_64 linux +ENV CODEQL_JAVA_HOME=/usr -# Prepare host mount point -RUN mkdir /mrva +# Set working directory to /app + +# Copy built server binary from the builder stage +COPY --from=builder /bin/mrva_server ./mrva_server + +# Copy the CodeQL database directory from the builder stage (for standalone mode) +COPY --from=builder /app/cmd/server/codeql ./codeql + +# Run the server with the default mode set to container +ENTRYPOINT ["./mrva_server"] +CMD ["--mode=container"] \ No newline at end of file diff --git a/cmd/server/main.go b/cmd/server/main.go index 2ad201b..8641c84 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -4,20 +4,25 @@ package main import ( + "context" "flag" "log" "log/slog" "os" + "os/signal" + "path/filepath" + "sync" + "syscall" "mrvacommander/config/mcc" "mrvacommander/pkg/agent" - "mrvacommander/pkg/logger" + "mrvacommander/pkg/artifactstore" + "mrvacommander/pkg/deploy" "mrvacommander/pkg/qldbstore" - "mrvacommander/pkg/qpstore" "mrvacommander/pkg/queue" "mrvacommander/pkg/server" - "mrvacommander/pkg/storage" + "mrvacommander/pkg/state" ) func 
main() { @@ -25,13 +30,14 @@ func main() { helpFlag := flag.Bool("help", false, "Display help message") logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") mode := flag.String("mode", "standalone", "Set mode: standalone, container, cluster") + dbPathRoot := flag.String("dbpath", "", "Set the root path for the database store if using standalone mode.") // Custom usage function for the help flag flag.Usage = func() { log.Printf("Usage of %s:\n", os.Args[0]) flag.PrintDefaults() log.Println("\nExamples:") - log.Println(" go run main.go --loglevel=debug --mode=container") + log.Println("go run main.go --loglevel=debug --mode=container --dbpath=/path/to/db_dir") } // Parse the flags @@ -58,6 +64,20 @@ func main() { os.Exit(1) } + // Process database root if standalone and not provided + if *mode == "standalone" && *dbPathRoot == "" { + slog.Warn("No database root path provided.") + // Current directory of the Executable has a codeql directory. There. + // Resolve the absolute directory based on os.Executable() + execPath, err := os.Executable() + if err != nil { + slog.Error("Failed to get executable path", slog.Any("error", err)) + os.Exit(1) + } + *dbPathRoot = filepath.Dir(execPath) + "/codeql/dbs/" + slog.Info("Using default database root path", "dbPathRoot", *dbPathRoot) + } + // Read configuration config := mcc.LoadConfig("mcconfig.toml") @@ -66,91 +86,73 @@ func main() { log.Printf("Log Level: %s\n", *logLevel) log.Printf("Mode: %s\n", *mode) + // Handle signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + // Apply 'mode' flag switch *mode { case "standalone": // Assemble single-process version - - sl := logger.NewLoggerSingle(&logger.Visibles{}) - - // FIXME take value from configuration - sq := queue.NewQueueSingle(2, &queue.Visibles{ - Logger: sl, - }) - - ss := storage.NewStorageSingle(config.Storage.StartingID, &storage.Visibles{}) - - qp, err := 
qpstore.NewStore(&qpstore.Visibles{}) - if err != nil { - slog.Error("Unable to initialize query pack storage") - os.Exit(1) - } - - ql, err := qldbstore.NewStore(&qldbstore.Visibles{}) - if err != nil { - slog.Error("Unable to initialize ql database storage") - os.Exit(1) - } + sq := queue.NewQueueSingle(2) + ss := state.NewLocalState(config.Storage.StartingID) + as := artifactstore.NewInMemoryArtifactStore() + ql := qldbstore.NewLocalFilesystemCodeQLDatabaseStore(*dbPathRoot) server.NewCommanderSingle(&server.Visibles{ - Logger: sl, - Queue: sq, - ServerStore: ss, - QueryPackStore: qp, - QLDBStore: ql, + Queue: sq, + State: ss, + Artifacts: as, + CodeQLDBStore: ql, }) - // FIXME take value from configuration - agent.NewAgentSingle(2, &agent.Visibles{ - Logger: sl, - Queue: sq, - QueryPackStore: qp, - QLDBStore: ql, - }) + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + + go agent.StartAndMonitorWorkers(ctx, as, ql, sq, 2, &wg) + + slog.Info("Started server and standalone agent") + <-sigChan + slog.Info("Shutting down...") + cancel() + wg.Wait() + slog.Info("Agent shutdown complete") case "container": - // Assemble container version - sl := logger.NewLoggerSingle(&logger.Visibles{}) + isAgent := false - // FIXME take value from configuration - sq := queue.NewQueueSingle(2, &queue.Visibles{ - Logger: sl, - }) - - ss := storage.NewStorageSingle(config.Storage.StartingID, &storage.Visibles{}) - - qp, err := qpstore.NewStore(&qpstore.Visibles{}) + rabbitMQQueue, err := deploy.InitRabbitMQ(isAgent) if err != nil { - slog.Error("Unable to initialize query pack storage") + slog.Error("Failed to initialize RabbitMQ", slog.Any("error", err)) + os.Exit(1) + } + defer rabbitMQQueue.Close() + + artifacts, err := deploy.InitMinIOArtifactStore() + if err != nil { + slog.Error("Failed to initialize artifact store", slog.Any("error", err)) os.Exit(1) } - ql, err := qldbstore.NewStore(&qldbstore.Visibles{}) + databases, err := 
deploy.InitMinIOCodeQLDatabaseStore() if err != nil { - slog.Error("Unable to initialize ql database storage") + slog.Error("Failed to initialize database store", slog.Any("error", err)) os.Exit(1) } - agent.NewAgentSingle(2, &agent.Visibles{ - Logger: sl, - Queue: sq, - QueryPackStore: qp, - QLDBStore: ql, - }) - server.NewCommanderSingle(&server.Visibles{ - Logger: sl, - Queue: sq, - ServerStore: ss, - QueryPackStore: qp, - QLDBStore: ql, + Queue: rabbitMQQueue, + State: state.NewLocalState(config.Storage.StartingID), + Artifacts: artifacts, + CodeQLDBStore: databases, }) - case "cluster": - // Assemble cluster version + slog.Info("Started server in container mode.") + <-sigChan default: - slog.Error("Invalid value for --mode. Allowed values are: standalone, container, cluster\n") + slog.Error("Invalid value for --mode. Allowed values are: standalone, container, cluster") os.Exit(1) } + slog.Info("Server shutdown complete") } diff --git a/config/mcc/system.go b/config/mcc/system.go index 1821f44..f3bf99e 100644 --- a/config/mcc/system.go +++ b/config/mcc/system.go @@ -17,15 +17,15 @@ type System struct { func LoadConfig(fname string) *System { if _, err := os.Stat(fname); err != nil { - slog.Error("Configuration file %s not found", fname) - os.Exit(1) + slog.Warn("Configuration file not found", "name", fname) + return &System{} } var config System _, err := toml.DecodeFile(fname, &config) if err != nil { - slog.Error("", err) + slog.Error("Error decoding configuration file", err) os.Exit(1) } diff --git a/docker-compose.yml b/docker-compose.yml index 966cb4d..5319daf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,37 +7,35 @@ services: volumes: - ./init/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro - ./init/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json:ro - expose: - - "5672" - - "15672" ports: - "5672:5672" - "15672:15672" networks: - backend healthcheck: - test: [ "CMD", "nc", "-z", "localhost", "5672" ] - interval: 5s - 
timeout: 15s - retries: 1 + test: rabbitmq-diagnostics check_port_connectivity + interval: 30s + timeout: 30s + retries: 10 server: build: - context: ./cmd/server - dockerfile: Dockerfile + context: . + dockerfile: ./cmd/server/Dockerfile + command: [ '--mode=container', '--loglevel=debug' ] container_name: server - stop_grace_period: 1s # Reduce the timeout period for testing - environment: - - MRVA_SERVER_ROOT=/mrva/mrvacommander/cmd/server - command: sh -c "tail -f /dev/null" + stop_grace_period: 1s ports: - - "8080:8080" - volumes: - - ./:/mrva/mrvacommander + - "8080:8080" depends_on: - rabbitmq + - dbstore + - artifactstore networks: - backend + env_file: + - path: ./.env.container + required: true dbstore: image: minio/minio:RELEASE.2024-06-11T03-13-30Z @@ -45,52 +43,46 @@ services: ports: - "9000:9000" - "9001:9001" - environment: - MINIO_ROOT_USER: user - MINIO_ROOT_PASSWORD: mmusty8432 - + env_file: + - path: .env.container + required: true command: server /data --console-address ":9001" volumes: - ./dbstore-data:/data + networks: + - backend - - qpstore: + artifactstore: image: minio/minio:RELEASE.2024-06-11T03-13-30Z - container_name: qpstore + container_name: artifactstore ports: - "19000:9000" # host:container - "19001:9001" - environment: - MINIO_ROOT_USER: user - MINIO_ROOT_PASSWORD: mmusty8432 - + env_file: + - path: ./.env.container + required: true command: server /data --console-address ":9001" volumes: - ./qpstore-data:/data - + networks: + - backend agent: build: context: . 
dockerfile: ./cmd/agent/Dockerfile + command: [ '--loglevel=debug' ] container_name: agent depends_on: - rabbitmq - - minio - environment: - MRVA_RABBITMQ_HOST: rabbitmq - MRVA_RABBITMQ_PORT: 5672 - MRVA_RABBITMQ_USER: user - MRVA_RABBITMQ_PASSWORD: password + - dbstore + - artifactstore + env_file: + - path: ./.env.container + required: true networks: - backend - networks: backend: driver: bridge - -# Remove named volumes to use bind mounts -# volumes: -# minio-data: - diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 6adcdfa..38ef41a 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -4,26 +4,29 @@ import ( "context" "fmt" "log/slog" + "mrvacommander/pkg/artifactstore" "mrvacommander/pkg/codeql" "mrvacommander/pkg/common" - "mrvacommander/pkg/logger" "mrvacommander/pkg/qldbstore" - "mrvacommander/pkg/qpstore" "mrvacommander/pkg/queue" "mrvacommander/utils" "os" "path/filepath" + "runtime" "sync" + "time" + "github.com/elastic/go-sysinfo" "github.com/google/uuid" ) +/* type RunnerSingle struct { queue queue.Queue } -func NewAgentSingle(numWorkers int, av *Visibles) *RunnerSingle { - r := RunnerSingle{queue: av.Queue} +func NewAgentSingle(numWorkers int, v *Visibles) *RunnerSingle { + r := RunnerSingle{queue: v.Queue} for id := 1; id <= numWorkers; id++ { go r.worker(id) @@ -31,72 +34,175 @@ func NewAgentSingle(numWorkers int, av *Visibles) *RunnerSingle { return &r } -type Visibles struct { - Logger logger.Logger - Queue queue.Queue - // TODO extra package for query pack storage - QueryPackStore qpstore.Storage - // TODO extra package for ql db storage - QLDBStore qldbstore.Storage +func (r *RunnerSingle) worker(wid int) { + var job common.AnalyzeJob + + for { + job = <-r.queue.Jobs() + result, err := RunAnalysisJob(job) + if err != nil { + slog.Error("Failed to run analysis job", slog.Any("error", err)) + continue + } + r.queue.Results() <- result + } +} +*/ + +const ( + workerMemoryMB = 2048 // 2 GB + monitorIntervalSec = 10 // Monitor every 10 
seconds +) + +func calculateWorkers() int { + host, err := sysinfo.Host() + if err != nil { + slog.Error("failed to get host info", "error", err) + os.Exit(1) + } + + memInfo, err := host.Memory() + if err != nil { + slog.Error("failed to get memory info", "error", err) + os.Exit(1) + } + + // Get available memory in MB + totalMemoryMB := memInfo.Available / (1024 * 1024) + + // Ensure we have at least one worker + workers := int(totalMemoryMB / workerMemoryMB) + if workers < 1 { + workers = 1 + } + + // Limit the number of workers to the number of CPUs + cpuCount := runtime.NumCPU() + if workers > cpuCount { + workers = max(cpuCount, 1) + } + + return workers } -func (r *RunnerSingle) worker(wid int) { - // TODO: reimplement this later - /* - var job common.AnalyzeJob +func StartAndMonitorWorkers(ctx context.Context, + artifacts artifactstore.Store, + databases qldbstore.Store, + queue queue.Queue, + desiredWorkerCount int, + wg *sync.WaitGroup) { - for { - job = <-r.queue.Jobs() - - slog.Debug("Picking up job", "job", job, "worker", wid) - - slog.Debug("Analysis: running", "job", job) - storage.SetStatus(job.QueryPackId, job.NWO, common.StatusQueued) - - resultFile, err := RunAnalysis(job) - if err != nil { - continue - } - - slog.Debug("Analysis run finished", "job", job) - - // TODO: FIX THIS - res := common.AnalyzeResult{ - RunAnalysisSARIF: resultFile, - RunAnalysisBQRS: "", // FIXME ? 
- } - r.queue.Results() <- res - storage.SetStatus(job.QueryPackId, job.NWO, common.StatusSuccess) - storage.SetResult(job.QueryPackId, job.NWO, res) + currentWorkerCount := 0 + stopChans := make([]chan struct{}, 0) + if desiredWorkerCount != 0 { + slog.Info("Starting workers", slog.Int("count", desiredWorkerCount)) + for i := 0; i < desiredWorkerCount; i++ { + stopChan := make(chan struct{}) + stopChans = append(stopChans, stopChan) + wg.Add(1) + go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) } - */ + return + } + + slog.Info("Worker count not specified, managing based on available memory and CPU") + + for { + select { + case <-ctx.Done(): + // signal all workers to stop + for _, stopChan := range stopChans { + close(stopChan) + } + return + default: + newWorkerCount := calculateWorkers() + + if newWorkerCount != currentWorkerCount { + slog.Info( + "Modifying worker count", + slog.Int("current", currentWorkerCount), + slog.Int("new", newWorkerCount)) + } + + if newWorkerCount > currentWorkerCount { + for i := currentWorkerCount; i < newWorkerCount; i++ { + stopChan := make(chan struct{}) + stopChans = append(stopChans, stopChan) + wg.Add(1) + go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) + } + } else if newWorkerCount < currentWorkerCount { + for i := newWorkerCount; i < currentWorkerCount; i++ { + close(stopChans[i]) + } + stopChans = stopChans[:newWorkerCount] + } + currentWorkerCount = newWorkerCount + + time.Sleep(monitorIntervalSec * time.Second) + } + } } // RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult -func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) { - var result = common.AnalyzeResult{ - RequestId: job.RequestId, - ResultCount: 0, - ResultArchiveURL: "", - Status: common.StatusError, +func RunAnalysisJob( + job queue.AnalyzeJob, artifacts artifactstore.Store, dbs qldbstore.Store) (queue.AnalyzeResult, error) { + var result = queue.AnalyzeResult{ + Spec: job.Spec, + 
ResultCount: 0, + ResultLocation: artifactstore.ArtifactLocation{}, + Status: common.StatusError, } // Create a temporary directory tempDir := filepath.Join(os.TempDir(), uuid.New().String()) - if err := os.MkdirAll(tempDir, 0755); err != nil { + if err := os.MkdirAll(tempDir, 0600); err != nil { return result, fmt.Errorf("failed to create temporary directory: %v", err) } defer os.RemoveAll(tempDir) - // Extract the query pack - // TODO: download from the 'job' query pack URL - // utils.downloadFile - queryPackPath := filepath.Join(tempDir, "qp-54674") - utils.UntarGz("qp-54674.tgz", queryPackPath) + // Download the query pack as a byte slice + queryPackData, err := artifacts.GetQueryPack(job.QueryPackLocation) + if err != nil { + return result, fmt.Errorf("failed to download query pack: %w", err) + } + + // Write the query pack data to the filesystem + queryPackArchivePath := filepath.Join(tempDir, "query-pack.tar.gz") + if err := os.WriteFile(queryPackArchivePath, queryPackData, 0600); err != nil { + return result, fmt.Errorf("failed to write query pack archive to disk: %w", err) + } + + // Make a directory and extract the query pack + queryPackPath := filepath.Join(tempDir, "pack") + if err := os.Mkdir(queryPackPath, 0600); err != nil { + return result, fmt.Errorf("failed to create query pack directory: %w", err) + } + if err := utils.UntarGz(queryPackArchivePath, queryPackPath); err != nil { + return result, fmt.Errorf("failed to extract query pack: %w", err) + } + + // Download the CodeQL database as a byte slice + location, err := dbs.GetDatabaseLocationByNWO(job.Spec.NameWithOwner) + if err != nil { + return result, fmt.Errorf("failed to get database location: %w", err) + } + + databaseData, err := dbs.GetDatabase(location) + if err != nil { + return result, fmt.Errorf("failed to get database: %w", err) + } + + // Write the CodeQL database data to the filesystem + databasePath := filepath.Join(tempDir, "database.zip") + if err := os.WriteFile(databasePath, 
databaseData, 0600); err != nil { + return result, fmt.Errorf("failed to write CodeQL database to disk: %w", err) + } // Perform the CodeQL analysis - runResult, err := codeql.RunQuery("google_flatbuffers_db.zip", "cpp", queryPackPath, tempDir) + runResult, err := codeql.RunQuery(databasePath, job.QueryLanguage, queryPackPath, tempDir) if err != nil { return result, fmt.Errorf("failed to run analysis: %w", err) } @@ -107,21 +213,32 @@ func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) { return result, fmt.Errorf("failed to generate results archive: %w", err) } - // TODO: Upload the archive to storage + // Upload the archive to storage slog.Debug("Results archive size", slog.Int("size", len(resultsArchive))) + resultsLocation, err := artifacts.SaveResult(job.Spec, resultsArchive) + if err != nil { + return result, fmt.Errorf("failed to save results archive: %w", err) + } - result = common.AnalyzeResult{ - RequestId: job.RequestId, - ResultCount: runResult.ResultCount, - ResultArchiveURL: "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE", // TODO - Status: common.StatusSuccess, + result = queue.AnalyzeResult{ + Spec: job.Spec, + ResultCount: runResult.ResultCount, + ResultLocation: resultsLocation, + Status: common.StatusSuccess, + SourceLocationPrefix: runResult.SourceLocationPrefix, + DatabaseSHA: runResult.DatabaseSHA, } return result, nil } // RunWorker runs a worker that processes jobs from queue -func RunWorker(ctx context.Context, stopChan chan struct{}, queue queue.Queue, wg *sync.WaitGroup) { +func RunWorker(ctx context.Context, + artifacts artifactstore.Store, + databases qldbstore.Store, + queue queue.Queue, + stopChan chan struct{}, + wg *sync.WaitGroup) { const ( WORKER_COUNT_STOP_MESSAGE = "Worker stopping due to reduction in worker count" WORKER_CONTEXT_STOP_MESSAGE = "Worker stopping due to context cancellation" @@ -144,7 +261,7 @@ func RunWorker(ctx context.Context, stopChan chan struct{}, queue queue.Queue, w return } 
slog.Info("Running analysis job", slog.Any("job", job)) - result, err := RunAnalysisJob(job) + result, err := RunAnalysisJob(job, artifacts, databases) if err != nil { slog.Error("Failed to run analysis job", slog.Any("error", err)) continue diff --git a/pkg/agent/interfaces.go b/pkg/agent/interfaces.go index 3a21fb3..b10987b 100644 --- a/pkg/agent/interfaces.go +++ b/pkg/agent/interfaces.go @@ -1,4 +1,13 @@ package agent -type Runner interface { +import ( + "mrvacommander/pkg/artifactstore" + "mrvacommander/pkg/qldbstore" + "mrvacommander/pkg/queue" +) + +type Visibles struct { + Queue queue.Queue + Artifacts artifactstore.Store + CodeQLDBStore qldbstore.Store } diff --git a/pkg/artifactstore/common.go b/pkg/artifactstore/common.go new file mode 100644 index 0000000..6b96398 --- /dev/null +++ b/pkg/artifactstore/common.go @@ -0,0 +1,23 @@ +package artifactstore + +import ( + "fmt" + "mrvacommander/pkg/common" +) + +type ArtifactLocation struct { + // Data is a map of key-value pairs that describe the location of the artifact. + // For example, a simple key-value pair could be "path" -> "/path/to/artifact.tgz". + // Alternatively, a more complex example could be "bucket" -> "example", "key" -> "UNIQUE_ARTIFACT_IDENTIFIER". 
+ Data map[string]string `json:"data"` +} + +// deriveKeyFromSessionId generates a key for a query pack based on the session ID +func deriveKeyFromSessionId(sessionId int) string { + return fmt.Sprintf("%d", sessionId) +} + +// deriveKeyFromJobSpec generates a key for a result based on the JobSpec +func deriveKeyFromJobSpec(jobSpec common.JobSpec) string { + return fmt.Sprintf("%d-%s", jobSpec.SessionID, jobSpec.NameWithOwner) +} diff --git a/pkg/artifactstore/interfaces.go b/pkg/artifactstore/interfaces.go new file mode 100644 index 0000000..a4aea26 --- /dev/null +++ b/pkg/artifactstore/interfaces.go @@ -0,0 +1,20 @@ +package artifactstore + +import "mrvacommander/pkg/common" + +type Store interface { + // GetQueryPack retrieves the query pack from the specified location. + GetQueryPack(location ArtifactLocation) ([]byte, error) + + // SaveQueryPack saves the query pack using the session ID and returns the artifact location. + SaveQueryPack(sessionId int, data []byte) (ArtifactLocation, error) + + // GetResult retrieves the result from the specified location. + GetResult(location ArtifactLocation) ([]byte, error) + + // GetResultSize retrieves the size of the result from the specified location. + GetResultSize(location ArtifactLocation) (int, error) + + // SaveResult saves the result using the JobSpec and returns the artifact location.
+ SaveResult(jobSpec common.JobSpec, data []byte) (ArtifactLocation, error) +} diff --git a/pkg/artifactstore/store_memory.go b/pkg/artifactstore/store_memory.go new file mode 100644 index 0000000..9d4acc0 --- /dev/null +++ b/pkg/artifactstore/store_memory.go @@ -0,0 +1,94 @@ +package artifactstore + +import ( + "fmt" + "mrvacommander/pkg/common" + "sync" +) + +// InMemoryArtifactStore is an in-memory implementation of the Store interface +type InMemoryArtifactStore struct { + mu sync.Mutex + packs map[string][]byte + results map[string][]byte +} + +func NewInMemoryArtifactStore() *InMemoryArtifactStore { + return &InMemoryArtifactStore{ + packs: make(map[string][]byte), + results: make(map[string][]byte), + } +} + +// GetQueryPack retrieves the query pack from the specified location +func (store *InMemoryArtifactStore) GetQueryPack(location ArtifactLocation) ([]byte, error) { + store.mu.Lock() + defer store.mu.Unlock() + + key := location.Data["key"] + data, exists := store.packs[key] + if !exists { + return nil, fmt.Errorf("query pack not found: %s", key) + } + return data, nil +} + +// SaveQueryPack saves the query pack using the session ID and returns the artifact location +func (store *InMemoryArtifactStore) SaveQueryPack(sessionId int, data []byte) (ArtifactLocation, error) { + store.mu.Lock() + defer store.mu.Unlock() + + key := deriveKeyFromSessionId(sessionId) + store.packs[key] = data + + location := ArtifactLocation{ + Data: map[string]string{ + "bucket": "packs", + "key": key, + }, + } + return location, nil +} + +// GetResult retrieves the result from the specified location +func (store *InMemoryArtifactStore) GetResult(location ArtifactLocation) ([]byte, error) { + store.mu.Lock() + defer store.mu.Unlock() + + key := location.Data["key"] + data, exists := store.results[key] + if !exists { + return nil, fmt.Errorf("result not found: %s", key) + } + return data, nil +} + +// GetResultSize retrieves the size of the result from the specified
location +func (store *InMemoryArtifactStore) GetResultSize(location ArtifactLocation) (int, error) { + store.mu.Lock() + defer store.mu.Unlock() + + key := location.Data["key"] + data, exists := store.results[key] + if !exists { + return 0, fmt.Errorf("result not found: %s", key) + } + return len(data), nil +} + +// SaveResult saves the result using the JobSpec and returns the artifact location +func (store *InMemoryArtifactStore) SaveResult(jobSpec common.JobSpec, data []byte) (ArtifactLocation, error) { + store.mu.Lock() + defer store.mu.Unlock() + + key := deriveKeyFromJobSpec(jobSpec) + store.results[key] = data + + location := ArtifactLocation{ + Data: map[string]string{ + "bucket": "results", + "key": key, + }, + } + return location, nil +} diff --git a/pkg/artifactstore/store_minio.go b/pkg/artifactstore/store_minio.go new file mode 100644 index 0000000..3730d17 --- /dev/null +++ b/pkg/artifactstore/store_minio.go @@ -0,0 +1,117 @@ +package artifactstore + +import ( + "bytes" + "context" + "fmt" + "io" + "log/slog" + "math" + "mrvacommander/pkg/common" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const ( + RESULTS_BUCKET_NAME = "results" + PACKS_BUCKET_NAME = "packs" +) + +type MinIOArtifactStore struct { + client *minio.Client +} + +func NewMinIOArtifactStore(endpoint, id, secret string) (*MinIOArtifactStore, error) { + minioClient, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(id, secret, ""), + Secure: false, + }) + if err != nil { + return nil, err + } + + slog.Info("Connected to MinIO artifact store server") + + // Create "results" bucket + if err := common.CreateMinIOBucketIfNotExists(minioClient, RESULTS_BUCKET_NAME); err != nil { + return nil, fmt.Errorf("could not create results bucket: %v", err) + } + + // Create "packs" bucket + if err := common.CreateMinIOBucketIfNotExists(minioClient, PACKS_BUCKET_NAME); err != nil { + return nil, fmt.Errorf("could not create packs 
bucket: %v", err) + } + + return &MinIOArtifactStore{ + client: minioClient, + }, nil +} + +func (store *MinIOArtifactStore) GetQueryPack(location ArtifactLocation) ([]byte, error) { + return store.getArtifact(location) +} + +func (store *MinIOArtifactStore) SaveQueryPack(jobId int, data []byte) (ArtifactLocation, error) { + return store.saveArtifact(PACKS_BUCKET_NAME, deriveKeyFromSessionId(jobId), data, "application/gzip") +} + +func (store *MinIOArtifactStore) GetResult(location ArtifactLocation) ([]byte, error) { + return store.getArtifact(location) +} + +func (store *MinIOArtifactStore) GetResultSize(location ArtifactLocation) (int, error) { + bucket := location.Data["bucket"] + key := location.Data["key"] + + objectInfo, err := store.client.StatObject(context.Background(), bucket, key, minio.StatObjectOptions{}) + if err != nil { + return 0, err + } + + if objectInfo.Size > math.MaxInt32 { + return 0, fmt.Errorf("object size %d exceeds max int size", objectInfo.Size) + } + + return int(objectInfo.Size), nil +} + +func (store *MinIOArtifactStore) SaveResult(jobSpec common.JobSpec, data []byte) (ArtifactLocation, error) { + return store.saveArtifact(RESULTS_BUCKET_NAME, deriveKeyFromJobSpec(jobSpec), data, "application/zip") +} + +func (store *MinIOArtifactStore) getArtifact(location ArtifactLocation) ([]byte, error) { + bucket := location.Data["bucket"] + key := location.Data["key"] + + object, err := store.client.GetObject(context.Background(), bucket, key, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + defer object.Close() + + data, err := io.ReadAll(object) + if err != nil { + return nil, err + } + + return data, nil +} + +func (store *MinIOArtifactStore) saveArtifact(bucket, key string, data []byte, contentType string) (ArtifactLocation, error) { + _, err := store.client.PutObject(context.Background(), bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{ + ContentType: contentType, + }) + if err != nil { + 
return ArtifactLocation{}, err + } + + location := ArtifactLocation{ + Data: map[string]string{ + "bucket": bucket, + "key": key, + }, + } + + return location, nil +} diff --git a/pkg/common/extapi.go b/pkg/common/extapi.go index 0190484..e25d863 100644 --- a/pkg/common/extapi.go +++ b/pkg/common/extapi.go @@ -1,10 +1,9 @@ package common type JobInfo struct { - QueryLanguage string - CreatedAt string - UpdatedAt string - + QueryLanguage string + CreatedAt string + UpdatedAt string SkippedRepositories SkippedRepositories } @@ -16,8 +15,8 @@ type SkippedRepositories struct { } type AccessMismatchRepos struct { - RepositoryCount int `json:"repository_count"` - Repositories []string `json:"repositories"` + RepositoryCount int `json:"repository_count"` + Repositories []Repository `json:"repositories"` } type NotFoundRepos struct { @@ -26,13 +25,13 @@ type NotFoundRepos struct { } type NoCodeqlDBRepos struct { - RepositoryCount int `json:"repository_count"` - Repositories []string `json:"repositories"` + RepositoryCount int `json:"repository_count"` + Repositories []Repository `json:"repositories"` } type OverLimitRepos struct { - RepositoryCount int `json:"repository_count"` - Repositories []string `json:"repositories"` + RepositoryCount int `json:"repository_count"` + Repositories []Repository `json:"repositories"` } type StatusResponse struct { diff --git a/pkg/common/interfaces.go b/pkg/common/interfaces.go deleted file mode 100644 index 98b3017..0000000 --- a/pkg/common/interfaces.go +++ /dev/null @@ -1,4 +0,0 @@ -package common - -type Common interface { -} diff --git a/pkg/common/jobspec.go b/pkg/common/jobspec.go new file mode 100644 index 0000000..1464b89 --- /dev/null +++ b/pkg/common/jobspec.go @@ -0,0 +1,29 @@ +package common + +import ( + "encoding/base64" + "encoding/json" +) + +// EncodeJobSpec encodes a JobSpec into a base64-encoded string. 
+func EncodeJobSpec(jobSpec JobSpec) (string, error) { + data, err := json.Marshal(jobSpec) + if err != nil { + return "", err + } + return base64.URLEncoding.EncodeToString(data), nil +} + +// DecodeJobSpec decodes a base64-encoded string into a JobSpec. +func DecodeJobSpec(encoded string) (JobSpec, error) { + data, err := base64.URLEncoding.DecodeString(encoded) + if err != nil { + return JobSpec{}, err + } + var jobSpec JobSpec + err = json.Unmarshal(data, &jobSpec) + if err != nil { + return JobSpec{}, err + } + return jobSpec, nil +} diff --git a/pkg/common/minio.go b/pkg/common/minio.go new file mode 100644 index 0000000..ae7a154 --- /dev/null +++ b/pkg/common/minio.go @@ -0,0 +1,38 @@ +package common + +import ( + "context" + "log/slog" + + "github.com/minio/minio-go/v7" +) + +func CreateMinIOBucketIfNotExists(client *minio.Client, bucketName string) error { + ctx := context.Background() + + exists, err := client.BucketExists(ctx, bucketName) + if err != nil { + return err + } + + if !exists { + slog.Info("Creating bucket", "name", bucketName) + err = client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}) + if err != nil { + // The bucket might already exist at this stage if another component created it concurrently. + // For example, the server might have attempted to create it at the same time as the agent. + if err.(minio.ErrorResponse).Code == "BucketAlreadyOwnedByYou" { + slog.Info("Failed to create bucket because it already exists", "name", bucketName) + return nil + } else { + return err + } + } else { + slog.Info("Bucket created successfully", "name", bucketName) + } + } else { + slog.Info("Bucket already exists", "name", bucketName) + } + + return nil +} diff --git a/pkg/common/types.go b/pkg/common/types.go index 8c8955b..104afe4 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -6,26 +6,6 @@ type NameWithOwner struct { Repo string } -// AnalyzeJob represents a job specifying a repository and a query pack to analyze it with. 
-// This is the message format that the agent receives from the queue. -type AnalyzeJob struct { - RequestId int // json:"request_id" - QueryPackId int // json:"query_pack_id" - QueryPackURL string // json:"query_pack_url" - QueryLanguage string // json:"query_language" - NWO NameWithOwner // json:"nwo" -} - -// AnalyzeResult represents the result of an analysis job. -// This is the message format that the agent sends to the queue. -// Status will only ever be StatusSuccess or StatusError when sent in a result. -type AnalyzeResult struct { - Status Status // json:"status" - RequestId int // json:"request_id" - ResultCount int // json:"result_count" - ResultArchiveURL string // json:"result_archive_url" -} - // Status represents the status of a job. type Status int @@ -55,6 +35,6 @@ func (s Status) ToExternalString() string { } type JobSpec struct { - JobID int + SessionID int NameWithOwner } diff --git a/pkg/deploy/init.go b/pkg/deploy/init.go new file mode 100644 index 0000000..3834935 --- /dev/null +++ b/pkg/deploy/init.go @@ -0,0 +1,96 @@ +package deploy + +import ( + "fmt" + "log" + "log/slog" + "mrvacommander/pkg/artifactstore" + "mrvacommander/pkg/qldbstore" + "mrvacommander/pkg/queue" + "os" + "strconv" +) + +func validateEnvVars(requiredEnvVars []string) { + missing := false + + for _, envVar := range requiredEnvVars { + if _, ok := os.LookupEnv(envVar); !ok { + slog.Error("Missing required environment variable", "key", envVar) + missing = true + } + } + + if missing { + os.Exit(1) + } +} + +func InitRabbitMQ(isAgent bool) (queue.Queue, error) { + requiredEnvVars := []string{ + "MRVA_RABBITMQ_HOST", + "MRVA_RABBITMQ_PORT", + "MRVA_RABBITMQ_USER", + "MRVA_RABBITMQ_PASSWORD", + } + validateEnvVars(requiredEnvVars) + + rmqHost := os.Getenv("MRVA_RABBITMQ_HOST") + rmqPort := os.Getenv("MRVA_RABBITMQ_PORT") + rmqUser := os.Getenv("MRVA_RABBITMQ_USER") + rmqPass := os.Getenv("MRVA_RABBITMQ_PASSWORD") + + rmqPortAsInt, err := strconv.ParseInt(rmqPort, 10, 16) + if 
err != nil { + return nil, fmt.Errorf("failed to parse RabbitMQ port: %v", err) + } + + log.Println("Initializing RabbitMQ queue") + + rabbitMQQueue, err := queue.NewRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, isAgent) + if err != nil { + return nil, fmt.Errorf("failed to initialize RabbitMQ: %v", err) + } + + return rabbitMQQueue, nil +} + +func InitMinIOArtifactStore() (artifactstore.Store, error) { + requiredEnvVars := []string{ + "ARTIFACT_MINIO_ENDPOINT", + "ARTIFACT_MINIO_ID", + "ARTIFACT_MINIO_SECRET", + } + validateEnvVars(requiredEnvVars) + + endpoint := os.Getenv("ARTIFACT_MINIO_ENDPOINT") + id := os.Getenv("ARTIFACT_MINIO_ID") + secret := os.Getenv("ARTIFACT_MINIO_SECRET") + + store, err := artifactstore.NewMinIOArtifactStore(endpoint, id, secret) + if err != nil { + return nil, fmt.Errorf("failed to initialize artifact store: %v", err) + } + + return store, nil +} + +func InitMinIOCodeQLDatabaseStore() (qldbstore.Store, error) { + requiredEnvVars := []string{ + "QLDB_MINIO_ENDPOINT", + "QLDB_MINIO_ID", + "QLDB_MINIO_SECRET", + } + validateEnvVars(requiredEnvVars) + + endpoint := os.Getenv("QLDB_MINIO_ENDPOINT") + id := os.Getenv("QLDB_MINIO_ID") + secret := os.Getenv("QLDB_MINIO_SECRET") + + store, err := qldbstore.NewMinIOCodeQLDatabaseStore(endpoint, id, secret) + if err != nil { + return nil, fmt.Errorf("failed to initialize ql database storage: %v", err) + } + + return store, nil +} diff --git a/pkg/logger/interfaces.go b/pkg/logger/interfaces.go deleted file mode 100644 index 050dead..0000000 --- a/pkg/logger/interfaces.go +++ /dev/null @@ -1,4 +0,0 @@ -package logger - -type Logger interface { -} diff --git a/pkg/logger/types.go b/pkg/logger/types.go deleted file mode 100644 index 2980aef..0000000 --- a/pkg/logger/types.go +++ /dev/null @@ -1,14 +0,0 @@ -package logger - -type LoggerSingle struct { - modules *Visibles -} - -func NewLoggerSingle(v *Visibles) *LoggerSingle { - l := LoggerSingle{} - - l.modules = v - return &l -} - 
-type Visibles struct{} diff --git a/pkg/qldbstore/interfaces.go b/pkg/qldbstore/interfaces.go index 6cbdbf5..37501a8 100644 --- a/pkg/qldbstore/interfaces.go +++ b/pkg/qldbstore/interfaces.go @@ -4,29 +4,25 @@ import ( "mrvacommander/pkg/common" ) -type DBLocation struct { - Prefix string - File string +type CodeQLDatabaseLocation struct { + // `data` is a map of key-value pairs that describe the location of the database. + // For example, a simple key-value pair could be "path" -> "/path/to/database.zip". + // A more complex implementation could be "bucket" -> "example", "key" -> "unique_identifier". + data map[string]string } -type Storage interface { - FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner, - analysisRepos *map[common.NameWithOwner]DBLocation) -} - -type Visibles struct{} - -type StorageQLDB struct{} - -func NewStore(v *Visibles) (Storage, error) { - s := StorageQLDB{} - - return &s, nil -} - -func (s *StorageQLDB) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) ( - not_found_repos []common.NameWithOwner, - analysisRepos *map[common.NameWithOwner]DBLocation) { - // TODO implement - return nil, nil +type Store interface { + // FindAvailableDBs returns a map of available databases for the requested repositories. + // It also returns a list of repository NWOs that do not have available databases. + FindAvailableDBs(analysisReposRequested []common.NameWithOwner) ( + notFoundRepos []common.NameWithOwner, + foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) + + // GetDatabase returns the database as a byte slice for the specified repository. + // A CodeQL database is a zip archive to be processed by the CodeQL CLI. + GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) + + // GetDatabaseLocationByNWO returns the database location for the specified repository. + // FindAvailableDBs should be used in lieu of this method for checking database availability.
+ GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) } diff --git a/pkg/qldbstore/qldbstore_local.go b/pkg/qldbstore/qldbstore_local.go new file mode 100644 index 0000000..ec24bb3 --- /dev/null +++ b/pkg/qldbstore/qldbstore_local.go @@ -0,0 +1,66 @@ +package qldbstore + +import ( + "fmt" + "mrvacommander/pkg/common" + "os" + "path/filepath" +) + +type FilesystemCodeQLDatabaseStore struct { + basePath string +} + +func NewLocalFilesystemCodeQLDatabaseStore(basePath string) *FilesystemCodeQLDatabaseStore { + return &FilesystemCodeQLDatabaseStore{ + basePath: basePath, + } +} + +func (store *FilesystemCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) ( + notFoundRepos []common.NameWithOwner, + foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) { + + foundReposMap := make(map[common.NameWithOwner]CodeQLDatabaseLocation) + for _, repo := range analysisReposRequested { + location, err := store.GetDatabaseLocationByNWO(repo) + if err != nil { + notFoundRepos = append(notFoundRepos, repo) + } else { + foundReposMap[repo] = location + } + } + + return notFoundRepos, &foundReposMap +} + +func (store *FilesystemCodeQLDatabaseStore) GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) { + path, exists := location.data["path"] + if !exists { + return nil, fmt.Errorf("path not specified in location") + } + + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + return data, nil +} + +func (store *FilesystemCodeQLDatabaseStore) GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) { + filePath := filepath.Join(store.basePath, fmt.Sprintf("%s/%s/%s_%s_db.zip", nwo.Owner, nwo.Repo, nwo.Owner, nwo.Repo)) + + // Check if the file exists + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return CodeQLDatabaseLocation{}, fmt.Errorf("database not found for %s", nwo) + } + + location := CodeQLDatabaseLocation{ + data: map[string]string{ + "path": 
filePath, + }, + } + + return location, nil +} diff --git a/pkg/qldbstore/qldbstore_minio.go b/pkg/qldbstore/qldbstore_minio.go new file mode 100644 index 0000000..b1e0166 --- /dev/null +++ b/pkg/qldbstore/qldbstore_minio.go @@ -0,0 +1,98 @@ +package qldbstore + +import ( + "context" + "fmt" + "io" + "log/slog" + "mrvacommander/pkg/common" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const bucketName = "qldb" + +type MinIOCodeQLDatabaseStore struct { + client *minio.Client + bucketName string +} + +func NewMinIOCodeQLDatabaseStore(endpoint, id, secret string) (*MinIOCodeQLDatabaseStore, error) { + minioClient, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(id, secret, ""), + Secure: false, + }) + if err != nil { + return nil, err + } + + slog.Info("Connected to MinIO CodeQL database store server") + + err = common.CreateMinIOBucketIfNotExists(minioClient, bucketName) + if err != nil { + return nil, fmt.Errorf("could not create bucket: %v", err) + } + + return &MinIOCodeQLDatabaseStore{ + client: minioClient, + bucketName: bucketName, + }, nil +} + +func (store *MinIOCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) ( + notFoundRepos []common.NameWithOwner, + foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) { + + foundReposMap := make(map[common.NameWithOwner]CodeQLDatabaseLocation) + for _, repo := range analysisReposRequested { + location, err := store.GetDatabaseLocationByNWO(repo) + if err != nil { + notFoundRepos = append(notFoundRepos, repo) + } else { + foundReposMap[repo] = location + } + } + + return notFoundRepos, &foundReposMap +} + +func (store *MinIOCodeQLDatabaseStore) GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) { + bucket := location.data["bucket"] + key := location.data["key"] + + object, err := store.client.GetObject(context.Background(), bucket, key, minio.GetObjectOptions{}) + if err != nil { + return nil, err + 
} + defer object.Close() + + data, err := io.ReadAll(object) + if err != nil { + return nil, err + } + + return data, nil +} + +func (store *MinIOCodeQLDatabaseStore) GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) { + objectName := fmt.Sprintf("%s$%s.zip", nwo.Owner, nwo.Repo) + + // Check if the object exists + _, err := store.client.StatObject(context.Background(), store.bucketName, objectName, minio.StatObjectOptions{}) + if err != nil { + if minio.ToErrorResponse(err).Code == "NoSuchKey" { + return CodeQLDatabaseLocation{}, fmt.Errorf("database not found for %s", nwo) + } + return CodeQLDatabaseLocation{}, err + } + + location := CodeQLDatabaseLocation{ + data: map[string]string{ + "bucket": store.bucketName, + "key": objectName, + }, + } + + return location, nil +} diff --git a/pkg/qpstore/interfaces.go b/pkg/qpstore/interfaces.go deleted file mode 100644 index ae4cd8f..0000000 --- a/pkg/qpstore/interfaces.go +++ /dev/null @@ -1,5 +0,0 @@ -package qpstore - -type Storage interface { - SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error) -} diff --git a/pkg/qpstore/qpstore.go b/pkg/qpstore/qpstore.go deleted file mode 100644 index f525786..0000000 --- a/pkg/qpstore/qpstore.go +++ /dev/null @@ -1,16 +0,0 @@ -package qpstore - -type Visibles struct{} - -type StorageQP struct{} - -func NewStore(v *Visibles) (Storage, error) { - s := StorageQP{} - - return &s, nil -} - -func (s *StorageQP) SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error) { - // TODO implement - return "", nil -} diff --git a/pkg/queue/interfaces.go b/pkg/queue/interfaces.go index c978878..9412e31 100644 --- a/pkg/queue/interfaces.go +++ b/pkg/queue/interfaces.go @@ -1,14 +1,7 @@ package queue -import ( - "mrvacommander/pkg/common" - "mrvacommander/pkg/storage" -) - type Queue interface { - Jobs() chan common.AnalyzeJob - Results() chan common.AnalyzeResult - StartAnalyses(analysis_repos 
*map[common.NameWithOwner]storage.DBLocation, - session_id int, - session_language string) + Jobs() chan AnalyzeJob + Results() chan AnalyzeResult + Close() } diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go deleted file mode 100644 index 424c4d1..0000000 --- a/pkg/queue/queue.go +++ /dev/null @@ -1,31 +0,0 @@ -package queue - -import ( - "log/slog" - "mrvacommander/pkg/common" - "mrvacommander/pkg/storage" -) - -func (q *QueueSingle) Jobs() chan common.AnalyzeJob { - return q.jobs -} - -func (q *QueueSingle) Results() chan common.AnalyzeResult { - return q.results -} - -func (q *QueueSingle) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, - session_language string) { - slog.Debug("Queueing codeql database analyze jobs") - - for nwo := range *analysis_repos { - info := common.AnalyzeJob{ - QueryPackId: session_id, - QueryLanguage: session_language, - NWO: nwo, - } - q.jobs <- info - storage.SetStatus(session_id, nwo, common.StatusQueued) - storage.AddJob(session_id, info) - } -} diff --git a/pkg/queue/rabbitmq.go b/pkg/queue/queue_rabbitmq.go similarity index 85% rename from pkg/queue/rabbitmq.go rename to pkg/queue/queue_rabbitmq.go index 23716e9..f4fe720 100644 --- a/pkg/queue/rabbitmq.go +++ b/pkg/queue/queue_rabbitmq.go @@ -1,13 +1,9 @@ package queue import ( - "mrvacommander/pkg/common" - "mrvacommander/pkg/storage" - "context" "encoding/json" "fmt" - "log" "time" amqp "github.com/rabbitmq/amqp091-go" @@ -15,20 +11,20 @@ import ( ) type RabbitMQQueue struct { - jobs chan common.AnalyzeJob - results chan common.AnalyzeResult + jobs chan AnalyzeJob + results chan AnalyzeResult conn *amqp.Connection channel *amqp.Channel } -// InitializeRabbitMQQueue initializes a RabbitMQ queue. +// NewRabbitMQQueue initializes a RabbitMQ queue. // It returns a pointer to a RabbitMQQueue and an error. // // If isAgent is true, the queue is initialized to be used by an agent. 
// Otherwise, the queue is initialized to be used by the server. // The difference in behaviour is that the agent consumes jobs and publishes results, // while the server publishes jobs and consumes results. -func InitializeRabbitMQQueue( +func NewRabbitMQQueue( host string, port int16, user string, @@ -94,8 +90,8 @@ func InitializeRabbitMQQueue( result := RabbitMQQueue{ conn: conn, channel: ch, - jobs: make(chan common.AnalyzeJob), - results: make(chan common.AnalyzeResult), + jobs: make(chan AnalyzeJob), + results: make(chan AnalyzeResult), } if isAgent { @@ -115,19 +111,14 @@ func InitializeRabbitMQQueue( return &result, nil } -func (q *RabbitMQQueue) Jobs() chan common.AnalyzeJob { +func (q *RabbitMQQueue) Jobs() chan AnalyzeJob { return q.jobs } -func (q *RabbitMQQueue) Results() chan common.AnalyzeResult { +func (q *RabbitMQQueue) Results() chan AnalyzeResult { return q.results } -func (q *RabbitMQQueue) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, session_language string) { - // TODO: Implement - log.Fatal("unimplemented") -} - func (q *RabbitMQQueue) Close() { q.channel.Close() q.conn.Close() @@ -140,7 +131,7 @@ func (q *RabbitMQQueue) ConsumeJobs(queueName string) { } for msg := range msgs { - job := common.AnalyzeJob{} + job := AnalyzeJob{} err := json.Unmarshal(msg.Body, &job) if err != nil { slog.Error("failed to unmarshal job", slog.Any("error", err)) @@ -157,7 +148,7 @@ func (q *RabbitMQQueue) PublishResults(queueName string) { } } -func (q *RabbitMQQueue) publishResult(queueName string, result common.AnalyzeResult) { +func (q *RabbitMQQueue) publishResult(queueName string, result AnalyzeResult) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -178,7 +169,7 @@ func (q *RabbitMQQueue) publishResult(queueName string, result common.AnalyzeRes } } -func (q *RabbitMQQueue) publishJob(queueName string, job common.AnalyzeJob) { +func (q *RabbitMQQueue) 
publishJob(queueName string, job AnalyzeJob) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -212,7 +203,7 @@ func (q *RabbitMQQueue) ConsumeResults(queueName string) { } for msg := range msgs { - result := common.AnalyzeResult{} + result := AnalyzeResult{} err := json.Unmarshal(msg.Body, &result) if err != nil { slog.Error("failed to unmarshal result", slog.Any("error", err)) diff --git a/pkg/queue/queue_single.go b/pkg/queue/queue_single.go new file mode 100644 index 0000000..c7a029e --- /dev/null +++ b/pkg/queue/queue_single.go @@ -0,0 +1,29 @@ +package queue + +type QueueSingle struct { + NumWorkers int + jobs chan AnalyzeJob + results chan AnalyzeResult +} + +func NewQueueSingle(numWorkers int) Queue { + q := QueueSingle{ + NumWorkers: numWorkers, + jobs: make(chan AnalyzeJob, 10), + results: make(chan AnalyzeResult, 10), + } + return q +} + +func (q QueueSingle) Jobs() chan AnalyzeJob { + return q.jobs +} + +func (q QueueSingle) Results() chan AnalyzeResult { + return q.results +} + +func (q QueueSingle) Close() { + close(q.jobs) + close(q.results) +} diff --git a/pkg/queue/types.go b/pkg/queue/types.go index cb7ebef..534ec6d 100644 --- a/pkg/queue/types.go +++ b/pkg/queue/types.go @@ -1,28 +1,28 @@ package queue import ( + "mrvacommander/pkg/artifactstore" "mrvacommander/pkg/common" - "mrvacommander/pkg/logger" ) -type QueueSingle struct { - NumWorkers int - jobs chan common.AnalyzeJob - results chan common.AnalyzeResult - modules *Visibles +// AnalyzeJob represents a job specifying a repository and a query pack to analyze it with. +// This is the message format that the agent receives from the queue. 
+// TODO: make query_pack_location query_pack_url with a presigned URL +type AnalyzeJob struct { + Spec common.JobSpec // json:"job_spec" + QueryPackLocation artifactstore.ArtifactLocation // json:"query_pack_location" + QueryLanguage string // json:"query_language" } -type Visibles struct { - Logger logger.Logger -} - -func NewQueueSingle(numWorkers int, v *Visibles) *QueueSingle { - q := QueueSingle{} - q.jobs = make(chan common.AnalyzeJob, 10) - q.results = make(chan common.AnalyzeResult, 10) - q.NumWorkers = numWorkers - - q.modules = v - - return &q +// AnalyzeResult represents the result of an analysis job. +// This is the message format that the agent sends to the queue. +// Status will only ever be StatusSuccess or StatusError when sent in a result. +// TODO: make result_location result_archive_url with a presigned URL +type AnalyzeResult struct { + Spec common.JobSpec // json:"job_spec" + Status common.Status // json:"status" + ResultCount int // json:"result_count" + ResultLocation artifactstore.ArtifactLocation // json:"result_location" + SourceLocationPrefix string // json:"source_location_prefix" + DatabaseSHA string // json:"database_sha" } diff --git a/pkg/server/interfaces.go b/pkg/server/interfaces.go index 42f75ea..2438f11 100644 --- a/pkg/server/interfaces.go +++ b/pkg/server/interfaces.go @@ -2,13 +2,13 @@ package server import "net/http" -type Commander interface{} - type CommanderAPI interface { MRVARequestID(w http.ResponseWriter, r *http.Request) MRVARequest(w http.ResponseWriter, r *http.Request) RootHandler(w http.ResponseWriter, r *http.Request) + MRVAStatusID(w http.ResponseWriter, r *http.Request) MRVAStatus(w http.ResponseWriter, r *http.Request) + MRVADownloadArtifactID(w http.ResponseWriter, r *http.Request) MRVADownloadArtifact(w http.ResponseWriter, r *http.Request) MRVADownloadServe(w http.ResponseWriter, r *http.Request) } diff --git a/pkg/server/server.go b/pkg/server/server.go index fc11d90..d75bdf2 100644 --- 
a/pkg/server/server.go +++ b/pkg/server/server.go @@ -7,84 +7,159 @@ import ( "errors" "fmt" "io" - "log" "log/slog" "net/http" - "path/filepath" + "os" "strconv" "strings" "time" + "mrvacommander/pkg/artifactstore" "mrvacommander/pkg/common" - "mrvacommander/pkg/storage" + "mrvacommander/pkg/qldbstore" + "mrvacommander/pkg/queue" + "mrvacommander/utils" "github.com/gorilla/mux" ) +func (c *CommanderSingle) startAnalyses( + analysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation, + queryPackLocation artifactstore.ArtifactLocation, + sessionId int, + queryLanguage string) { + + slog.Debug("Queueing analysis jobs", "count", len(*analysisRepos)) + + for nwo := range *analysisRepos { + jobSpec := common.JobSpec{ + SessionID: sessionId, + NameWithOwner: nwo, + } + info := queue.AnalyzeJob{ + Spec: jobSpec, + QueryPackLocation: queryPackLocation, + QueryLanguage: queryLanguage, + } + c.v.Queue.Jobs() <- info + c.v.State.SetStatus(jobSpec, common.StatusQueued) + c.v.State.AddJob(info) + } +} + func setupEndpoints(c CommanderAPI) { r := mux.NewRouter() - // - // First are the API endpoints that mirror those used in the github API - // - r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses", c.MRVARequest) - // /repos/hohn /mrva-controller/code-scanning/codeql/variant-analyses - // Or via - r.HandleFunc("/{repository_id}/code-scanning/codeql/variant-analyses", c.MRVARequestID) - + // Root handler r.HandleFunc("/", c.RootHandler) - // This is the standalone status request. - // It's also the first request made when downloading; the difference is on the - // client side's handling. 
+ // Endpoints for submitting new analyses + r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses", c.MRVARequest) + r.HandleFunc("/repositories/{controller_repo_id}/code-scanning/codeql/variant-analyses", c.MRVARequestID) + + // Endpoints for status requests + // This is also the first request made when downloading; the difference is in the client-side handling. r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}", c.MRVAStatus) + r.HandleFunc("/repositories/{controller_repo_id}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}", c.MRVAStatusID) + // Endpoints for downloading artifacts r.HandleFunc("/repos/{controller_owner}/{controller_repo}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}/repos/{repo_owner}/{repo_name}", c.MRVADownloadArtifact) + r.HandleFunc("/repositories/{controller_repo_id}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}/repositories/{repository_id}", c.MRVADownloadArtifactID) - // Not implemented: - // r.HandleFunc("/codeql-query-console/codeql-variant-analysis-repo-tasks/{codeql_variant_analysis_id}/{repo_id}/{owner_id}/{controller_repo_id}", MRVADownLoad3) - // r.HandleFunc("/github-codeql-query-console-prod/codeql-variant-analysis-repo-tasks/{codeql_variant_analysis_id}/{repo_id}", MRVADownLoad4) + // Endpoint to serve downloads using encoded JobSpec + r.HandleFunc("/download/{encoded_job_spec}", c.MRVADownloadServe) - // - // Now some support API endpoints - // - r.HandleFunc("/download-server/{local_path:.*}", c.MRVADownloadServe) + // Handler for unhandled endpoints + r.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + slog.Error("Unhandled endpoint", "method", r.Method, "uri", r.RequestURI) + http.Error(w, "Not Found", http.StatusNotFound) + }) - // - // Bind to a port and pass our router in - // - // TODO make this a configuration entry - 
log.Fatal(http.ListenAndServe(":8080", r)) + go ListenAndServe(r) } -func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo, vaid int) { - slog.Debug("Submitting status response", "session", vaid) +func ListenAndServe(r *mux.Router) { + // Bind to a port and pass our router in + // The port is configurable via environment variable or default to 8080 + port := os.Getenv("SERVER_PORT") + if port == "" { + port = "8080" + } - all_scanned := []common.ScannedRepo{} - jobs := storage.GetJobList(js.JobID) - for _, job := range jobs { - astat := storage.GetStatus(js.JobID, job.NWO).ToExternalString() - all_scanned = append(all_scanned, + err := http.ListenAndServe(":"+port, r) + if err != nil { + slog.Error("Error starting server:", "error", err) + os.Exit(1) + } +} + +func (c *CommanderSingle) submitStatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo) { + slog.Debug("Submitting status response", "job_id", js.SessionID) + + scannedRepos := []common.ScannedRepo{} + + jobs, err := c.v.State.GetJobList(js.SessionID) + if err != nil { + slog.Error("Error getting job list", "error", err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // Loop through all jobs under the same session id + // TODO: as a high priority, fix this hacky job IDing by index + // this may break with other state implementations + for jobRepoId, job := range jobs { + // Get the job status + status, err := c.v.State.GetStatus(job.Spec) + if err != nil { + slog.Error("Error getting status", "error", err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // Get the job result + result, err := c.v.State.GetResult(job.Spec) + if err != nil { + slog.Error("Error getting result", "error", err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // Get the job result artifact size + artifactSize, err := 
c.v.Artifacts.GetResultSize(result.ResultLocation) + if err != nil { + slog.Error("Error getting artifact size", "error", err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // Append all scanned (complete and incomplete) repos to the response + scannedRepos = append(scannedRepos, common.ScannedRepo{ Repository: common.Repository{ - ID: 0, - Name: job.NWO.Repo, - FullName: fmt.Sprintf("%s/%s", job.NWO.Owner, job.NWO.Repo), + ID: jobRepoId, + Name: job.Spec.Repo, + FullName: fmt.Sprintf("%s/%s", job.Spec.Owner, job.Spec.Repo), Private: false, StargazersCount: 0, UpdatedAt: ji.UpdatedAt, }, - AnalysisStatus: astat, - ResultCount: 123, // FIXME 123 is a lie so the client downloads - ArtifactSizeBytes: 123, // FIXME + AnalysisStatus: status.ToExternalString(), + ResultCount: result.ResultCount, + ArtifactSizeBytes: int(artifactSize), }, ) } - astat := storage.GetStatus(js.JobID, js.NameWithOwner).ToExternalString() + jobStatus, err := c.v.State.GetStatus(js) + if err != nil { + slog.Error("Error getting job status", "error", err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } status := common.StatusResponse{ - SessionId: js.JobID, + SessionId: js.SessionID, ControllerRepo: common.ControllerRepo{}, Actor: common.Actor{}, QueryLanguage: ji.QueryLanguage, @@ -92,8 +167,8 @@ func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpe CreatedAt: ji.CreatedAt, UpdatedAt: ji.UpdatedAt, ActionsWorkflowRunID: 0, // FIXME - Status: astat, - ScannedRepositories: all_scanned, + Status: jobStatus.ToExternalString(), + ScannedRepositories: scannedRepos, SkippedRepositories: ji.SkippedRepositories, } @@ -115,111 +190,195 @@ func (c *CommanderSingle) RootHandler(w http.ResponseWriter, r *http.Request) { slog.Info("Request on /") } -func (c *CommanderSingle) MRVAStatus(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) +func (c *CommanderSingle) MRVAStatusCommon(w 
http.ResponseWriter, r *http.Request, owner, repo string, variantAnalysisID string) { slog.Info("MRVA status request for ", - "owner", vars["owner"], - "repo", vars["repo"], - "codeql_variant_analysis_id", vars["codeql_variant_analysis_id"]) - id, err := strconv.Atoi(vars["codeql_variant_analysis_id"]) + "owner", owner, + "repo", repo, + "codeql_variant_analysis_id", variantAnalysisID) + + id, err := strconv.ParseInt(variantAnalysisID, 10, 32) if err != nil { - slog.Error("Variant analysis is is not integer", "id", - vars["codeql_variant_analysis_id"]) - http.Error(w, err.Error(), http.StatusInternalServerError) + slog.Error("Variant analysis ID is not integer", "id", + variantAnalysisID) + http.Error(w, err.Error(), http.StatusBadRequest) return } + + spec, err := c.v.State.GetJobList(int(id)) + if err != nil || len(spec) == 0 { + msg := "No jobs found for given session id" + slog.Error(msg, "id", variantAnalysisID) + http.Error(w, msg, http.StatusNotFound) + return + } + // The status reports one status for all jobs belonging to an id. // So we simply report the status of a job as the status of all. 
- spec := storage.GetJobList(id) - if spec == nil { - msg := "No jobs found for given job id" - slog.Error(msg, "id", vars["codeql_variant_analysis_id"]) - http.Error(w, msg, http.StatusUnprocessableEntity) + job := spec[0] + + jobInfo, err := c.v.State.GetJobInfo(job.Spec) + if err != nil { + msg := "No job info found for given session id" + slog.Error(msg, "id", variantAnalysisID) + http.Error(w, msg, http.StatusBadRequest) return } - job := spec[0] + c.submitStatusResponse(w, job.Spec, jobInfo) +} - js := common.JobSpec{ - JobID: job.QueryPackId, - NameWithOwner: job.NWO, - } +func (c *CommanderSingle) MRVAStatusID(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("MRVA status request (MRVAStatusID)") + // Mapping to unused/unused and passing variant analysis id + c.MRVAStatusCommon(w, r, "unused", "unused", vars["codeql_variant_analysis_id"]) +} - ji := storage.GetJobInfo(js) - - c.StatusResponse(w, js, ji, id) +func (c *CommanderSingle) MRVAStatus(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("MRVA status request (MRVAStatus)") + // Mapping to owner/repo and passing variant analysis id + c.MRVAStatusCommon(w, r, vars["owner"], vars["repo"], vars["codeql_variant_analysis_id"]) } // Download artifacts -func (c *CommanderSingle) MRVADownloadArtifact(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - slog.Info("MRVA artifact download", - "controller_owner", vars["controller_owner"], - "controller_repo", vars["controller_repo"], - "codeql_variant_analysis_id", vars["codeql_variant_analysis_id"], - "repo_owner", vars["repo_owner"], - "repo_name", vars["repo_name"], +func (c *CommanderSingle) MRVADownloadArtifactCommon(w http.ResponseWriter, r *http.Request, jobRepoId int, jobSpec common.JobSpec) { + slog.Debug("MRVA artifact download", + "codeql_variant_analysis_id", jobSpec.SessionID, + "repo_owner", jobSpec.NameWithOwner.Owner, + "repo_name", jobSpec.NameWithOwner.Repo, ) - vaid, err := 
strconv.Atoi(vars["codeql_variant_analysis_id"]) + + c.sendDownloadResponse(w, jobRepoId, jobSpec) +} + +func (c *CommanderSingle) MRVADownloadArtifactID(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Debug("MRVA artifact download", "id", vars["codeql_variant_analysis_id"], "repo_id", vars["repository_id"]) + + sessionId, err := strconv.ParseInt(vars["codeql_variant_analysis_id"], 10, 32) if err != nil { - slog.Error("Variant analysis is is not integer", "id", - vars["codeql_variant_analysis_id"]) - http.Error(w, err.Error(), http.StatusInternalServerError) + slog.Error("Variant analysis ID is not an integer", "id", vars["codeql_variant_analysis_id"]) + http.Error(w, err.Error(), http.StatusBadRequest) return } - js := common.JobSpec{ - JobID: vaid, + + repoId, err := strconv.ParseInt(vars["repository_id"], 10, 32) + if err != nil { + slog.Error("Repository ID is not an integer", "id", vars["repository_id"]) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + jobSpec, err := c.v.State.GetJobSpecByRepoId(int(sessionId), int(repoId)) + if err != nil { + slog.Error("Failed to get job spec by repo ID", "error", err) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + c.MRVADownloadArtifactCommon(w, r, int(repoId), jobSpec) +} + +func (c *CommanderSingle) MRVADownloadArtifact(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + + sessionId, err := strconv.ParseInt(vars["codeql_variant_analysis_id"], 10, 32) + if err != nil { + slog.Error("Variant analysis ID is not an integer", "id", vars["codeql_variant_analysis_id"]) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + jobSpec := common.JobSpec{ + SessionID: int(sessionId), NameWithOwner: common.NameWithOwner{ Owner: vars["repo_owner"], Repo: vars["repo_name"], }, } - c.DownloadResponse(w, js, vaid) + // TODO: THIS IS BROKEN UNLESS REPO ID IS IGNORED + c.MRVADownloadArtifactCommon(w, r, -1, jobSpec) } -func (c 
*CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobSpec, vaid int) { - slog.Debug("Forming download response", "session", vaid, "job", js) +func (c *CommanderSingle) sendDownloadResponse(w http.ResponseWriter, jobRepoId int, jobSpec common.JobSpec) { + var response common.DownloadResponse - astat := storage.GetStatus(vaid, js.NameWithOwner) + slog.Debug("Forming download response", "job", jobSpec) - var dlr common.DownloadResponse - if astat == common.StatusSuccess { + jobStatus, err := c.v.State.GetStatus(jobSpec) + if err != nil { + slog.Error(err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } - au, err := storage.ArtifactURL(js, vaid) + if jobStatus == common.StatusSuccess { + jobResult, err := c.v.State.GetResult(jobSpec) + if err != nil { + slog.Error(err.Error()) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + jobResultData, err := c.v.Artifacts.GetResult(jobResult.ResultLocation) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - dlr = common.DownloadResponse{ + // Generate the artifact URL + encodedJobSpec, err := common.EncodeJobSpec(jobSpec) + if err != nil { + http.Error(w, "Failed to encode job spec", http.StatusInternalServerError) + return + } + + host := os.Getenv("SERVER_HOST") + if host == "" { + host = "localhost" + } + + port := os.Getenv("SERVER_PORT") + if port == "" { + port = "8080" + } + + artifactURL := fmt.Sprintf("http://%s:%s/download/%s", host, port, encodedJobSpec) + + response = common.DownloadResponse{ Repository: common.DownloadRepo{ - Name: js.Repo, - FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo), + // TODO: fix jobRepoID coming from the NWO path. The MRVA extension uses repo ID. 
+ ID: jobRepoId, + Name: jobSpec.Repo, + FullName: fmt.Sprintf("%s/%s", jobSpec.Owner, jobSpec.Repo), }, - AnalysisStatus: astat.ToExternalString(), - ResultCount: 123, // FIXME - ArtifactSizeBytes: 123, // FIXME - DatabaseCommitSha: "do-we-use-dcs-p", - SourceLocationPrefix: "do-we-use-slp-p", - ArtifactURL: au, + AnalysisStatus: jobStatus.ToExternalString(), + ResultCount: jobResult.ResultCount, + ArtifactSizeBytes: len(jobResultData), + DatabaseCommitSha: jobResult.DatabaseSHA, + SourceLocationPrefix: jobResult.SourceLocationPrefix, + ArtifactURL: artifactURL, } } else { - dlr = common.DownloadResponse{ + response = common.DownloadResponse{ Repository: common.DownloadRepo{ - Name: js.Repo, - FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo), + // TODO: fix jobRepoID coming from the NWO path. The MRVA extension uses repo ID. + ID: jobRepoId, + Name: jobSpec.Repo, + FullName: fmt.Sprintf("%s/%s", jobSpec.Owner, jobSpec.Repo), }, - AnalysisStatus: astat.ToExternalString(), + AnalysisStatus: jobStatus.ToExternalString(), ResultCount: 0, ArtifactSizeBytes: 0, DatabaseCommitSha: "", - SourceLocationPrefix: "/not/relevant/here", + SourceLocationPrefix: "", ArtifactURL: "", } } // Encode the response as JSON - jdlr, err := json.Marshal(dlr) + responseJson, err := json.Marshal(response) if err != nil { slog.Error("Error encoding response as JSON:", "error", err) @@ -229,75 +388,74 @@ func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobS // Send analysisReposJSON via ResponseWriter w.Header().Set("Content-Type", "application/json") - w.Write(jdlr) - + w.Write(responseJson) } func (c *CommanderSingle) MRVADownloadServe(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - slog.Info("File download request", "local_path", vars["local_path"]) + encodedJobSpec := vars["encoded_job_spec"] - FileDownload(w, vars["local_path"]) -} - -func FileDownload(w http.ResponseWriter, path string) { - slog.Debug("Sending zip file with .sarif/.bqrs", 
"path", path) - - fpath, res, err := storage.ResultAsFile(path) + jobSpec, err := common.DecodeJobSpec(encodedJobSpec) if err != nil { - http.Error(w, "Failed to read results", http.StatusInternalServerError) + http.Error(w, "Invalid job spec", http.StatusBadRequest) return } - // Set headers - fname := filepath.Base(fpath) - w.Header().Set("Content-Disposition", "attachment; filename="+fname) + + slog.Info("Result download request", "job_spec", jobSpec) + + result, err := c.v.State.GetResult(jobSpec) + if err != nil { + slog.Error("Failed to get result", "error", err) + http.Error(w, "Failed to get result", http.StatusInternalServerError) + return + } + + slog.Debug("Result location", "location", result.ResultLocation) + + data, err := c.v.Artifacts.GetResult(result.ResultLocation) + if err != nil { + slog.Error("Failed to retrieve artifact", "error", err) + http.Error(w, "Failed to retrieve artifact", http.StatusInternalServerError) + return + } + + // Send the file as a response w.Header().Set("Content-Type", "application/octet-stream") + w.Write(data) +} - // Copy the file contents to the response writer - rdr := bytes.NewReader(res) - _, err = io.Copy(w, rdr) +func (c *CommanderSingle) MRVARequestCommon(w http.ResponseWriter, r *http.Request) { + sessionId := c.v.State.NextID() + slog.Info("New MRVA Request", "id", fmt.Sprint(sessionId)) + queryLanguage, repoNWOs, queryPackLocation, err := c.collectRequestInfo(w, r, sessionId) if err != nil { - http.Error(w, "Failed to send file", http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusBadRequest) return } - slog.Debug("Uploaded file", "path", fpath) + slog.Debug("Processed request info", "location", queryPackLocation, "language", queryLanguage) -} + // TODO This returns 0 analysisRepos. 
2024/06/19 02:26:47 DEBUG Queueing analysis jobs count=0 + notFoundRepos, analysisRepos := c.v.CodeQLDBStore.FindAvailableDBs(repoNWOs) -func (c *CommanderSingle) MRVARequestID(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - slog.Info("New mrva using repository_id=%v\n", vars["repository_id"]) -} - -func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - slog.Info("New mrva run ", "owner", vars["owner"], "repo", vars["repo"]) - - session_id := c.vis.ServerStore.NextID() - session_owner := vars["owner"] - session_controller_repo := vars["repo"] - slog.Info("new run", "id: ", fmt.Sprint(session_id), session_owner, session_controller_repo) - session_language, session_repositories, session_tgz_ref, err := c.collectRequestInfo(w, r, session_id) - if err != nil { - return + if len(*analysisRepos) == 0 { + slog.Warn("No repositories found for analysis") } - not_found_repos, analysisRepos := c.vis.ServerStore.FindAvailableDBs(session_repositories) - - c.vis.Queue.StartAnalyses(analysisRepos, session_id, session_language) + // XX: session_is is separate from the query pack ref. 
Value may be equal + c.startAnalyses(analysisRepos, queryPackLocation, sessionId, queryLanguage) si := SessionInfo{ - ID: session_id, - Owner: session_owner, - ControllerRepo: session_controller_repo, + ID: sessionId, + Owner: "unused", + ControllerRepo: "unused", - QueryPack: session_tgz_ref, - Language: session_language, - Repositories: session_repositories, + QueryPack: strconv.Itoa(sessionId), // TODO + Language: queryLanguage, + Repositories: repoNWOs, AccessMismatchRepos: nil, /* FIXME */ - NotFoundRepos: not_found_repos, + NotFoundRepos: notFoundRepos, NoCodeqlDBRepos: nil, /* FIXME */ OverLimitRepos: nil, /* FIXME */ @@ -305,8 +463,9 @@ func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) { } slog.Debug("Forming and sending response for submitted analysis job", "id", si.ID) - submit_response, err := submit_response(si) + submit_response, err := c.submitResponse(si) if err != nil { + slog.Error("Error forming submit response", "error", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -315,6 +474,16 @@ func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) { w.Write(submit_response) } +func (c *CommanderSingle) MRVARequestID(w http.ResponseWriter, r *http.Request) { + slog.Debug("MRVARequestID") + c.MRVARequestCommon(w, r) +} + +func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) { + slog.Debug("MRVARequest") + c.MRVARequestCommon(w, r) +} + func nwoToNwoStringArray(nwo []common.NameWithOwner) ([]string, int) { repos := []string{} count := len(nwo) @@ -324,111 +493,145 @@ func nwoToNwoStringArray(nwo []common.NameWithOwner) ([]string, int) { return repos, count } -func submit_response(sn SessionInfo) ([]byte, error) { +func nwoToDummyRepositoryArray(nwo []common.NameWithOwner) ([]common.Repository, int) { + repos := []common.Repository{} + for _, repo := range nwo { + repos = append(repos, common.Repository{ + ID: -1, + Name: repo.Repo, + FullName: 
fmt.Sprintf("%s/%s", repo.Owner, repo.Repo), + Private: false, + StargazersCount: 0, + UpdatedAt: time.Now().Format(time.RFC3339), + }) + } + count := len(nwo) + + return repos, count +} + +// ConsumeResults moves results from 'queue' to server 'state' +func (c *CommanderSingle) ConsumeResults() { + slog.Info("Started server results consumer.") + for { + r := <-c.v.Queue.Results() + slog.Debug("Result consumed:", "r", r, "status", r.Status.ToExternalString()) + c.v.State.SetResult(r.Spec, r) + c.v.State.SetStatus(r.Spec, r.Status) + } +} + +func (c *CommanderSingle) submitResponse(si SessionInfo) ([]byte, error) { // Construct the response bottom-up - var m_cr common.ControllerRepo - var m_ac common.Actor + var controllerRepo common.ControllerRepo + var actor common.Actor - repos, count := nwoToNwoStringArray(sn.NotFoundRepos) - r_nfr := common.NotFoundRepos{RepositoryCount: count, RepositoryFullNames: repos} + repoNames, count := nwoToNwoStringArray(si.NotFoundRepos) + notFoundRepos := common.NotFoundRepos{RepositoryCount: count, RepositoryFullNames: repoNames} - repos, count = nwoToNwoStringArray(sn.AccessMismatchRepos) - r_amr := common.AccessMismatchRepos{RepositoryCount: count, Repositories: repos} + repos, _ := nwoToDummyRepositoryArray(si.AccessMismatchRepos) + accessMismatchRepos := common.AccessMismatchRepos{RepositoryCount: count, Repositories: repos} - repos, count = nwoToNwoStringArray(sn.NoCodeqlDBRepos) - r_ncd := common.NoCodeqlDBRepos{RepositoryCount: count, Repositories: repos} + repos, _ = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos) + noCodeQLDBRepos := common.NoCodeqlDBRepos{RepositoryCount: count, Repositories: repos} // TODO fill these with real values? 
- repos, count = nwoToNwoStringArray(sn.NoCodeqlDBRepos) - r_olr := common.OverLimitRepos{RepositoryCount: count, Repositories: repos} + repos, _ = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos) + overlimitRepos := common.OverLimitRepos{RepositoryCount: count, Repositories: repos} - m_skip := common.SkippedRepositories{ - AccessMismatchRepos: r_amr, - NotFoundRepos: r_nfr, - NoCodeqlDBRepos: r_ncd, - OverLimitRepos: r_olr} + skippedRepositories := common.SkippedRepositories{ + AccessMismatchRepos: accessMismatchRepos, + NotFoundRepos: notFoundRepos, + NoCodeqlDBRepos: noCodeQLDBRepos, + OverLimitRepos: overlimitRepos} - m_sr := common.SubmitResponse{ - Actor: m_ac, - ControllerRepo: m_cr, - ID: sn.ID, - QueryLanguage: sn.Language, - QueryPackURL: sn.QueryPack, + response := common.SubmitResponse{ + Actor: actor, + ControllerRepo: controllerRepo, + ID: si.ID, + QueryLanguage: si.Language, + QueryPackURL: si.QueryPack, CreatedAt: time.Now().Format(time.RFC3339), UpdatedAt: time.Now().Format(time.RFC3339), Status: "in_progress", - SkippedRepositories: m_skip, + SkippedRepositories: skippedRepositories, } // Store data needed later - joblist := storage.GetJobList(sn.ID) + joblist, err := c.v.State.GetJobList(si.ID) + if err != nil { + slog.Error("Error getting job list", "error", err.Error()) + return nil, err + } for _, job := range joblist { - storage.SetJobInfo(common.JobSpec{ - JobID: sn.ID, - NameWithOwner: job.NWO, + c.v.State.SetJobInfo(common.JobSpec{ + SessionID: si.ID, + NameWithOwner: job.Spec.NameWithOwner, }, common.JobInfo{ - QueryLanguage: sn.Language, - CreatedAt: m_sr.CreatedAt, - UpdatedAt: m_sr.UpdatedAt, - SkippedRepositories: m_skip, + QueryLanguage: si.Language, + CreatedAt: response.CreatedAt, + UpdatedAt: response.UpdatedAt, + SkippedRepositories: skippedRepositories, }, ) } // Encode the response as JSON - submit_response, err := json.Marshal(m_sr) + responseJson, err := json.Marshal(response) if err != nil { - slog.Warn("Error encoding response 
as JSON:", err) + slog.Error("Error encoding response as JSON", "err", err) return nil, err } - return submit_response, nil + return responseJson, nil } -func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, string, error) { +func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, artifactstore.ArtifactLocation, error) { slog.Debug("Collecting session info") if r.Body == nil { err := errors.New("missing request body") - log.Println(err) + slog.Error("Error reading MRVA submission body", "error", err) http.Error(w, err.Error(), http.StatusNoContent) - return "", []common.NameWithOwner{}, "", err + return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err } + buf, err := io.ReadAll(r.Body) if err != nil { var w http.ResponseWriter - slog.Error("Error reading MRVA submission body", "error", err.Error()) + slog.Error("Error reading MRVA submission body", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) - return "", []common.NameWithOwner{}, "", err + return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err } - msg, err := TrySubmitMsg(buf) - if err != nil { - // Unknown message - slog.Error("Unknown MRVA submission body format") - http.Error(w, err.Error(), http.StatusBadRequest) - return "", []common.NameWithOwner{}, "", err - } - // Decompose the SubmitMsg and keep information - // Save the query pack and keep the location - if !isBase64Gzip([]byte(msg.QueryPack)) { + msg, err := tryParseSubmitMsg(buf) + if err != nil { + slog.Error("Unknown MRVA submission body format", "err", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err + } + + // 1. 
Save the query pack and keep the location + if !utils.IsBase64Gzip([]byte(msg.QueryPack)) { slog.Error("MRVA submission body querypack has invalid format") err := errors.New("MRVA submission body querypack has invalid format") http.Error(w, err.Error(), http.StatusBadRequest) - return "", []common.NameWithOwner{}, "", err + return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err } - session_tgz_ref, err := c.extract_tgz(msg.QueryPack, sessionId) + + queryPackLocation, err := c.processQueryPackArchive(msg.QueryPack, sessionId) if err != nil { + slog.Error("Error processing query pack archive", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) - return "", []common.NameWithOwner{}, "", err + return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err } // 2. Save the language - session_language := msg.Language + sessionLanguage := msg.Language // 3. Save the repositories - var session_repositories []common.NameWithOwner + var sessionRepos []common.NameWithOwner for _, v := range msg.Repositories { t := strings.Split(v, "/") @@ -437,14 +640,15 @@ func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Requ slog.Error(err, "entry", t) http.Error(w, err, http.StatusBadRequest) } - session_repositories = append(session_repositories, + sessionRepos = append(sessionRepos, common.NameWithOwner{Owner: t[0], Repo: t[1]}) } - return session_language, session_repositories, session_tgz_ref, nil + + return sessionLanguage, sessionRepos, queryPackLocation, nil } // Try to extract a SubmitMsg from a json-encoded buffer -func TrySubmitMsg(buf []byte) (common.SubmitMsg, error) { +func tryParseSubmitMsg(buf []byte) (common.SubmitMsg, error) { buf1 := make([]byte, len(buf)) copy(buf1, buf) dec := json.NewDecoder(bytes.NewReader(buf1)) @@ -454,32 +658,7 @@ func TrySubmitMsg(buf []byte) (common.SubmitMsg, error) { return m, err } -// Some important payloads can be listed via -// base64 -d < foo1 | gunzip | tar t|head -20 
-// -// This function checks the request body up to the `gunzip` part. -func isBase64Gzip(val []byte) bool { - if len(val) >= 4 { - // Extract header - hdr := make([]byte, base64.StdEncoding.DecodedLen(4)) - _, err := base64.StdEncoding.Decode(hdr, []byte(val[0:4])) - if err != nil { - log.Println("WARNING: IsBase64Gzip decode error:", err) - return false - } - // Check for gzip heading - magic := []byte{0x1f, 0x8b} - if bytes.Equal(hdr[0:2], magic) { - return true - } else { - return false - } - } else { - return false - } -} - -func (c *CommanderSingle) extract_tgz(qp string, sessionID int) (string, error) { +func (c *CommanderSingle) processQueryPackArchive(qp string, sessionId int) (artifactstore.ArtifactLocation, error) { // These are decoded manually via // base64 -d < foo1 | gunzip | tar t | head -20 // base64 decode the body @@ -487,14 +666,15 @@ func (c *CommanderSingle) extract_tgz(qp string, sessionID int) (string, error) tgz, err := base64.StdEncoding.DecodeString(qp) if err != nil { - slog.Error("querypack body decoding error:", err) - return "", err + slog.Error("Failed to decode query pack body", "err", err) + return artifactstore.ArtifactLocation{}, err } - session_query_pack_tgz_filepath, err := c.vis.ServerStore.SaveQueryPack(tgz, sessionID) + artifactLocation, err := c.v.Artifacts.SaveQueryPack(sessionId, tgz) if err != nil { - return "", err + slog.Error("Failed to save query pack", "err", err) + return artifactstore.ArtifactLocation{}, err } - return session_query_pack_tgz_filepath, err + return artifactLocation, err } diff --git a/pkg/server/types.go b/pkg/server/types.go index 462efa2..854b889 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -1,57 +1,41 @@ package server import ( + "mrvacommander/pkg/artifactstore" "mrvacommander/pkg/common" - "mrvacommander/pkg/logger" "mrvacommander/pkg/qldbstore" - "mrvacommander/pkg/qpstore" "mrvacommander/pkg/queue" - "mrvacommander/pkg/storage" + "mrvacommander/pkg/state" ) type SessionInfo 
struct { - ID int - Owner string - ControllerRepo string - - QueryPack string - Language string - Repositories []common.NameWithOwner - + ID int + Owner string + ControllerRepo string + QueryPack string + Language string + Repositories []common.NameWithOwner AccessMismatchRepos []common.NameWithOwner NotFoundRepos []common.NameWithOwner NoCodeqlDBRepos []common.NameWithOwner OverLimitRepos []common.NameWithOwner - - AnalysisRepos *map[common.NameWithOwner]storage.DBLocation + AnalysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation } type CommanderSingle struct { - vis *Visibles + v *Visibles } func NewCommanderSingle(st *Visibles) *CommanderSingle { - c := CommanderSingle{} - + c := CommanderSingle{v: st} setupEndpoints(&c) - + go c.ConsumeResults() return &c } -// type State struct { -// Commander Commander -// Logger logger.Logger -// Queue queue.Queue -// Storage storage.Storage -// Runner agent.Runner -// } - type Visibles struct { - Logger logger.Logger - Queue queue.Queue - ServerStore storage.Storage - // TODO extra package for query pack storage - QueryPackStore qpstore.Storage - // TODO extra package for ql db storage - QLDBStore qldbstore.Storage + Queue queue.Queue + State state.ServerState + Artifacts artifactstore.Store + CodeQLDBStore qldbstore.Store } diff --git a/pkg/state/interfaces.go b/pkg/state/interfaces.go new file mode 100644 index 0000000..5fdff80 --- /dev/null +++ b/pkg/state/interfaces.go @@ -0,0 +1,41 @@ +package state + +import ( + "mrvacommander/pkg/common" + "mrvacommander/pkg/queue" +) + +// StorageInterface defines the methods required for managing storage operations +// related to server state, e.g. job status, results, and artifacts. +type ServerState interface { + // NextID increments and returns the next unique ID for a session. + NextID() int + + // GetResult retrieves the analysis result for the specified job. 
+ GetResult(js common.JobSpec) (queue.AnalyzeResult, error) + + // GetJobSpecByRepoId retrieves the JobSpec for the specified job Repo ID. + // TODO: fix this hacky logic + GetJobSpecByRepoId(sessionId int, jobRepoId int) (common.JobSpec, error) + + // SetResult stores the analysis result for the specified session ID and repository. + SetResult(js common.JobSpec, ar queue.AnalyzeResult) + + // GetJobList retrieves the list of analysis jobs for the specified session ID. + GetJobList(sessionId int) ([]queue.AnalyzeJob, error) + + // GetJobInfo retrieves the job information for the specified job specification. + GetJobInfo(js common.JobSpec) (common.JobInfo, error) + + // SetJobInfo stores the job information for the specified job specification. + SetJobInfo(js common.JobSpec, ji common.JobInfo) + + // GetStatus retrieves the status of a job for the specified session ID and repository. + GetStatus(js common.JobSpec) (common.Status, error) + + // SetStatus stores the status of a job for the specified session ID and repository. + SetStatus(js common.JobSpec, status common.Status) + + // AddJob adds an analysis job to the list of jobs for the specified session ID. 
+ AddJob(job queue.AnalyzeJob) +} diff --git a/pkg/state/state_local.go b/pkg/state/state_local.go new file mode 100644 index 0000000..7a177a3 --- /dev/null +++ b/pkg/state/state_local.go @@ -0,0 +1,126 @@ +package state + +import ( + "fmt" + "log/slog" + "mrvacommander/pkg/common" + "mrvacommander/pkg/queue" + "sync" +) + +type LocalState struct { + jobs map[int][]queue.AnalyzeJob + info map[common.JobSpec]common.JobInfo + status map[common.JobSpec]common.Status + result map[common.JobSpec]queue.AnalyzeResult + sessionToJobIdToSpec map[int]map[int]common.JobSpec + mutex sync.Mutex + currentID int +} + +func NewLocalState(startingID int) *LocalState { + state := &LocalState{ + jobs: make(map[int][]queue.AnalyzeJob), + info: make(map[common.JobSpec]common.JobInfo), + status: make(map[common.JobSpec]common.Status), + result: make(map[common.JobSpec]queue.AnalyzeResult), + sessionToJobIdToSpec: make(map[int]map[int]common.JobSpec), + currentID: startingID, + } + state.sessionToJobIdToSpec[startingID] = make(map[int]common.JobSpec) + state.jobs[startingID] = []queue.AnalyzeJob{} + return state +} + +func (s *LocalState) NextID() int { + s.mutex.Lock() + defer s.mutex.Unlock() + s.currentID++ + s.jobs[s.currentID] = []queue.AnalyzeJob{} + return s.currentID +} + +func (s *LocalState) GetResult(js common.JobSpec) (queue.AnalyzeResult, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.result[js]; !ok { + return queue.AnalyzeResult{}, fmt.Errorf("result not found for job spec %v", js) + } + return s.result[js], nil +} + +func (s *LocalState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + idToSpec, ok := s.sessionToJobIdToSpec[sessionId] + if !ok { + return common.JobSpec{}, fmt.Errorf("job ids not found for session %v", sessionId) + } + spec, ok := idToSpec[jobRepoId] + if !ok { + return common.JobSpec{}, fmt.Errorf("job spec not found for job repo id %v", jobRepoId) + } + return spec, nil 
+} + +func (s *LocalState) SetResult(js common.JobSpec, analyzeResult queue.AnalyzeResult) { + s.mutex.Lock() + s.result[js] = analyzeResult + s.mutex.Unlock() +} + +func (s *LocalState) GetJobList(sessionID int) ([]queue.AnalyzeJob, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.jobs[sessionID]; !ok { + return nil, fmt.Errorf("job list not found for session %v", sessionID) + } + return s.jobs[sessionID], nil +} + +func (s *LocalState) GetJobInfo(js common.JobSpec) (common.JobInfo, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.info[js]; !ok { + return common.JobInfo{}, fmt.Errorf("job info not found for job spec %v", js) + } + return s.info[js], nil +} + +func (s *LocalState) SetJobInfo(js common.JobSpec, ji common.JobInfo) { + s.mutex.Lock() + s.info[js] = ji + s.mutex.Unlock() +} + +func (s *LocalState) GetStatus(js common.JobSpec) (common.Status, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.status[js]; !ok { + return common.StatusError, fmt.Errorf("status not found for job spec %v", js) + } + return s.status[js], nil +} + +func (s *LocalState) SetStatus(js common.JobSpec, status common.Status) { + s.mutex.Lock() + s.status[js] = status + s.mutex.Unlock() +} + +func (s *LocalState) AddJob(job queue.AnalyzeJob) { + s.mutex.Lock() + sessionID := job.Spec.SessionID + s.jobs[sessionID] = append(s.jobs[sessionID], job) + // Map the job index to JobSpec for quick result lookup + if _, ok := s.sessionToJobIdToSpec[sessionID]; !ok { + s.sessionToJobIdToSpec[sessionID] = make(map[int]common.JobSpec) + } + s.sessionToJobIdToSpec[sessionID][len(s.sessionToJobIdToSpec[sessionID])] = job.Spec + if len(s.jobs[sessionID]) != len(s.sessionToJobIdToSpec[sessionID]) { + msg := fmt.Sprintf("Unequal job list and job id map length. 
Session ID: %v", sessionID) + slog.Error(msg) + panic(msg) + } + s.mutex.Unlock() +} diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go deleted file mode 100644 index 7faf585..0000000 --- a/pkg/storage/interfaces.go +++ /dev/null @@ -1,10 +0,0 @@ -package storage - -import "mrvacommander/pkg/common" - -type Storage interface { - NextID() int - SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error) - FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner, - analysisRepos *map[common.NameWithOwner]DBLocation) -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go deleted file mode 100644 index 5383f92..0000000 --- a/pkg/storage/storage.go +++ /dev/null @@ -1,244 +0,0 @@ -package storage - -import ( - "archive/zip" - "errors" - "fmt" - "io/fs" - "log/slog" - "os" - "path" - "path/filepath" - "sync" - - "mrvacommander/pkg/common" -) - -var ( - jobs map[int][]common.AnalyzeJob = make(map[int][]common.AnalyzeJob) - info map[common.JobSpec]common.JobInfo = make(map[common.JobSpec]common.JobInfo) - status map[common.JobSpec]common.Status = make(map[common.JobSpec]common.Status) - result map[common.JobSpec]common.AnalyzeResult = make(map[common.JobSpec]common.AnalyzeResult) - mutex sync.Mutex -) - -func NewStorageSingle(startingID int, v *Visibles) *StorageSingle { - s := StorageSingle{currentID: startingID} - - s.modules = v - - return &s -} - -func (s *StorageSingle) NextID() int { - s.currentID += 1 - return s.currentID -} - -func (s *StorageSingle) SaveQueryPack(tgz []byte, sessionId int) (string, error) { - // Save the tar.gz body - cwd, err := os.Getwd() - if err != nil { - slog.Error("No working directory") - panic(err) - } - - dirpath := path.Join(cwd, "var", "codeql", "querypacks") - if err := os.MkdirAll(dirpath, 0755); err != nil { - slog.Error("Unable to create query pack output directory", - "dir", dirpath) - return "", err - } - - fpath := path.Join(dirpath, 
fmt.Sprintf("qp-%d.tgz", sessionId)) - err = os.WriteFile(fpath, tgz, 0644) - if err != nil { - slog.Error("unable to save querypack body decoding error", "path", fpath) - return "", err - } else { - slog.Info("Query pack saved to ", "path", fpath) - } - - return fpath, nil -} - -// Determine for which repositories codeql databases are available. -// -// Those will be the analysis_repos. The rest will be skipped. -func (s *StorageSingle) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner, - analysisRepos *map[common.NameWithOwner]DBLocation) { - slog.Debug("Looking for available CodeQL databases") - - cwd, err := os.Getwd() - if err != nil { - slog.Error("No working directory") - return - } - - analysisRepos = &map[common.NameWithOwner]DBLocation{} - - not_found_repos = []common.NameWithOwner{} - - for _, rep := range analysisReposRequested { - dbPrefix := filepath.Join(cwd, "codeql", "dbs", rep.Owner, rep.Repo) - dbName := fmt.Sprintf("%s_%s_db.zip", rep.Owner, rep.Repo) - dbPath := filepath.Join(dbPrefix, dbName) - - if _, err := os.Stat(dbPath); errors.Is(err, fs.ErrNotExist) { - slog.Info("Database does not exist for repository ", "owner/repo", rep, - "path", dbPath) - not_found_repos = append(not_found_repos, rep) - } else { - slog.Info("Found database for ", "owner/repo", rep, "path", dbPath) - (*analysisRepos)[rep] = DBLocation{Prefix: dbPrefix, File: dbName} - } - } - return not_found_repos, analysisRepos -} - -func ArtifactURL(js common.JobSpec, vaid int) (string, error) { - // We're looking for paths like - // codeql/sarif/google/flatbuffers/google_flatbuffers.sarif - - ar := GetResult(js) - - hostname, err := os.Hostname() - if err != nil { - slog.Error("No host name found") - return "", nil - } - - zfpath, err := PackageResults(ar, js.NameWithOwner, vaid) - if err != nil { - slog.Error("Error packaging results:", "error", err) - return "", err - } - // TODO Need url valid in container network and 
externally - // For now, we assume the container port 8080 is port 8080 on user's machine - hostname = "localhost" - au := fmt.Sprintf("http://%s:8080/download-server/%s", hostname, zfpath) - return au, nil -} - -func GetResult(js common.JobSpec) common.AnalyzeResult { - mutex.Lock() - defer mutex.Unlock() - ar := result[js] - return ar -} - -func SetResult(sessionid int, nwo common.NameWithOwner, ar common.AnalyzeResult) { - mutex.Lock() - defer mutex.Unlock() - result[common.JobSpec{JobID: sessionid, NameWithOwner: nwo}] = ar -} - -func PackageResults(ar common.AnalyzeResult, owre common.NameWithOwner, vaid int) (zipPath string, e error) { - slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar) - - cwd, err := os.Getwd() - if err != nil { - slog.Error("No working directory") - panic(err) - } - - // Ensure the output directory exists - dirpath := path.Join(cwd, "var", "codeql", "localrun", "results") - if err := os.MkdirAll(dirpath, 0755); err != nil { - slog.Error("Unable to create results output directory", - "dir", dirpath) - return "", err - } - - // Create a new zip file - zpath := path.Join(dirpath, fmt.Sprintf("results-%s-%s-%d.zip", owre.Owner, owre.Repo, vaid)) - - zfile, err := os.Create(zpath) - if err != nil { - return "", err - } - defer zfile.Close() - - // Create a new zip writer - zwriter := zip.NewWriter(zfile) - defer zwriter.Close() - - // Add each result file to the zip archive - /* - names := []([]string){{ar.RunAnalysisSARIF, "results.sarif"}} - for _, fpath := range names { - file, err := os.Open(fpath[0]) - if err != nil { - return "", err - } - defer file.Close() - - // Create a new file in the zip archive with custom name - // The client is very specific: - // if zf.Name != "results.sarif" && zf.Name != "results.bqrs" { continue } - - zipEntry, err := zwriter.Create(fpath[1]) - if err != nil { - return "", err - } - - // Copy the contents of the file to the zip entry - _, err = io.Copy(zipEntry, file) - if err != nil { - 
return "", err - } - } - */ - return zpath, nil -} - -func GetJobList(sessionid int) []common.AnalyzeJob { - mutex.Lock() - defer mutex.Unlock() - return jobs[sessionid] -} - -func GetJobInfo(js common.JobSpec) common.JobInfo { - mutex.Lock() - defer mutex.Unlock() - return info[js] -} - -func SetJobInfo(js common.JobSpec, ji common.JobInfo) { - mutex.Lock() - defer mutex.Unlock() - info[js] = ji -} - -func GetStatus(sessionid int, nwo common.NameWithOwner) common.Status { - mutex.Lock() - defer mutex.Unlock() - return status[common.JobSpec{JobID: sessionid, NameWithOwner: nwo}] -} - -func ResultAsFile(path string) (string, []byte, error) { - fpath := path - if !filepath.IsAbs(path) { - fpath = "/" + path - } - - file, err := os.ReadFile(fpath) - if err != nil { - slog.Warn("Failed to read results file", fpath, err) - return "", nil, err - } - - return fpath, file, nil -} - -func SetStatus(sessionid int, nwo common.NameWithOwner, s common.Status) { - mutex.Lock() - defer mutex.Unlock() - status[common.JobSpec{JobID: sessionid, NameWithOwner: nwo}] = s -} - -func AddJob(sessionid int, job common.AnalyzeJob) { - mutex.Lock() - defer mutex.Unlock() - jobs[sessionid] = append(jobs[sessionid], job) -} diff --git a/pkg/storage/types.go b/pkg/storage/types.go deleted file mode 100644 index c464669..0000000 --- a/pkg/storage/types.go +++ /dev/null @@ -1,66 +0,0 @@ -package storage - -import ( - "mrvacommander/pkg/common" - - "gorm.io/gorm" -) - -type DBLocation struct { - Prefix string - File string -} - -type StorageSingle struct { - currentID int - modules *Visibles -} - -type DBSpec struct { - Host string - Port int - User string - Password string - DBname string -} - -type DBInfo struct { - // Database version of - // info map[common.JobSpec]common.JobInfo = make(map[common.JobSpec]common.JobInfo) - gorm.Model - JobSpec common.JobSpec `gorm:"type:jsonb"` - JobInfo common.JobInfo `gorm:"type:jsonb"` -} - -type DBJobs struct { - // Database version of - // jobs 
map[int][]common.AnalyzeJob = make(map[int][]common.AnalyzeJob) - gorm.Model - JobKey int - AnalyzeJob common.AnalyzeJob `gorm:"type:jsonb"` -} - -type DBResult struct { - // Database version of - // result map[common.JobSpec]common.AnalyzeResult = make(map[common.JobSpec]common.AnalyzeResult) - gorm.Model - JobSpec common.JobSpec `gorm:"type:jsonb"` - AnalyzeResult common.AnalyzeResult `gorm:"type:jsonb"` -} - -type DBStatus struct { - // Database version of - // status map[common.JobSpec]common.Status = make(map[common.JobSpec]common.Status) - gorm.Model - JobSpec common.JobSpec `gorm:"type:jsonb"` - Status common.Status `gorm:"type:jsonb"` -} - -type StorageContainer struct { - // Database version of StorageSingle - RequestID int - DB *gorm.DB - modules *Visibles -} - -type Visibles struct{} diff --git a/populate-dbstore.sh b/populate-dbstore.sh old mode 100644 new mode 100755 index c12c000..184a984 --- a/populate-dbstore.sh +++ b/populate-dbstore.sh @@ -10,19 +10,22 @@ BUCKET_NAME="qldb" # Check for MinIO client if ! command -v mc &> /dev/null then - echo "MinIO client (mc) not found. " + echo "MinIO client (mc) not found." 
+ exit 1 fi # Configure MinIO client mc alias set $MINIO_ALIAS $MINIO_URL $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD -# Create ql db bucket +# Create qldb bucket mc mb $MINIO_ALIAS/$BUCKET_NAME -# Upload the two sample DBs +# Upload the two sample DBs with new names mc cp cmd/server/codeql/dbs/google/flatbuffers/google_flatbuffers_db.zip \ - cmd/server/codeql/dbs/psycopg/psycopg2/psycopg_psycopg2_db.zip \ - $MINIO_ALIAS/$BUCKET_NAME + $MINIO_ALIAS/$BUCKET_NAME/google\$flatbuffers.zip -# Check new diskuse +mc cp cmd/server/codeql/dbs/psycopg/psycopg2/psycopg_psycopg2_db.zip \ + $MINIO_ALIAS/$BUCKET_NAME/psycopg\$psycopg2.zip + +# Check new disk use du -k dbstore-data diff --git a/test/storage_json_test.go b/test/storage_json_test.go deleted file mode 100644 index 19abf41..0000000 --- a/test/storage_json_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package main - -import ( - "testing" - - "mrvacommander/pkg/storage" -) - -func TestSetupTables(t *testing.T) { - - db, err := storage.ConnectDB(storage.DBSpec{ - Host: "localhost", - Port: 5432, - User: "exampleuser", - Password: "examplepass", - DBname: "exampledb", - }) - - if err != nil { - t.Errorf("Cannot connect to db") - } - - // Check and set up the database - s := storage.StorageContainer{RequestID: 12345, DB: db} - if err := s.SetupTables(); err != nil { - t.Errorf("Cannot set up db") - } - -} diff --git a/utils/archive.go b/utils/archive.go index 13aa6c3..9b25880 100644 --- a/utils/archive.go +++ b/utils/archive.go @@ -3,9 +3,12 @@ package utils import ( "archive/tar" "archive/zip" + "bytes" "compress/gzip" + "encoding/base64" "fmt" "io" + "log" "os" "path/filepath" "strings" @@ -123,3 +126,28 @@ func Untar(r io.Reader, dest string) error { return nil } + +// IsBase64Gzip checks the request body up to the `gunzip` part. 
//
// Some important payloads can be listed via
//
//	base64 -d < foo1 | gunzip | tar t | head -20
//
// Only the first four bytes of val are examined. They are decoded as one
// standard (padded) base64 quantum, and the function returns true when the
// first two decoded bytes are the gzip magic number 0x1f 0x8b. It returns
// false when val is shorter than four bytes or when those four bytes are not
// valid standard base64; the decode failure is additionally logged as a
// warning. Nothing beyond the header is decoded or decompressed.
func IsBase64Gzip(val []byte) bool {
	// One base64 quantum: four encoded characters yield up to three bytes,
	// which is enough to cover the two-byte gzip magic number.
	const quantum = 4
	if len(val) < quantum {
		return false
	}

	// Extract header
	hdr := make([]byte, base64.StdEncoding.DecodedLen(quantum))
	if _, err := base64.StdEncoding.Decode(hdr, val[:quantum]); err != nil {
		log.Println("WARNING: IsBase64Gzip decode error:", err)
		return false
	}

	// Check for gzip heading. A padded quantum may decode to fewer than two
	// bytes; the remaining bytes of hdr stay zero and simply fail to match.
	return bytes.Equal(hdr[:2], []byte{0x1f, 0x8b})
}