Fully implement local and container MRVA

This commit is contained in:
Nicolas Will
2024-06-17 13:16:24 +02:00
parent ef7552c43f
commit e0cbc01d21
43 changed files with 1700 additions and 1137 deletions

View File

@@ -4,26 +4,29 @@ import (
"context"
"fmt"
"log/slog"
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/codeql"
"mrvacommander/pkg/common"
"mrvacommander/pkg/logger"
"mrvacommander/pkg/qldbstore"
"mrvacommander/pkg/qpstore"
"mrvacommander/pkg/queue"
"mrvacommander/utils"
"os"
"path/filepath"
"runtime"
"sync"
"time"
"github.com/elastic/go-sysinfo"
"github.com/google/uuid"
)
/*
type RunnerSingle struct {
queue queue.Queue
}
func NewAgentSingle(numWorkers int, av *Visibles) *RunnerSingle {
r := RunnerSingle{queue: av.Queue}
func NewAgentSingle(numWorkers int, v *Visibles) *RunnerSingle {
r := RunnerSingle{queue: v.Queue}
for id := 1; id <= numWorkers; id++ {
go r.worker(id)
@@ -31,72 +34,175 @@ func NewAgentSingle(numWorkers int, av *Visibles) *RunnerSingle {
return &r
}
type Visibles struct {
Logger logger.Logger
Queue queue.Queue
// TODO extra package for query pack storage
QueryPackStore qpstore.Storage
// TODO extra package for ql db storage
QLDBStore qldbstore.Storage
func (r *RunnerSingle) worker(wid int) {
var job common.AnalyzeJob
for {
job = <-r.queue.Jobs()
result, err := RunAnalysisJob(job)
if err != nil {
slog.Error("Failed to run analysis job", slog.Any("error", err))
continue
}
r.queue.Results() <- result
}
}
*/
// Tuning constants for automatic worker management.
const (
// workerMemoryMB is the amount of available memory (in MB) budgeted per worker
// when calculateWorkers derives a worker count.
workerMemoryMB = 2048 // 2 GB
// monitorIntervalSec is the pause, in seconds, between worker-count re-evaluations.
monitorIntervalSec = 10 // Monitor every 10 seconds
)
// calculateWorkers derives how many workers to run from the host's currently
// available memory and its CPU count.
//
// One worker is budgeted per workerMemoryMB of available memory. The result is
// never lower than one and never higher than the number of CPUs. If host or
// memory information cannot be read, the error is logged and the process
// exits with status 1.
func calculateWorkers() int {
	hostInfo, err := sysinfo.Host()
	if err != nil {
		slog.Error("failed to get host info", "error", err)
		os.Exit(1)
	}

	memory, err := hostInfo.Memory()
	if err != nil {
		slog.Error("failed to get memory info", "error", err)
		os.Exit(1)
	}

	// Available memory, converted from bytes to MB.
	availableMB := memory.Available / (1024 * 1024)

	// Memory-bound worker count, with a floor of one worker.
	byMemory := max(int(availableMB/workerMemoryMB), 1)

	// CPU-bound ceiling (also floored at one).
	byCPU := max(runtime.NumCPU(), 1)

	return min(byMemory, byCPU)
}
func (r *RunnerSingle) worker(wid int) {
// TODO: reimplement this later
/*
var job common.AnalyzeJob
func StartAndMonitorWorkers(ctx context.Context,
artifacts artifactstore.Store,
databases qldbstore.Store,
queue queue.Queue,
desiredWorkerCount int,
wg *sync.WaitGroup) {
for {
job = <-r.queue.Jobs()
slog.Debug("Picking up job", "job", job, "worker", wid)
slog.Debug("Analysis: running", "job", job)
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusQueued)
resultFile, err := RunAnalysis(job)
if err != nil {
continue
}
slog.Debug("Analysis run finished", "job", job)
// TODO: FIX THIS
res := common.AnalyzeResult{
RunAnalysisSARIF: resultFile,
RunAnalysisBQRS: "", // FIXME ?
}
r.queue.Results() <- res
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusSuccess)
storage.SetResult(job.QueryPackId, job.NWO, res)
currentWorkerCount := 0
stopChans := make([]chan struct{}, 0)
if desiredWorkerCount != 0 {
slog.Info("Starting workers", slog.Int("count", desiredWorkerCount))
for i := 0; i < desiredWorkerCount; i++ {
stopChan := make(chan struct{})
stopChans = append(stopChans, stopChan)
wg.Add(1)
go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
}
*/
return
}
slog.Info("Worker count not specified, managing based on available memory and CPU")
for {
select {
case <-ctx.Done():
// signal all workers to stop
for _, stopChan := range stopChans {
close(stopChan)
}
return
default:
newWorkerCount := calculateWorkers()
if newWorkerCount != currentWorkerCount {
slog.Info(
"Modifying worker count",
slog.Int("current", currentWorkerCount),
slog.Int("new", newWorkerCount))
}
if newWorkerCount > currentWorkerCount {
for i := currentWorkerCount; i < newWorkerCount; i++ {
stopChan := make(chan struct{})
stopChans = append(stopChans, stopChan)
wg.Add(1)
go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
}
} else if newWorkerCount < currentWorkerCount {
for i := newWorkerCount; i < currentWorkerCount; i++ {
close(stopChans[i])
}
stopChans = stopChans[:newWorkerCount]
}
currentWorkerCount = newWorkerCount
time.Sleep(monitorIntervalSec * time.Second)
}
}
}
// RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult
func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
var result = common.AnalyzeResult{
RequestId: job.RequestId,
ResultCount: 0,
ResultArchiveURL: "",
Status: common.StatusError,
func RunAnalysisJob(
job queue.AnalyzeJob, artifacts artifactstore.Store, dbs qldbstore.Store) (queue.AnalyzeResult, error) {
var result = queue.AnalyzeResult{
Spec: job.Spec,
ResultCount: 0,
ResultLocation: artifactstore.ArtifactLocation{},
Status: common.StatusError,
}
// Create a temporary directory
tempDir := filepath.Join(os.TempDir(), uuid.New().String())
if err := os.MkdirAll(tempDir, 0755); err != nil {
if err := os.MkdirAll(tempDir, 0600); err != nil {
return result, fmt.Errorf("failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Extract the query pack
// TODO: download from the 'job' query pack URL
// utils.downloadFile
queryPackPath := filepath.Join(tempDir, "qp-54674")
utils.UntarGz("qp-54674.tgz", queryPackPath)
// Download the query pack as a byte slice
queryPackData, err := artifacts.GetQueryPack(job.QueryPackLocation)
if err != nil {
return result, fmt.Errorf("failed to download query pack: %w", err)
}
// Write the query pack data to the filesystem
queryPackArchivePath := filepath.Join(tempDir, "query-pack.tar.gz")
if err := os.WriteFile(queryPackArchivePath, queryPackData, 0600); err != nil {
return result, fmt.Errorf("failed to write query pack archive to disk: %w", err)
}
// Make a directory and extract the query pack
queryPackPath := filepath.Join(tempDir, "pack")
if err := os.Mkdir(queryPackPath, 0600); err != nil {
return result, fmt.Errorf("failed to create query pack directory: %w", err)
}
if err := utils.UntarGz(queryPackArchivePath, queryPackPath); err != nil {
return result, fmt.Errorf("failed to extract query pack: %w", err)
}
// Download the CodeQL database as a byte slice
location, err := dbs.GetDatabaseLocationByNWO(job.Spec.NameWithOwner)
if err != nil {
return result, fmt.Errorf("failed to get database location: %w", err)
}
databaseData, err := dbs.GetDatabase(location)
if err != nil {
return result, fmt.Errorf("failed to get database: %w", err)
}
// Write the CodeQL database data to the filesystem
databasePath := filepath.Join(tempDir, "database.zip")
if err := os.WriteFile(databasePath, databaseData, 0600); err != nil {
return result, fmt.Errorf("failed to write CodeQL database to disk: %w", err)
}
// Perform the CodeQL analysis
runResult, err := codeql.RunQuery("google_flatbuffers_db.zip", "cpp", queryPackPath, tempDir)
runResult, err := codeql.RunQuery(databasePath, job.QueryLanguage, queryPackPath, tempDir)
if err != nil {
return result, fmt.Errorf("failed to run analysis: %w", err)
}
@@ -107,21 +213,32 @@ func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
return result, fmt.Errorf("failed to generate results archive: %w", err)
}
// TODO: Upload the archive to storage
// Upload the archive to storage
slog.Debug("Results archive size", slog.Int("size", len(resultsArchive)))
resultsLocation, err := artifacts.SaveResult(job.Spec, resultsArchive)
if err != nil {
return result, fmt.Errorf("failed to save results archive: %w", err)
}
result = common.AnalyzeResult{
RequestId: job.RequestId,
ResultCount: runResult.ResultCount,
ResultArchiveURL: "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE", // TODO
Status: common.StatusSuccess,
result = queue.AnalyzeResult{
Spec: job.Spec,
ResultCount: runResult.ResultCount,
ResultLocation: resultsLocation,
Status: common.StatusSuccess,
SourceLocationPrefix: runResult.SourceLocationPrefix,
DatabaseSHA: runResult.DatabaseSHA,
}
return result, nil
}
// RunWorker runs a worker that processes jobs from queue
func RunWorker(ctx context.Context, stopChan chan struct{}, queue queue.Queue, wg *sync.WaitGroup) {
func RunWorker(ctx context.Context,
artifacts artifactstore.Store,
databases qldbstore.Store,
queue queue.Queue,
stopChan chan struct{},
wg *sync.WaitGroup) {
const (
WORKER_COUNT_STOP_MESSAGE = "Worker stopping due to reduction in worker count"
WORKER_CONTEXT_STOP_MESSAGE = "Worker stopping due to context cancellation"
@@ -144,7 +261,7 @@ func RunWorker(ctx context.Context, stopChan chan struct{}, queue queue.Queue, w
return
}
slog.Info("Running analysis job", slog.Any("job", job))
result, err := RunAnalysisJob(job)
result, err := RunAnalysisJob(job, artifacts, databases)
if err != nil {
slog.Error("Failed to run analysis job", slog.Any("error", err))
continue

View File

@@ -1,4 +1,13 @@
package agent
type Runner interface {
import (
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/qldbstore"
"mrvacommander/pkg/queue"
)
// Visibles groups the dependencies made available to the agent:
// the job/result queue, the artifact store and the CodeQL database store.
type Visibles struct {
Queue queue.Queue
Artifacts artifactstore.Store
CodeQLDBStore qldbstore.Store
}

View File

@@ -0,0 +1,23 @@
package artifactstore
import (
"fmt"
"mrvacommander/pkg/common"
)
// ArtifactLocation describes where a stored artifact can be found.
// It is serialized to JSON with its contents under the "data" key.
type ArtifactLocation struct {
// Data is a map of key-value pairs that describe the location of the artifact.
// For example, a simple key-value pair could be "path" -> "/path/to/artifact.tgz".
// Alternatively, a more complex example could be "bucket" -> "example", "key" -> "UNIQUE_ARTIFACT_IDENTIFIER".
Data map[string]string `json:"data"`
}
// deriveKeyFromSessionId returns the storage key for a query pack: the decimal
// string form of the session ID.
func deriveKeyFromSessionId(sessionId int) string {
	key := fmt.Sprint(sessionId)
	return key
}
// deriveKeyFromJobSpec returns the storage key for a result. The key is the
// session ID and the repository's NameWithOwner (formatted with %s), joined by
// a dash.
func deriveKeyFromJobSpec(jobSpec common.JobSpec) string {
	session, repo := jobSpec.SessionID, jobSpec.NameWithOwner
	return fmt.Sprintf("%d-%s", session, repo)
}

View File

@@ -0,0 +1,20 @@
package artifactstore
import "mrvacommander/pkg/common"
// Store is the storage abstraction for query packs and analysis results.
// Artifacts are written as byte slices and later retrieved through the
// ArtifactLocation returned by the corresponding Save method.
type Store interface {
// GetQueryPack retrieves the query pack from the specified location.
GetQueryPack(location ArtifactLocation) ([]byte, error)
// SaveQueryPack saves the query pack using the session ID and returns the artifact location.
SaveQueryPack(sessionId int, data []byte) (ArtifactLocation, error)
// GetResult retrieves the result from the specified location.
GetResult(location ArtifactLocation) ([]byte, error)
// GetResultSize retrieves the size of the result from the specified location.
GetResultSize(location ArtifactLocation) (int, error)
// SaveResult saves the result using the JobSpec and returns the artifact location.
SaveResult(jobSpec common.JobSpec, data []byte) (ArtifactLocation, error)
}

View File

@@ -0,0 +1,94 @@
package artifactstore
import (
"fmt"
"mrvacommander/pkg/common"
"sync"
)
// InMemoryArtifactStore is an in-memory implementation of the Store interface.
// Query packs and results are kept in separate maps, and every method takes
// the mutex before touching them.
type InMemoryArtifactStore struct {
// mu guards packs and results.
mu sync.Mutex
// packs holds query pack archives, keyed by the session-derived key.
packs map[string][]byte
// results holds result archives, keyed by the JobSpec-derived key.
results map[string][]byte
}
// NewInMemoryArtifactStore returns an empty in-memory artifact store with its
// pack and result maps initialized.
func NewInMemoryArtifactStore() *InMemoryArtifactStore {
	store := new(InMemoryArtifactStore)
	store.packs = map[string][]byte{}
	store.results = map[string][]byte{}
	return store
}
// GetQueryPack looks up the query pack stored under location's "key" entry.
// It returns an error if no pack is stored under that key.
func (store *InMemoryArtifactStore) GetQueryPack(location ArtifactLocation) ([]byte, error) {
	store.mu.Lock()
	defer store.mu.Unlock()

	packKey := location.Data["key"]
	if pack, found := store.packs[packKey]; found {
		return pack, nil
	}
	return nil, fmt.Errorf("query pack not found: %s", packKey)
}
// SaveQueryPack stores data under the key derived from sessionId, replacing
// any pack already stored for that session. It returns a location with
// bucket "packs" and that key.
func (store *InMemoryArtifactStore) SaveQueryPack(sessionId int, data []byte) (ArtifactLocation, error) {
	store.mu.Lock()
	defer store.mu.Unlock()

	packKey := deriveKeyFromSessionId(sessionId)
	store.packs[packKey] = data

	where := map[string]string{
		"bucket": "packs",
		"key":    packKey,
	}
	return ArtifactLocation{Data: where}, nil
}
// GetResult looks up the result stored under location's "key" entry.
// It returns an error if no result is stored under that key.
func (store *InMemoryArtifactStore) GetResult(location ArtifactLocation) ([]byte, error) {
	store.mu.Lock()
	defer store.mu.Unlock()

	resultKey := location.Data["key"]
	if result, found := store.results[resultKey]; found {
		return result, nil
	}
	return nil, fmt.Errorf("result not found: %s", resultKey)
}
// GetResultSize reports the length in bytes of the result stored under
// location's "key" entry. It returns an error if no such result exists.
func (store *InMemoryArtifactStore) GetResultSize(location ArtifactLocation) (int, error) {
	store.mu.Lock()
	defer store.mu.Unlock()

	resultKey := location.Data["key"]
	if result, found := store.results[resultKey]; found {
		return len(result), nil
	}
	return 0, fmt.Errorf("result not found: %s", resultKey)
}
// SaveResult stores data under the key derived from jobSpec, replacing any
// result already stored for that spec. It returns a location with bucket
// "results" and that key.
func (store *InMemoryArtifactStore) SaveResult(jobSpec common.JobSpec, data []byte) (ArtifactLocation, error) {
	store.mu.Lock()
	defer store.mu.Unlock()

	resultKey := deriveKeyFromJobSpec(jobSpec)
	store.results[resultKey] = data

	where := map[string]string{
		"bucket": "results",
		"key":    resultKey,
	}
	return ArtifactLocation{Data: where}, nil
}

View File

@@ -0,0 +1,117 @@
package artifactstore
import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"math"
"mrvacommander/pkg/common"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// Names of the MinIO buckets used by MinIOArtifactStore.
const (
// RESULTS_BUCKET_NAME is the bucket that analysis result archives are saved to.
RESULTS_BUCKET_NAME = "results"
// PACKS_BUCKET_NAME is the bucket that query pack archives are saved to.
PACKS_BUCKET_NAME = "packs"
)
// MinIOArtifactStore stores query packs and results as objects in MinIO.
type MinIOArtifactStore struct {
// client is the MinIO client used for all object operations.
client *minio.Client
}
// NewMinIOArtifactStore creates a MinIO-backed artifact store.
//
// It builds a client for endpoint using static V4 credentials (id, secret)
// over a non-TLS connection, then makes sure the "results" and "packs" buckets
// exist. Bucket-creation failures are wrapped with %w so callers can inspect
// the underlying MinIO error with errors.Is / errors.As.
func NewMinIOArtifactStore(endpoint, id, secret string) (*MinIOArtifactStore, error) {
	minioClient, err := minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(id, secret, ""),
		Secure: false,
	})
	if err != nil {
		return nil, err
	}

	slog.Info("Connected to MinIO artifact store server")

	// Create "results" bucket
	if err := common.CreateMinIOBucketIfNotExists(minioClient, RESULTS_BUCKET_NAME); err != nil {
		return nil, fmt.Errorf("could not create results bucket: %w", err)
	}

	// Create "packs" bucket
	if err := common.CreateMinIOBucketIfNotExists(minioClient, PACKS_BUCKET_NAME); err != nil {
		return nil, fmt.Errorf("could not create packs bucket: %w", err)
	}

	return &MinIOArtifactStore{
		client: minioClient,
	}, nil
}
// GetQueryPack downloads the query pack object that location points to.
func (store *MinIOArtifactStore) GetQueryPack(location ArtifactLocation) ([]byte, error) {
	pack, err := store.getArtifact(location)
	return pack, err
}
// SaveQueryPack uploads data to the packs bucket as "application/gzip", under
// the key derived from jobId, and returns the resulting location.
// NOTE(review): the Store interface names this parameter sessionId; the value
// is passed to deriveKeyFromSessionId.
func (store *MinIOArtifactStore) SaveQueryPack(jobId int, data []byte) (ArtifactLocation, error) {
	objectKey := deriveKeyFromSessionId(jobId)
	return store.saveArtifact(PACKS_BUCKET_NAME, objectKey, data, "application/gzip")
}
// GetResult downloads the result object that location points to.
func (store *MinIOArtifactStore) GetResult(location ArtifactLocation) ([]byte, error) {
	result, err := store.getArtifact(location)
	return result, err
}
// GetResultSize returns the size in bytes of the object identified by
// location's "bucket" and "key" entries, as reported by a stat request.
//
// It returns an error if the stat request fails or if the size cannot be
// represented as an int on the current platform.
func (store *MinIOArtifactStore) GetResultSize(location ArtifactLocation) (int, error) {
	bucket := location.Data["bucket"]
	key := location.Data["key"]

	objectInfo, err := store.client.StatObject(context.Background(), bucket, key, minio.StatObjectOptions{})
	if err != nil {
		return 0, err
	}

	// The return type is int, so the bound is math.MaxInt. The previous
	// math.MaxInt32 bound rejected objects larger than 2 GiB even where int is
	// 64 bits wide.
	if objectInfo.Size > math.MaxInt {
		return 0, fmt.Errorf("object size %d exceeds max int size", objectInfo.Size)
	}

	return int(objectInfo.Size), nil
}
// SaveResult uploads data to the results bucket as "application/zip", under
// the key derived from jobSpec, and returns the resulting location.
func (store *MinIOArtifactStore) SaveResult(jobSpec common.JobSpec, data []byte) (ArtifactLocation, error) {
	objectKey := deriveKeyFromJobSpec(jobSpec)
	return store.saveArtifact(RESULTS_BUCKET_NAME, objectKey, data, "application/zip")
}
// getArtifact reads the whole object identified by location's "bucket" and
// "key" entries into memory and returns its contents.
func (store *MinIOArtifactStore) getArtifact(location ArtifactLocation) ([]byte, error) {
	bucket, key := location.Data["bucket"], location.Data["key"]

	obj, err := store.client.GetObject(context.Background(), bucket, key, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	defer obj.Close()

	contents, readErr := io.ReadAll(obj)
	if readErr != nil {
		return nil, readErr
	}
	return contents, nil
}
// saveArtifact uploads data to bucket under key with the given content type.
// On success it returns a location whose "bucket" and "key" entries match the
// arguments.
func (store *MinIOArtifactStore) saveArtifact(bucket, key string, data []byte, contentType string) (ArtifactLocation, error) {
	payload := bytes.NewReader(data)
	opts := minio.PutObjectOptions{ContentType: contentType}

	if _, err := store.client.PutObject(context.Background(), bucket, key, payload, int64(len(data)), opts); err != nil {
		return ArtifactLocation{}, err
	}

	where := map[string]string{
		"bucket": bucket,
		"key":    key,
	}
	return ArtifactLocation{Data: where}, nil
}

View File

@@ -1,10 +1,9 @@
package common
type JobInfo struct {
QueryLanguage string
CreatedAt string
UpdatedAt string
QueryLanguage string
CreatedAt string
UpdatedAt string
SkippedRepositories SkippedRepositories
}
@@ -16,8 +15,8 @@ type SkippedRepositories struct {
}
type AccessMismatchRepos struct {
RepositoryCount int `json:"repository_count"`
Repositories []string `json:"repositories"`
RepositoryCount int `json:"repository_count"`
Repositories []Repository `json:"repositories"`
}
type NotFoundRepos struct {
@@ -26,13 +25,13 @@ type NotFoundRepos struct {
}
type NoCodeqlDBRepos struct {
RepositoryCount int `json:"repository_count"`
Repositories []string `json:"repositories"`
RepositoryCount int `json:"repository_count"`
Repositories []Repository `json:"repositories"`
}
type OverLimitRepos struct {
RepositoryCount int `json:"repository_count"`
Repositories []string `json:"repositories"`
RepositoryCount int `json:"repository_count"`
Repositories []Repository `json:"repositories"`
}
type StatusResponse struct {

View File

@@ -1,4 +0,0 @@
package common
type Common interface {
}

29
pkg/common/jobspec.go Normal file
View File

@@ -0,0 +1,29 @@
package common
import (
"encoding/base64"
"encoding/json"
)
// EncodeJobSpec serializes jobSpec to JSON and returns the result encoded with
// URL-safe base64. A marshalling failure is returned as-is with an empty string.
func EncodeJobSpec(jobSpec JobSpec) (string, error) {
	raw, marshalErr := json.Marshal(jobSpec)
	if marshalErr != nil {
		return "", marshalErr
	}
	encoded := base64.URLEncoding.EncodeToString(raw)
	return encoded, nil
}
// DecodeJobSpec reverses EncodeJobSpec: it decodes the URL-safe base64 string
// and unmarshals the resulting JSON into a JobSpec. On any failure it returns
// the zero JobSpec together with the error.
func DecodeJobSpec(encoded string) (JobSpec, error) {
	raw, decodeErr := base64.URLEncoding.DecodeString(encoded)
	if decodeErr != nil {
		return JobSpec{}, decodeErr
	}

	var spec JobSpec
	if unmarshalErr := json.Unmarshal(raw, &spec); unmarshalErr != nil {
		return JobSpec{}, unmarshalErr
	}
	return spec, nil
}

38
pkg/common/minio.go Normal file
View File

@@ -0,0 +1,38 @@
package common
import (
"context"
"log/slog"
"github.com/minio/minio-go/v7"
)
// CreateMinIOBucketIfNotExists makes sure bucketName exists on the server
// behind client, creating it when it is missing.
//
// A "BucketAlreadyOwnedByYou" response from the create call is treated as
// success, since another component may have created the bucket between the
// existence check and the create call. Every other error is returned.
func CreateMinIOBucketIfNotExists(client *minio.Client, bucketName string) error {
	ctx := context.Background()

	exists, err := client.BucketExists(ctx, bucketName)
	if err != nil {
		return err
	}
	if exists {
		slog.Info("Bucket already exists", "name", bucketName)
		return nil
	}

	slog.Info("Creating bucket", "name", bucketName)
	if err := client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}); err != nil {
		// The bucket might already exist at this stage if another component created it concurrently.
		// For example, the server might have attempted to create it at the same time as the agent.
		//
		// minio.ToErrorResponse is used instead of a bare type assertion so that
		// errors which are not a minio.ErrorResponse are returned, not turned
		// into a panic.
		if minio.ToErrorResponse(err).Code == "BucketAlreadyOwnedByYou" {
			slog.Info("Failed to create bucket because it already exists", "name", bucketName)
			return nil
		}
		return err
	}

	slog.Info("Bucket created successfully", "name", bucketName)
	return nil
}

View File

@@ -6,26 +6,6 @@ type NameWithOwner struct {
Repo string
}
// AnalyzeJob represents a job specifying a repository and a query pack to analyze it with.
// This is the message format that the agent receives from the queue.
type AnalyzeJob struct {
RequestId int // json:"request_id"
QueryPackId int // json:"query_pack_id"
QueryPackURL string // json:"query_pack_url"
QueryLanguage string // json:"query_language"
NWO NameWithOwner // json:"nwo"
}
// AnalyzeResult represents the result of an analysis job.
// This is the message format that the agent sends to the queue.
// Status will only ever be StatusSuccess or StatusError when sent in a result.
type AnalyzeResult struct {
Status Status // json:"status"
RequestId int // json:"request_id"
ResultCount int // json:"result_count"
ResultArchiveURL string // json:"result_archive_url"
}
// Status represents the status of a job.
type Status int
@@ -55,6 +35,6 @@ func (s Status) ToExternalString() string {
}
type JobSpec struct {
JobID int
SessionID int
NameWithOwner
}

96
pkg/deploy/init.go Normal file
View File

@@ -0,0 +1,96 @@
package deploy
import (
"fmt"
"log"
"log/slog"
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/qldbstore"
"mrvacommander/pkg/queue"
"os"
"strconv"
)
// validateEnvVars checks that every name in requiredEnvVars is set in the
// environment (an empty value counts as set). Each missing variable is logged,
// and if any were missing the process exits with status 1 once all have been
// reported.
func validateEnvVars(requiredEnvVars []string) {
	missingCount := 0
	for i := range requiredEnvVars {
		name := requiredEnvVars[i]
		if _, present := os.LookupEnv(name); present {
			continue
		}
		slog.Error("Missing required environment variable", "key", name)
		missingCount++
	}

	if missingCount > 0 {
		os.Exit(1)
	}
}
// InitRabbitMQ builds a RabbitMQ-backed queue from the MRVA_RABBITMQ_HOST,
// MRVA_RABBITMQ_PORT, MRVA_RABBITMQ_USER and MRVA_RABBITMQ_PASSWORD
// environment variables.
//
// isAgent is forwarded to queue.NewRabbitMQQueue. The process exits (via
// validateEnvVars) if any required variable is unset. An error is returned if
// the port is not a positive integer that fits in an int16, or if the queue
// cannot be initialized. Underlying errors are wrapped with %w.
func InitRabbitMQ(isAgent bool) (queue.Queue, error) {
	requiredEnvVars := []string{
		"MRVA_RABBITMQ_HOST",
		"MRVA_RABBITMQ_PORT",
		"MRVA_RABBITMQ_USER",
		"MRVA_RABBITMQ_PASSWORD",
	}
	validateEnvVars(requiredEnvVars)

	rmqHost := os.Getenv("MRVA_RABBITMQ_HOST")
	rmqPort := os.Getenv("MRVA_RABBITMQ_PORT")
	rmqUser := os.Getenv("MRVA_RABBITMQ_USER")
	rmqPass := os.Getenv("MRVA_RABBITMQ_PASSWORD")

	// The port is passed on as an int16, so it is parsed with a 16-bit size;
	// values above 32767 are rejected here as out of range.
	rmqPortAsInt, err := strconv.ParseInt(rmqPort, 10, 16)
	if err != nil {
		return nil, fmt.Errorf("failed to parse RabbitMQ port: %w", err)
	}
	// ParseInt accepts zero and negative numbers, which are not usable ports.
	if rmqPortAsInt <= 0 {
		return nil, fmt.Errorf("failed to parse RabbitMQ port: %d is not a positive port number", rmqPortAsInt)
	}

	log.Println("Initializing RabbitMQ queue")

	rabbitMQQueue, err := queue.NewRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, isAgent)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize RabbitMQ: %w", err)
	}

	return rabbitMQQueue, nil
}
// InitMinIOArtifactStore builds a MinIO-backed artifact store from the
// ARTIFACT_MINIO_ENDPOINT, ARTIFACT_MINIO_ID and ARTIFACT_MINIO_SECRET
// environment variables.
//
// The process exits (via validateEnvVars) if any of them is unset. A store
// construction failure is returned wrapped with %w so the cause stays
// inspectable.
func InitMinIOArtifactStore() (artifactstore.Store, error) {
	requiredEnvVars := []string{
		"ARTIFACT_MINIO_ENDPOINT",
		"ARTIFACT_MINIO_ID",
		"ARTIFACT_MINIO_SECRET",
	}
	validateEnvVars(requiredEnvVars)

	endpoint := os.Getenv("ARTIFACT_MINIO_ENDPOINT")
	id := os.Getenv("ARTIFACT_MINIO_ID")
	secret := os.Getenv("ARTIFACT_MINIO_SECRET")

	store, err := artifactstore.NewMinIOArtifactStore(endpoint, id, secret)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize artifact store: %w", err)
	}

	return store, nil
}
// InitMinIOCodeQLDatabaseStore builds a MinIO-backed CodeQL database store
// from the QLDB_MINIO_ENDPOINT, QLDB_MINIO_ID and QLDB_MINIO_SECRET
// environment variables.
//
// The process exits (via validateEnvVars) if any of them is unset. A store
// construction failure is returned wrapped with %w so the cause stays
// inspectable.
func InitMinIOCodeQLDatabaseStore() (qldbstore.Store, error) {
	requiredEnvVars := []string{
		"QLDB_MINIO_ENDPOINT",
		"QLDB_MINIO_ID",
		"QLDB_MINIO_SECRET",
	}
	validateEnvVars(requiredEnvVars)

	endpoint := os.Getenv("QLDB_MINIO_ENDPOINT")
	id := os.Getenv("QLDB_MINIO_ID")
	secret := os.Getenv("QLDB_MINIO_SECRET")

	store, err := qldbstore.NewMinIOCodeQLDatabaseStore(endpoint, id, secret)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize ql database storage: %w", err)
	}

	return store, nil
}

View File

@@ -1,4 +0,0 @@
package logger
type Logger interface {
}

View File

@@ -1,14 +0,0 @@
package logger
type LoggerSingle struct {
modules *Visibles
}
func NewLoggerSingle(v *Visibles) *LoggerSingle {
l := LoggerSingle{}
l.modules = v
return &l
}
type Visibles struct{}

View File

@@ -4,29 +4,25 @@ import (
"mrvacommander/pkg/common"
)
type DBLocation struct {
Prefix string
File string
type CodeQLDatabaseLocation struct {
// `data` is a map of key-value pairs that describe the location of the database.
// For example, a simple key-value pair could be "path" -> "/path/to/database.zip".
// A more complex implementation could be "bucket" -> "example", "key" -> "unique_identifier".
data map[string]string
}
type Storage interface {
FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner,
analysisRepos *map[common.NameWithOwner]DBLocation)
}
type Visibles struct{}
type StorageQLDB struct{}
func NewStore(v *Visibles) (Storage, error) {
s := StorageQLDB{}
return &s, nil
}
func (s *StorageQLDB) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
not_found_repos []common.NameWithOwner,
analysisRepos *map[common.NameWithOwner]DBLocation) {
// TODO implement
return nil, nil
// Store is the storage abstraction for CodeQL databases: it resolves
// repositories to database locations and returns database contents.
type Store interface {
// FindAvailableDBs returns a map of available databases for the requested analysisReposRequested.
// It also returns a list of repository NWOs that do not have available databases.
FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
notFoundRepos []common.NameWithOwner,
foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation)
// GetDatabase returns the database stored at the specified location as a byte slice.
// A CodeQL database is a zip archive to be processed by the CodeQL CLI.
GetDatabase(location CodeQLDatabaseLocation) ([]byte, error)
// GetDatabaseLocationByNWO returns the database location for the specified repository.
// FindAvailableDBs should be used in lieu of this method for checking database availability.
GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error)
}

View File

@@ -0,0 +1,66 @@
package qldbstore
import (
"fmt"
"mrvacommander/pkg/common"
"os"
"path/filepath"
)
// FilesystemCodeQLDatabaseStore serves CodeQL databases from zip files on the
// local filesystem.
type FilesystemCodeQLDatabaseStore struct {
// basePath is the directory under which database files are looked up.
basePath string
}
// NewLocalFilesystemCodeQLDatabaseStore returns a store that looks up
// databases underneath basePath.
func NewLocalFilesystemCodeQLDatabaseStore(basePath string) *FilesystemCodeQLDatabaseStore {
	store := new(FilesystemCodeQLDatabaseStore)
	store.basePath = basePath
	return store
}
// FindAvailableDBs splits the requested repositories into those whose database
// location could be resolved and those for which resolution returned an error.
//
// The resolved repositories are returned as a pointer to a map from repository
// to location; the map is always non-nil. notFoundRepos is nil when every
// repository was resolved.
func (store *FilesystemCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
	notFoundRepos []common.NameWithOwner,
	foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) {

	resolved := map[common.NameWithOwner]CodeQLDatabaseLocation{}
	for i := range analysisReposRequested {
		nwo := analysisReposRequested[i]
		location, err := store.GetDatabaseLocationByNWO(nwo)
		if err == nil {
			resolved[nwo] = location
			continue
		}
		notFoundRepos = append(notFoundRepos, nwo)
	}

	foundRepos = &resolved
	return notFoundRepos, foundRepos
}
// GetDatabase reads and returns the file named by location's "path" entry.
// It returns an error if the entry is absent or the file cannot be read.
func (store *FilesystemCodeQLDatabaseStore) GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) {
	dbPath, hasPath := location.data["path"]
	if !hasPath {
		return nil, fmt.Errorf("path not specified in location")
	}

	contents, readErr := os.ReadFile(dbPath)
	if readErr != nil {
		return nil, readErr
	}
	return contents, nil
}
// GetDatabaseLocationByNWO returns the location of the database file for nwo,
// expected at <basePath>/<owner>/<repo>/<owner>_<repo>_db.zip.
//
// It returns an error if the file does not exist, or if its existence cannot
// be determined (for example because of a permission error). In the second
// case the underlying error is wrapped with %w.
func (store *FilesystemCodeQLDatabaseStore) GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) {
	fileName := fmt.Sprintf("%s_%s_db.zip", nwo.Owner, nwo.Repo)
	filePath := filepath.Join(store.basePath, nwo.Owner, nwo.Repo, fileName)

	// Any stat failure means the database cannot be used. Checking only for
	// "not exist" would report an unreadable path as an available database.
	if _, err := os.Stat(filePath); err != nil {
		if os.IsNotExist(err) {
			return CodeQLDatabaseLocation{}, fmt.Errorf("database not found for %s", nwo)
		}
		return CodeQLDatabaseLocation{}, fmt.Errorf("failed to check database for %s: %w", nwo, err)
	}

	location := CodeQLDatabaseLocation{
		data: map[string]string{
			"path": filePath,
		},
	}

	return location, nil
}

View File

@@ -0,0 +1,98 @@
package qldbstore
import (
"context"
"fmt"
"io"
"log/slog"
"mrvacommander/pkg/common"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// bucketName is the MinIO bucket that holds the CodeQL databases.
const bucketName = "qldb"
// MinIOCodeQLDatabaseStore serves CodeQL databases stored as objects in MinIO.
type MinIOCodeQLDatabaseStore struct {
// client is the MinIO client used for all object operations.
client *minio.Client
// bucketName is the bucket in which databases are looked up.
bucketName string
}
// NewMinIOCodeQLDatabaseStore creates a MinIO-backed CodeQL database store.
//
// It builds a client for endpoint using static V4 credentials (id, secret)
// over a non-TLS connection, then makes sure the database bucket exists. A
// bucket-creation failure is wrapped with %w so callers can inspect the
// underlying MinIO error with errors.Is / errors.As.
func NewMinIOCodeQLDatabaseStore(endpoint, id, secret string) (*MinIOCodeQLDatabaseStore, error) {
	minioClient, err := minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(id, secret, ""),
		Secure: false,
	})
	if err != nil {
		return nil, err
	}

	slog.Info("Connected to MinIO CodeQL database store server")

	if err := common.CreateMinIOBucketIfNotExists(minioClient, bucketName); err != nil {
		return nil, fmt.Errorf("could not create bucket: %w", err)
	}

	return &MinIOCodeQLDatabaseStore{
		client:     minioClient,
		bucketName: bucketName,
	}, nil
}
// FindAvailableDBs splits the requested repositories into those whose database
// location could be resolved and those for which resolution returned an error.
//
// The resolved repositories are returned as a pointer to a map from repository
// to location; the map is always non-nil. notFoundRepos is nil when every
// repository was resolved.
func (store *MinIOCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
	notFoundRepos []common.NameWithOwner,
	foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) {

	resolved := map[common.NameWithOwner]CodeQLDatabaseLocation{}
	for i := range analysisReposRequested {
		nwo := analysisReposRequested[i]
		location, err := store.GetDatabaseLocationByNWO(nwo)
		if err == nil {
			resolved[nwo] = location
			continue
		}
		notFoundRepos = append(notFoundRepos, nwo)
	}

	foundRepos = &resolved
	return notFoundRepos, foundRepos
}
// GetDatabase reads the whole object identified by location's "bucket" and
// "key" entries into memory and returns its contents.
func (store *MinIOCodeQLDatabaseStore) GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) {
	bucket, key := location.data["bucket"], location.data["key"]

	obj, err := store.client.GetObject(context.Background(), bucket, key, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	defer obj.Close()

	contents, readErr := io.ReadAll(obj)
	if readErr != nil {
		return nil, readErr
	}
	return contents, nil
}
// GetDatabaseLocationByNWO returns the location of the database object for
// nwo, which is named "<owner>$<repo>.zip" in the store's bucket.
//
// The object is stat'ed to confirm it exists. A "NoSuchKey" response is
// reported as a not-found error; any other stat error is returned unchanged.
func (store *MinIOCodeQLDatabaseStore) GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) {
	objectName := fmt.Sprintf("%s$%s.zip", nwo.Owner, nwo.Repo)

	// Check if the object exists
	_, statErr := store.client.StatObject(context.Background(), store.bucketName, objectName, minio.StatObjectOptions{})
	switch {
	case statErr == nil:
		found := CodeQLDatabaseLocation{
			data: map[string]string{
				"bucket": store.bucketName,
				"key":    objectName,
			},
		}
		return found, nil
	case minio.ToErrorResponse(statErr).Code == "NoSuchKey":
		return CodeQLDatabaseLocation{}, fmt.Errorf("database not found for %s", nwo)
	default:
		return CodeQLDatabaseLocation{}, statErr
	}
}

View File

@@ -1,5 +0,0 @@
package qpstore
type Storage interface {
SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error)
}

View File

@@ -1,16 +0,0 @@
package qpstore
type Visibles struct{}
type StorageQP struct{}
func NewStore(v *Visibles) (Storage, error) {
s := StorageQP{}
return &s, nil
}
func (s *StorageQP) SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error) {
// TODO implement
return "", nil
}

View File

@@ -1,14 +1,7 @@
package queue
import (
"mrvacommander/pkg/common"
"mrvacommander/pkg/storage"
)
type Queue interface {
Jobs() chan common.AnalyzeJob
Results() chan common.AnalyzeResult
StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation,
session_id int,
session_language string)
Jobs() chan AnalyzeJob
Results() chan AnalyzeResult
Close()
}

View File

@@ -1,31 +0,0 @@
package queue
import (
"log/slog"
"mrvacommander/pkg/common"
"mrvacommander/pkg/storage"
)
func (q *QueueSingle) Jobs() chan common.AnalyzeJob {
return q.jobs
}
func (q *QueueSingle) Results() chan common.AnalyzeResult {
return q.results
}
func (q *QueueSingle) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int,
session_language string) {
slog.Debug("Queueing codeql database analyze jobs")
for nwo := range *analysis_repos {
info := common.AnalyzeJob{
QueryPackId: session_id,
QueryLanguage: session_language,
NWO: nwo,
}
q.jobs <- info
storage.SetStatus(session_id, nwo, common.StatusQueued)
storage.AddJob(session_id, info)
}
}

View File

@@ -1,13 +1,9 @@
package queue
import (
"mrvacommander/pkg/common"
"mrvacommander/pkg/storage"
"context"
"encoding/json"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
@@ -15,20 +11,20 @@ import (
)
type RabbitMQQueue struct {
jobs chan common.AnalyzeJob
results chan common.AnalyzeResult
jobs chan AnalyzeJob
results chan AnalyzeResult
conn *amqp.Connection
channel *amqp.Channel
}
// InitializeRabbitMQQueue initializes a RabbitMQ queue.
// NewRabbitMQQueue initializes a RabbitMQ queue.
// It returns a pointer to a RabbitMQQueue and an error.
//
// If isAgent is true, the queue is initialized to be used by an agent.
// Otherwise, the queue is initialized to be used by the server.
// The difference in behaviour is that the agent consumes jobs and publishes results,
// while the server publishes jobs and consumes results.
func InitializeRabbitMQQueue(
func NewRabbitMQQueue(
host string,
port int16,
user string,
@@ -94,8 +90,8 @@ func InitializeRabbitMQQueue(
result := RabbitMQQueue{
conn: conn,
channel: ch,
jobs: make(chan common.AnalyzeJob),
results: make(chan common.AnalyzeResult),
jobs: make(chan AnalyzeJob),
results: make(chan AnalyzeResult),
}
if isAgent {
@@ -115,19 +111,14 @@ func InitializeRabbitMQQueue(
return &result, nil
}
func (q *RabbitMQQueue) Jobs() chan common.AnalyzeJob {
func (q *RabbitMQQueue) Jobs() chan AnalyzeJob {
return q.jobs
}
func (q *RabbitMQQueue) Results() chan common.AnalyzeResult {
func (q *RabbitMQQueue) Results() chan AnalyzeResult {
return q.results
}
func (q *RabbitMQQueue) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, session_language string) {
// TODO: Implement
log.Fatal("unimplemented")
}
func (q *RabbitMQQueue) Close() {
q.channel.Close()
q.conn.Close()
@@ -140,7 +131,7 @@ func (q *RabbitMQQueue) ConsumeJobs(queueName string) {
}
for msg := range msgs {
job := common.AnalyzeJob{}
job := AnalyzeJob{}
err := json.Unmarshal(msg.Body, &job)
if err != nil {
slog.Error("failed to unmarshal job", slog.Any("error", err))
@@ -157,7 +148,7 @@ func (q *RabbitMQQueue) PublishResults(queueName string) {
}
}
func (q *RabbitMQQueue) publishResult(queueName string, result common.AnalyzeResult) {
func (q *RabbitMQQueue) publishResult(queueName string, result AnalyzeResult) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@@ -178,7 +169,7 @@ func (q *RabbitMQQueue) publishResult(queueName string, result common.AnalyzeRes
}
}
func (q *RabbitMQQueue) publishJob(queueName string, job common.AnalyzeJob) {
func (q *RabbitMQQueue) publishJob(queueName string, job AnalyzeJob) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@@ -212,7 +203,7 @@ func (q *RabbitMQQueue) ConsumeResults(queueName string) {
}
for msg := range msgs {
result := common.AnalyzeResult{}
result := AnalyzeResult{}
err := json.Unmarshal(msg.Body, &result)
if err != nil {
slog.Error("failed to unmarshal result", slog.Any("error", err))

29
pkg/queue/queue_single.go Normal file
View File

@@ -0,0 +1,29 @@
package queue
// QueueSingle is an in-process queue built on Go channels.
// NewQueueSingle returns it as a Queue.
type QueueSingle struct {
// NumWorkers records the worker count given to the constructor.
NumWorkers int
// jobs carries analysis jobs; it is exposed through Jobs().
jobs chan AnalyzeJob
// results carries analysis results; it is exposed through Results().
results chan AnalyzeResult
}
// NewQueueSingle builds a channel-backed QueueSingle and returns it as a Queue.
//
// The job and result channels are each buffered with capacity 10, so up to
// ten sends per channel succeed without a waiting receiver. numWorkers is
// only stored in the NumWorkers field.
func NewQueueSingle(numWorkers int) Queue {
	const bufferSize = 10

	return QueueSingle{
		NumWorkers: numWorkers,
		jobs:       make(chan AnalyzeJob, bufferSize),
		results:    make(chan AnalyzeResult, bufferSize),
	}
}
// Jobs returns the channel that carries analysis jobs.
// The channel is bidirectional, so callers can both send and receive on it.
func (q QueueSingle) Jobs() chan AnalyzeJob {
return q.jobs
}
// Results returns the channel that carries analysis results.
// The channel is bidirectional, so callers can both send and receive on it.
func (q QueueSingle) Results() chan AnalyzeResult {
return q.results
}
// Close closes the jobs channel and then the results channel.
//
// Closing an already-closed channel panics, as does sending on a closed
// channel, so Close must be called at most once and only after every sender
// has stopped. After Close, receivers get zero values with ok == false.
func (q QueueSingle) Close() {
close(q.jobs)
close(q.results)
}

View File

@@ -1,28 +1,28 @@
package queue
import (
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/common"
"mrvacommander/pkg/logger"
)
type QueueSingle struct {
NumWorkers int
jobs chan common.AnalyzeJob
results chan common.AnalyzeResult
modules *Visibles
// AnalyzeJob represents a job specifying a repository and a query pack to analyze it with.
// This is the message format that the agent receives from the queue.
//
// NOTE: the trailing `// json:"..."` annotations below are ordinary comments,
// not struct tags. As written, encoding/json uses the Go field names
// (Spec, QueryPackLocation, QueryLanguage) as keys.
//
// TODO: make query_pack_location query_pack_url with a presigned URL
type AnalyzeJob struct {
Spec common.JobSpec // json:"job_spec"
QueryPackLocation artifactstore.ArtifactLocation // json:"query_pack_location"
QueryLanguage string // json:"query_language"
}
// Visibles bundles the dependencies made visible to the queue package;
// here that is only a Logger.
type Visibles struct {
Logger logger.Logger
}
func NewQueueSingle(numWorkers int, v *Visibles) *QueueSingle {
q := QueueSingle{}
q.jobs = make(chan common.AnalyzeJob, 10)
q.results = make(chan common.AnalyzeResult, 10)
q.NumWorkers = numWorkers
q.modules = v
return &q
// AnalyzeResult represents the result of an analysis job.
// This is the message format that the agent sends to the queue.
// Status will only ever be StatusSuccess or StatusError when sent in a result.
//
// NOTE: the trailing `// json:"..."` annotations below are ordinary comments,
// not struct tags. As written, encoding/json uses the Go field names as keys.
//
// TODO: make result_location result_archive_url with a presigned URL
type AnalyzeResult struct {
Spec common.JobSpec // json:"job_spec"
Status common.Status // json:"status"
ResultCount int // json:"result_count"
ResultLocation artifactstore.ArtifactLocation // json:"result_location"
SourceLocationPrefix string // json:"source_location_prefix"
DatabaseSHA string // json:"database_sha"
}

View File

@@ -2,13 +2,13 @@ package server
import "net/http"
// Commander is an empty interface: it declares no methods, so any value satisfies it.
type Commander interface{}
// CommanderAPI is the set of HTTP handlers a commander must provide.
// Every method has the standard net/http handler signature.
type CommanderAPI interface {
	// Root endpoint.
	RootHandler(w http.ResponseWriter, r *http.Request)

	// Submission of new variant analyses.
	MRVARequest(w http.ResponseWriter, r *http.Request)
	MRVARequestID(w http.ResponseWriter, r *http.Request)

	// Status of a submitted variant analysis.
	MRVAStatus(w http.ResponseWriter, r *http.Request)
	MRVAStatusID(w http.ResponseWriter, r *http.Request)

	// Result artifact metadata and the artifact download itself.
	MRVADownloadArtifact(w http.ResponseWriter, r *http.Request)
	MRVADownloadArtifactID(w http.ResponseWriter, r *http.Request)
	MRVADownloadServe(w http.ResponseWriter, r *http.Request)
}

View File

@@ -7,84 +7,159 @@ import (
"errors"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"path/filepath"
"os"
"strconv"
"strings"
"time"
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/common"
"mrvacommander/pkg/storage"
"mrvacommander/pkg/qldbstore"
"mrvacommander/pkg/queue"
"mrvacommander/utils"
"github.com/gorilla/mux"
)
// startAnalyses creates one analysis job per repository in analysisRepos and
// hands each job to the job queue.
//
// Parameters:
//   - analysisRepos: repositories to analyze, keyed by name-with-owner; only
//     the keys are used here. A nil pointer is treated as "no repositories".
//   - queryPackLocation: where the session's query pack was stored.
//   - sessionId: the session the jobs belong to.
//   - queryLanguage: language of the query pack.
//
// Each job is recorded in the server state (StatusQueued, then AddJob) before
// it is sent on the queue. The send can block while the queue is full, and a
// worker may finish a job as soon as it is published. Recording state first
// keeps the job visible to status lookups during a blocked send, and prevents
// a result's final status from being overwritten by a late StatusQueued.
func (c *CommanderSingle) startAnalyses(
	analysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation,
	queryPackLocation artifactstore.ArtifactLocation,
	sessionId int,
	queryLanguage string) {

	if analysisRepos == nil {
		slog.Debug("Queueing analysis jobs", "count", 0)
		return
	}

	slog.Debug("Queueing analysis jobs", "count", len(*analysisRepos))

	for nwo := range *analysisRepos {
		jobSpec := common.JobSpec{
			SessionID:     sessionId,
			NameWithOwner: nwo,
		}
		info := queue.AnalyzeJob{
			Spec:              jobSpec,
			QueryPackLocation: queryPackLocation,
			QueryLanguage:     queryLanguage,
		}

		// Register the job before publishing it; see the function comment.
		c.v.State.SetStatus(jobSpec, common.StatusQueued)
		c.v.State.AddJob(info)

		c.v.Queue.Jobs() <- info
	}
}
func setupEndpoints(c CommanderAPI) {
r := mux.NewRouter()
//
// First are the API endpoints that mirror those used in the github API
//
r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses", c.MRVARequest)
// /repos/hohn /mrva-controller/code-scanning/codeql/variant-analyses
// Or via
r.HandleFunc("/{repository_id}/code-scanning/codeql/variant-analyses", c.MRVARequestID)
// Root handler
r.HandleFunc("/", c.RootHandler)
// This is the standalone status request.
// It's also the first request made when downloading; the difference is on the
// client side's handling.
// Endpoints for submitting new analyses
r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses", c.MRVARequest)
r.HandleFunc("/repositories/{controller_repo_id}/code-scanning/codeql/variant-analyses", c.MRVARequestID)
// Endpoints for status requests
// This is also the first request made when downloading; the difference is in the client-side handling.
r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}", c.MRVAStatus)
r.HandleFunc("/repositories/{controller_repo_id}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}", c.MRVAStatusID)
// Endpoints for downloading artifacts
r.HandleFunc("/repos/{controller_owner}/{controller_repo}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}/repos/{repo_owner}/{repo_name}", c.MRVADownloadArtifact)
r.HandleFunc("/repositories/{controller_repo_id}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}/repositories/{repository_id}", c.MRVADownloadArtifactID)
// Not implemented:
// r.HandleFunc("/codeql-query-console/codeql-variant-analysis-repo-tasks/{codeql_variant_analysis_id}/{repo_id}/{owner_id}/{controller_repo_id}", MRVADownLoad3)
// r.HandleFunc("/github-codeql-query-console-prod/codeql-variant-analysis-repo-tasks/{codeql_variant_analysis_id}/{repo_id}", MRVADownLoad4)
// Endpoint to serve downloads using encoded JobSpec
r.HandleFunc("/download/{encoded_job_spec}", c.MRVADownloadServe)
//
// Now some support API endpoints
//
r.HandleFunc("/download-server/{local_path:.*}", c.MRVADownloadServe)
// Handler for unhandled endpoints
r.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slog.Error("Unhandled endpoint", "method", r.Method, "uri", r.RequestURI)
http.Error(w, "Not Found", http.StatusNotFound)
})
//
// Bind to a port and pass our router in
//
// TODO make this a configuration entry
log.Fatal(http.ListenAndServe(":8080", r))
go ListenAndServe(r)
}
func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo, vaid int) {
slog.Debug("Submitting status response", "session", vaid)
func ListenAndServe(r *mux.Router) {
// Bind to a port and pass our router in
// The port is configurable via environment variable or default to 8080
port := os.Getenv("SERVER_PORT")
if port == "" {
port = "8080"
}
all_scanned := []common.ScannedRepo{}
jobs := storage.GetJobList(js.JobID)
for _, job := range jobs {
astat := storage.GetStatus(js.JobID, job.NWO).ToExternalString()
all_scanned = append(all_scanned,
err := http.ListenAndServe(":"+port, r)
if err != nil {
slog.Error("Error starting server:", "error", err)
os.Exit(1)
}
}
func (c *CommanderSingle) submitStatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo) {
slog.Debug("Submitting status response", "job_id", js.SessionID)
scannedRepos := []common.ScannedRepo{}
jobs, err := c.v.State.GetJobList(js.SessionID)
if err != nil {
slog.Error("Error getting job list", "error", err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
// Loop through all jobs under the same session id
// TODO: as a high priority, fix this hacky job IDing by index
// this may break with other state implementations
for jobRepoId, job := range jobs {
// Get the job status
status, err := c.v.State.GetStatus(job.Spec)
if err != nil {
slog.Error("Error getting status", "error", err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
// Get the job result
result, err := c.v.State.GetResult(job.Spec)
if err != nil {
slog.Error("Error getting result", "error", err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
// Get the job result artifact size
artifactSize, err := c.v.Artifacts.GetResultSize(result.ResultLocation)
if err != nil {
slog.Error("Error getting artifact size", "error", err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
// Append all scanned (complete and incomplete) repos to the response
scannedRepos = append(scannedRepos,
common.ScannedRepo{
Repository: common.Repository{
ID: 0,
Name: job.NWO.Repo,
FullName: fmt.Sprintf("%s/%s", job.NWO.Owner, job.NWO.Repo),
ID: jobRepoId,
Name: job.Spec.Repo,
FullName: fmt.Sprintf("%s/%s", job.Spec.Owner, job.Spec.Repo),
Private: false,
StargazersCount: 0,
UpdatedAt: ji.UpdatedAt,
},
AnalysisStatus: astat,
ResultCount: 123, // FIXME 123 is a lie so the client downloads
ArtifactSizeBytes: 123, // FIXME
AnalysisStatus: status.ToExternalString(),
ResultCount: result.ResultCount,
ArtifactSizeBytes: int(artifactSize),
},
)
}
astat := storage.GetStatus(js.JobID, js.NameWithOwner).ToExternalString()
jobStatus, err := c.v.State.GetStatus(js)
if err != nil {
slog.Error("Error getting job status", "error", err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
status := common.StatusResponse{
SessionId: js.JobID,
SessionId: js.SessionID,
ControllerRepo: common.ControllerRepo{},
Actor: common.Actor{},
QueryLanguage: ji.QueryLanguage,
@@ -92,8 +167,8 @@ func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpe
CreatedAt: ji.CreatedAt,
UpdatedAt: ji.UpdatedAt,
ActionsWorkflowRunID: 0, // FIXME
Status: astat,
ScannedRepositories: all_scanned,
Status: jobStatus.ToExternalString(),
ScannedRepositories: scannedRepos,
SkippedRepositories: ji.SkippedRepositories,
}
@@ -115,111 +190,195 @@ func (c *CommanderSingle) RootHandler(w http.ResponseWriter, r *http.Request) {
slog.Info("Request on /")
}
func (c *CommanderSingle) MRVAStatus(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
func (c *CommanderSingle) MRVAStatusCommon(w http.ResponseWriter, r *http.Request, owner, repo string, variantAnalysisID string) {
slog.Info("MRVA status request for ",
"owner", vars["owner"],
"repo", vars["repo"],
"codeql_variant_analysis_id", vars["codeql_variant_analysis_id"])
id, err := strconv.Atoi(vars["codeql_variant_analysis_id"])
"owner", owner,
"repo", repo,
"codeql_variant_analysis_id", variantAnalysisID)
id, err := strconv.ParseInt(variantAnalysisID, 10, 32)
if err != nil {
slog.Error("Variant analysis is is not integer", "id",
vars["codeql_variant_analysis_id"])
http.Error(w, err.Error(), http.StatusInternalServerError)
slog.Error("Variant analysis ID is not integer", "id",
variantAnalysisID)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
spec, err := c.v.State.GetJobList(int(id))
if err != nil || len(spec) == 0 {
msg := "No jobs found for given session id"
slog.Error(msg, "id", variantAnalysisID)
http.Error(w, msg, http.StatusNotFound)
return
}
// The status reports one status for all jobs belonging to an id.
// So we simply report the status of a job as the status of all.
spec := storage.GetJobList(id)
if spec == nil {
msg := "No jobs found for given job id"
slog.Error(msg, "id", vars["codeql_variant_analysis_id"])
http.Error(w, msg, http.StatusUnprocessableEntity)
job := spec[0]
jobInfo, err := c.v.State.GetJobInfo(job.Spec)
if err != nil {
msg := "No job info found for given session id"
slog.Error(msg, "id", variantAnalysisID)
http.Error(w, msg, http.StatusBadRequest)
return
}
job := spec[0]
c.submitStatusResponse(w, job.Spec, jobInfo)
}
js := common.JobSpec{
JobID: job.QueryPackId,
NameWithOwner: job.NWO,
}
// MRVAStatusID handles a status request that carries no owner/repo pair.
// The placeholder "unused" is passed for both, and only the
// codeql_variant_analysis_id path variable is forwarded to MRVAStatusCommon.
func (c *CommanderSingle) MRVAStatusID(w http.ResponseWriter, r *http.Request) {
	slog.Info("MRVA status request (MRVAStatusID)")

	const placeholder = "unused"
	analysisID := mux.Vars(r)["codeql_variant_analysis_id"]
	c.MRVAStatusCommon(w, r, placeholder, placeholder, analysisID)
}
ji := storage.GetJobInfo(js)
c.StatusResponse(w, js, ji, id)
// MRVAStatus handles a status request addressed by owner/repo.
// It forwards the owner, repo and codeql_variant_analysis_id path variables
// to MRVAStatusCommon.
func (c *CommanderSingle) MRVAStatus(w http.ResponseWriter, r *http.Request) {
	slog.Info("MRVA status request (MRVAStatus)")

	params := mux.Vars(r)
	owner, repo := params["owner"], params["repo"]
	analysisID := params["codeql_variant_analysis_id"]
	c.MRVAStatusCommon(w, r, owner, repo, analysisID)
}
// Download artifacts
func (c *CommanderSingle) MRVADownloadArtifact(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
slog.Info("MRVA artifact download",
"controller_owner", vars["controller_owner"],
"controller_repo", vars["controller_repo"],
"codeql_variant_analysis_id", vars["codeql_variant_analysis_id"],
"repo_owner", vars["repo_owner"],
"repo_name", vars["repo_name"],
func (c *CommanderSingle) MRVADownloadArtifactCommon(w http.ResponseWriter, r *http.Request, jobRepoId int, jobSpec common.JobSpec) {
slog.Debug("MRVA artifact download",
"codeql_variant_analysis_id", jobSpec.SessionID,
"repo_owner", jobSpec.NameWithOwner.Owner,
"repo_name", jobSpec.NameWithOwner.Repo,
)
vaid, err := strconv.Atoi(vars["codeql_variant_analysis_id"])
c.sendDownloadResponse(w, jobRepoId, jobSpec)
}
func (c *CommanderSingle) MRVADownloadArtifactID(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
slog.Debug("MRVA artifact download", "id", vars["codeql_variant_analysis_id"], "repo_id", vars["repository_id"])
sessionId, err := strconv.ParseInt(vars["codeql_variant_analysis_id"], 10, 32)
if err != nil {
slog.Error("Variant analysis is is not integer", "id",
vars["codeql_variant_analysis_id"])
http.Error(w, err.Error(), http.StatusInternalServerError)
slog.Error("Variant analysis ID is not an integer", "id", vars["codeql_variant_analysis_id"])
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
js := common.JobSpec{
JobID: vaid,
repoId, err := strconv.ParseInt(vars["repository_id"], 10, 32)
if err != nil {
slog.Error("Repository ID is not an integer", "id", vars["repository_id"])
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
jobSpec, err := c.v.State.GetJobSpecByRepoId(int(sessionId), int(repoId))
if err != nil {
slog.Error("Failed to get job spec by repo ID", "error", err)
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
c.MRVADownloadArtifactCommon(w, r, int(repoId), jobSpec)
}
func (c *CommanderSingle) MRVADownloadArtifact(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
sessionId, err := strconv.ParseInt(vars["codeql_variant_analysis_id"], 10, 32)
if err != nil {
slog.Error("Variant analysis ID is not an integer", "id", vars["codeql_variant_analysis_id"])
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
jobSpec := common.JobSpec{
SessionID: int(sessionId),
NameWithOwner: common.NameWithOwner{
Owner: vars["repo_owner"],
Repo: vars["repo_name"],
},
}
c.DownloadResponse(w, js, vaid)
// TODO: THIS IS BROKEN UNLESS REPO ID IS IGNORED
c.MRVADownloadArtifactCommon(w, r, -1, jobSpec)
}
func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobSpec, vaid int) {
slog.Debug("Forming download response", "session", vaid, "job", js)
func (c *CommanderSingle) sendDownloadResponse(w http.ResponseWriter, jobRepoId int, jobSpec common.JobSpec) {
var response common.DownloadResponse
astat := storage.GetStatus(vaid, js.NameWithOwner)
slog.Debug("Forming download response", "job", jobSpec)
var dlr common.DownloadResponse
if astat == common.StatusSuccess {
jobStatus, err := c.v.State.GetStatus(jobSpec)
if err != nil {
slog.Error(err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
au, err := storage.ArtifactURL(js, vaid)
if jobStatus == common.StatusSuccess {
jobResult, err := c.v.State.GetResult(jobSpec)
if err != nil {
slog.Error(err.Error())
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}
jobResultData, err := c.v.Artifacts.GetResult(jobResult.ResultLocation)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
dlr = common.DownloadResponse{
// Generate the artifact URL
encodedJobSpec, err := common.EncodeJobSpec(jobSpec)
if err != nil {
http.Error(w, "Failed to encode job spec", http.StatusInternalServerError)
return
}
host := os.Getenv("SERVER_HOST")
if host == "" {
host = "localhost"
}
port := os.Getenv("SERVER_PORT")
if port == "" {
port = "8080"
}
artifactURL := fmt.Sprintf("http://%s:%s/download/%s", host, port, encodedJobSpec)
response = common.DownloadResponse{
Repository: common.DownloadRepo{
Name: js.Repo,
FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo),
// TODO: fix jobRepoID coming from the NWO path. The MRVA extension uses repo ID.
ID: jobRepoId,
Name: jobSpec.Repo,
FullName: fmt.Sprintf("%s/%s", jobSpec.Owner, jobSpec.Repo),
},
AnalysisStatus: astat.ToExternalString(),
ResultCount: 123, // FIXME
ArtifactSizeBytes: 123, // FIXME
DatabaseCommitSha: "do-we-use-dcs-p",
SourceLocationPrefix: "do-we-use-slp-p",
ArtifactURL: au,
AnalysisStatus: jobStatus.ToExternalString(),
ResultCount: jobResult.ResultCount,
ArtifactSizeBytes: len(jobResultData),
DatabaseCommitSha: jobResult.DatabaseSHA,
SourceLocationPrefix: jobResult.SourceLocationPrefix,
ArtifactURL: artifactURL,
}
} else {
dlr = common.DownloadResponse{
response = common.DownloadResponse{
Repository: common.DownloadRepo{
Name: js.Repo,
FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo),
// TODO: fix jobRepoID coming from the NWO path. The MRVA extension uses repo ID.
ID: jobRepoId,
Name: jobSpec.Repo,
FullName: fmt.Sprintf("%s/%s", jobSpec.Owner, jobSpec.Repo),
},
AnalysisStatus: astat.ToExternalString(),
AnalysisStatus: jobStatus.ToExternalString(),
ResultCount: 0,
ArtifactSizeBytes: 0,
DatabaseCommitSha: "",
SourceLocationPrefix: "/not/relevant/here",
SourceLocationPrefix: "",
ArtifactURL: "",
}
}
// Encode the response as JSON
jdlr, err := json.Marshal(dlr)
responseJson, err := json.Marshal(response)
if err != nil {
slog.Error("Error encoding response as JSON:",
"error", err)
@@ -229,75 +388,74 @@ func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobS
// Send analysisReposJSON via ResponseWriter
w.Header().Set("Content-Type", "application/json")
w.Write(jdlr)
w.Write(responseJson)
}
func (c *CommanderSingle) MRVADownloadServe(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
slog.Info("File download request", "local_path", vars["local_path"])
encodedJobSpec := vars["encoded_job_spec"]
FileDownload(w, vars["local_path"])
}
func FileDownload(w http.ResponseWriter, path string) {
slog.Debug("Sending zip file with .sarif/.bqrs", "path", path)
fpath, res, err := storage.ResultAsFile(path)
jobSpec, err := common.DecodeJobSpec(encodedJobSpec)
if err != nil {
http.Error(w, "Failed to read results", http.StatusInternalServerError)
http.Error(w, "Invalid job spec", http.StatusBadRequest)
return
}
// Set headers
fname := filepath.Base(fpath)
w.Header().Set("Content-Disposition", "attachment; filename="+fname)
slog.Info("Result download request", "job_spec", jobSpec)
result, err := c.v.State.GetResult(jobSpec)
if err != nil {
slog.Error("Failed to get result", "error", err)
http.Error(w, "Failed to get result", http.StatusInternalServerError)
return
}
slog.Debug("Result location", "location", result.ResultLocation)
data, err := c.v.Artifacts.GetResult(result.ResultLocation)
if err != nil {
slog.Error("Failed to retrieve artifact", "error", err)
http.Error(w, "Failed to retrieve artifact", http.StatusInternalServerError)
return
}
// Send the file as a response
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(data)
}
// Copy the file contents to the response writer
rdr := bytes.NewReader(res)
_, err = io.Copy(w, rdr)
func (c *CommanderSingle) MRVARequestCommon(w http.ResponseWriter, r *http.Request) {
sessionId := c.v.State.NextID()
slog.Info("New MRVA Request", "id", fmt.Sprint(sessionId))
queryLanguage, repoNWOs, queryPackLocation, err := c.collectRequestInfo(w, r, sessionId)
if err != nil {
http.Error(w, "Failed to send file", http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
slog.Debug("Uploaded file", "path", fpath)
slog.Debug("Processed request info", "location", queryPackLocation, "language", queryLanguage)
}
// TODO This returns 0 analysisRepos. 2024/06/19 02:26:47 DEBUG Queueing analysis jobs count=0
notFoundRepos, analysisRepos := c.v.CodeQLDBStore.FindAvailableDBs(repoNWOs)
func (c *CommanderSingle) MRVARequestID(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
slog.Info("New mrva using repository_id=%v\n", vars["repository_id"])
}
func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
slog.Info("New mrva run ", "owner", vars["owner"], "repo", vars["repo"])
session_id := c.vis.ServerStore.NextID()
session_owner := vars["owner"]
session_controller_repo := vars["repo"]
slog.Info("new run", "id: ", fmt.Sprint(session_id), session_owner, session_controller_repo)
session_language, session_repositories, session_tgz_ref, err := c.collectRequestInfo(w, r, session_id)
if err != nil {
return
if len(*analysisRepos) == 0 {
slog.Warn("No repositories found for analysis")
}
not_found_repos, analysisRepos := c.vis.ServerStore.FindAvailableDBs(session_repositories)
c.vis.Queue.StartAnalyses(analysisRepos, session_id, session_language)
// XX: session_is is separate from the query pack ref. Value may be equal
c.startAnalyses(analysisRepos, queryPackLocation, sessionId, queryLanguage)
si := SessionInfo{
ID: session_id,
Owner: session_owner,
ControllerRepo: session_controller_repo,
ID: sessionId,
Owner: "unused",
ControllerRepo: "unused",
QueryPack: session_tgz_ref,
Language: session_language,
Repositories: session_repositories,
QueryPack: strconv.Itoa(sessionId), // TODO
Language: queryLanguage,
Repositories: repoNWOs,
AccessMismatchRepos: nil, /* FIXME */
NotFoundRepos: not_found_repos,
NotFoundRepos: notFoundRepos,
NoCodeqlDBRepos: nil, /* FIXME */
OverLimitRepos: nil, /* FIXME */
@@ -305,8 +463,9 @@ func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
}
slog.Debug("Forming and sending response for submitted analysis job", "id", si.ID)
submit_response, err := submit_response(si)
submit_response, err := c.submitResponse(si)
if err != nil {
slog.Error("Error forming submit response", "error", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -315,6 +474,16 @@ func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
w.Write(submit_response)
}
// MRVARequestID handles a new-analysis submission.
// It logs the entry point and delegates all work to MRVARequestCommon;
// no path variables are read here.
func (c *CommanderSingle) MRVARequestID(w http.ResponseWriter, r *http.Request) {
slog.Debug("MRVARequestID")
c.MRVARequestCommon(w, r)
}
// MRVARequest handles a new-analysis submission.
// It logs the entry point and delegates all work to MRVARequestCommon;
// no path variables are read here.
func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
slog.Debug("MRVARequest")
c.MRVARequestCommon(w, r)
}
func nwoToNwoStringArray(nwo []common.NameWithOwner) ([]string, int) {
repos := []string{}
count := len(nwo)
@@ -324,111 +493,145 @@ func nwoToNwoStringArray(nwo []common.NameWithOwner) ([]string, int) {
return repos, count
}
func submit_response(sn SessionInfo) ([]byte, error) {
// nwoToDummyRepositoryArray turns each name-with-owner entry into a placeholder
// common.Repository and returns the slice together with the number of inputs.
//
// Every placeholder has ID -1, Private false and StargazersCount 0. Name is
// the repo name, FullName is "owner/repo", and UpdatedAt is the current time
// in RFC 3339 format. The returned slice is never nil, even for empty input.
func nwoToDummyRepositoryArray(nwo []common.NameWithOwner) ([]common.Repository, int) {
	dummies := make([]common.Repository, 0, len(nwo))

	for i := range nwo {
		entry := nwo[i]
		fullName := fmt.Sprintf("%s/%s", entry.Owner, entry.Repo)
		updatedAt := time.Now().Format(time.RFC3339)

		dummies = append(dummies, common.Repository{
			ID:              -1,
			Name:            entry.Repo,
			FullName:        fullName,
			Private:         false,
			StargazersCount: 0,
			UpdatedAt:       updatedAt,
		})
	}

	return dummies, len(nwo)
}
// ConsumeResults moves results from the queue into the server state.
//
// It blocks, handling one result at a time: the result is stored first and
// the job status is then updated to the result's status. It returns once the
// queue's results channel is closed.
//
// Ranging over the channel matters here. A bare receive in an endless loop
// would never stop after the channel is closed; it would spin on zero-value
// results and write them into the state.
func (c *CommanderSingle) ConsumeResults() {
	slog.Info("Started server results consumer.")

	for r := range c.v.Queue.Results() {
		slog.Debug("Result consumed:", "r", r, "status", r.Status.ToExternalString())
		c.v.State.SetResult(r.Spec, r)
		c.v.State.SetStatus(r.Spec, r.Status)
	}

	slog.Info("Results channel closed; server results consumer stopped.")
}
func (c *CommanderSingle) submitResponse(si SessionInfo) ([]byte, error) {
// Construct the response bottom-up
var m_cr common.ControllerRepo
var m_ac common.Actor
var controllerRepo common.ControllerRepo
var actor common.Actor
repos, count := nwoToNwoStringArray(sn.NotFoundRepos)
r_nfr := common.NotFoundRepos{RepositoryCount: count, RepositoryFullNames: repos}
repoNames, count := nwoToNwoStringArray(si.NotFoundRepos)
notFoundRepos := common.NotFoundRepos{RepositoryCount: count, RepositoryFullNames: repoNames}
repos, count = nwoToNwoStringArray(sn.AccessMismatchRepos)
r_amr := common.AccessMismatchRepos{RepositoryCount: count, Repositories: repos}
repos, _ := nwoToDummyRepositoryArray(si.AccessMismatchRepos)
accessMismatchRepos := common.AccessMismatchRepos{RepositoryCount: count, Repositories: repos}
repos, count = nwoToNwoStringArray(sn.NoCodeqlDBRepos)
r_ncd := common.NoCodeqlDBRepos{RepositoryCount: count, Repositories: repos}
repos, _ = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos)
noCodeQLDBRepos := common.NoCodeqlDBRepos{RepositoryCount: count, Repositories: repos}
// TODO fill these with real values?
repos, count = nwoToNwoStringArray(sn.NoCodeqlDBRepos)
r_olr := common.OverLimitRepos{RepositoryCount: count, Repositories: repos}
repos, _ = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos)
overlimitRepos := common.OverLimitRepos{RepositoryCount: count, Repositories: repos}
m_skip := common.SkippedRepositories{
AccessMismatchRepos: r_amr,
NotFoundRepos: r_nfr,
NoCodeqlDBRepos: r_ncd,
OverLimitRepos: r_olr}
skippedRepositories := common.SkippedRepositories{
AccessMismatchRepos: accessMismatchRepos,
NotFoundRepos: notFoundRepos,
NoCodeqlDBRepos: noCodeQLDBRepos,
OverLimitRepos: overlimitRepos}
m_sr := common.SubmitResponse{
Actor: m_ac,
ControllerRepo: m_cr,
ID: sn.ID,
QueryLanguage: sn.Language,
QueryPackURL: sn.QueryPack,
response := common.SubmitResponse{
Actor: actor,
ControllerRepo: controllerRepo,
ID: si.ID,
QueryLanguage: si.Language,
QueryPackURL: si.QueryPack,
CreatedAt: time.Now().Format(time.RFC3339),
UpdatedAt: time.Now().Format(time.RFC3339),
Status: "in_progress",
SkippedRepositories: m_skip,
SkippedRepositories: skippedRepositories,
}
// Store data needed later
joblist := storage.GetJobList(sn.ID)
joblist, err := c.v.State.GetJobList(si.ID)
if err != nil {
slog.Error("Error getting job list", "error", err.Error())
return nil, err
}
for _, job := range joblist {
storage.SetJobInfo(common.JobSpec{
JobID: sn.ID,
NameWithOwner: job.NWO,
c.v.State.SetJobInfo(common.JobSpec{
SessionID: si.ID,
NameWithOwner: job.Spec.NameWithOwner,
}, common.JobInfo{
QueryLanguage: sn.Language,
CreatedAt: m_sr.CreatedAt,
UpdatedAt: m_sr.UpdatedAt,
SkippedRepositories: m_skip,
QueryLanguage: si.Language,
CreatedAt: response.CreatedAt,
UpdatedAt: response.UpdatedAt,
SkippedRepositories: skippedRepositories,
},
)
}
// Encode the response as JSON
submit_response, err := json.Marshal(m_sr)
responseJson, err := json.Marshal(response)
if err != nil {
slog.Warn("Error encoding response as JSON:", err)
slog.Error("Error encoding response as JSON", "err", err)
return nil, err
}
return submit_response, nil
return responseJson, nil
}
func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, string, error) {
func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, artifactstore.ArtifactLocation, error) {
slog.Debug("Collecting session info")
if r.Body == nil {
err := errors.New("missing request body")
log.Println(err)
slog.Error("Error reading MRVA submission body", "error", err)
http.Error(w, err.Error(), http.StatusNoContent)
return "", []common.NameWithOwner{}, "", err
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
}
buf, err := io.ReadAll(r.Body)
if err != nil {
var w http.ResponseWriter
slog.Error("Error reading MRVA submission body", "error", err.Error())
slog.Error("Error reading MRVA submission body", "error", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return "", []common.NameWithOwner{}, "", err
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
}
msg, err := TrySubmitMsg(buf)
if err != nil {
// Unknown message
slog.Error("Unknown MRVA submission body format")
http.Error(w, err.Error(), http.StatusBadRequest)
return "", []common.NameWithOwner{}, "", err
}
// Decompose the SubmitMsg and keep information
// Save the query pack and keep the location
if !isBase64Gzip([]byte(msg.QueryPack)) {
msg, err := tryParseSubmitMsg(buf)
if err != nil {
slog.Error("Unknown MRVA submission body format", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
}
// 1. Save the query pack and keep the location
if !utils.IsBase64Gzip([]byte(msg.QueryPack)) {
slog.Error("MRVA submission body querypack has invalid format")
err := errors.New("MRVA submission body querypack has invalid format")
http.Error(w, err.Error(), http.StatusBadRequest)
return "", []common.NameWithOwner{}, "", err
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
}
session_tgz_ref, err := c.extract_tgz(msg.QueryPack, sessionId)
queryPackLocation, err := c.processQueryPackArchive(msg.QueryPack, sessionId)
if err != nil {
slog.Error("Error processing query pack archive", "error", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return "", []common.NameWithOwner{}, "", err
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
}
// 2. Save the language
session_language := msg.Language
sessionLanguage := msg.Language
// 3. Save the repositories
var session_repositories []common.NameWithOwner
var sessionRepos []common.NameWithOwner
for _, v := range msg.Repositories {
t := strings.Split(v, "/")
@@ -437,14 +640,15 @@ func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Requ
slog.Error(err, "entry", t)
http.Error(w, err, http.StatusBadRequest)
}
session_repositories = append(session_repositories,
sessionRepos = append(sessionRepos,
common.NameWithOwner{Owner: t[0], Repo: t[1]})
}
return session_language, session_repositories, session_tgz_ref, nil
return sessionLanguage, sessionRepos, queryPackLocation, nil
}
// Try to extract a SubmitMsg from a json-encoded buffer
func TrySubmitMsg(buf []byte) (common.SubmitMsg, error) {
func tryParseSubmitMsg(buf []byte) (common.SubmitMsg, error) {
buf1 := make([]byte, len(buf))
copy(buf1, buf)
dec := json.NewDecoder(bytes.NewReader(buf1))
@@ -454,32 +658,7 @@ func TrySubmitMsg(buf []byte) (common.SubmitMsg, error) {
return m, err
}
// isBase64Gzip reports whether val looks like base64-encoded gzip data.
//
// Only the first four base64 characters are inspected. They decode to three
// bytes, enough to compare against the two-byte gzip magic number 0x1f 0x8b.
// Input shorter than four bytes, or a prefix that is not valid base64, yields
// false; a decode error is also logged.
//
// Such payloads can be listed by hand with
//
//	base64 -d < foo1 | gunzip | tar t | head -20
//
// and this function checks the request body up to the `gunzip` part.
func isBase64Gzip(val []byte) bool {
	const prefixLen = 4
	if len(val) < prefixLen {
		return false
	}

	decoded := make([]byte, base64.StdEncoding.DecodedLen(prefixLen))
	if _, err := base64.StdEncoding.Decode(decoded, val[:prefixLen]); err != nil {
		log.Println("WARNING: IsBase64Gzip decode error:", err)
		return false
	}

	gzipMagic := []byte{0x1f, 0x8b}
	return bytes.HasPrefix(decoded, gzipMagic)
}
func (c *CommanderSingle) extract_tgz(qp string, sessionID int) (string, error) {
func (c *CommanderSingle) processQueryPackArchive(qp string, sessionId int) (artifactstore.ArtifactLocation, error) {
// These are decoded manually via
// base64 -d < foo1 | gunzip | tar t | head -20
// base64 decode the body
@@ -487,14 +666,15 @@ func (c *CommanderSingle) extract_tgz(qp string, sessionID int) (string, error)
tgz, err := base64.StdEncoding.DecodeString(qp)
if err != nil {
slog.Error("querypack body decoding error:", err)
return "", err
slog.Error("Failed to decode query pack body", "err", err)
return artifactstore.ArtifactLocation{}, err
}
session_query_pack_tgz_filepath, err := c.vis.ServerStore.SaveQueryPack(tgz, sessionID)
artifactLocation, err := c.v.Artifacts.SaveQueryPack(sessionId, tgz)
if err != nil {
return "", err
slog.Error("Failed to save query pack", "err", err)
return artifactstore.ArtifactLocation{}, err
}
return session_query_pack_tgz_filepath, err
return artifactLocation, err
}

View File

@@ -1,57 +1,41 @@
package server
import (
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/common"
"mrvacommander/pkg/logger"
"mrvacommander/pkg/qldbstore"
"mrvacommander/pkg/qpstore"
"mrvacommander/pkg/queue"
"mrvacommander/pkg/storage"
"mrvacommander/pkg/state"
)
// SessionInfo groups the data kept for one session: its ID, owner, controller
// repo, query pack, language, the repository list, several repository
// sub-lists (access mismatch, not found, no CodeQL DB, over limit), and a
// pointer to a map from repository to database location.
// NOTE(review): the exact meaning of each repository sub-list is not visible
// here — confirm against the code that fills them in.
type SessionInfo struct {
ID int
Owner string
ControllerRepo string
QueryPack string
Language string
Repositories []common.NameWithOwner
ID int
Owner string
ControllerRepo string
QueryPack string
Language string
Repositories []common.NameWithOwner
AccessMismatchRepos []common.NameWithOwner
NotFoundRepos []common.NameWithOwner
NoCodeqlDBRepos []common.NameWithOwner
OverLimitRepos []common.NameWithOwner
AnalysisRepos *map[common.NameWithOwner]storage.DBLocation
AnalysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation
}
// CommanderSingle is the commander type; its only state is a pointer to the
// shared Visibles.
type CommanderSingle struct {
vis *Visibles
v *Visibles
}
// NewCommanderSingle creates a CommanderSingle, registers its endpoints via
// setupEndpoints, starts ConsumeResults in its own goroutine, and returns a
// pointer to the new commander.
func NewCommanderSingle(st *Visibles) *CommanderSingle {
c := CommanderSingle{}
c := CommanderSingle{v: st}
setupEndpoints(&c)
// Runs for as long as ConsumeResults does; nothing here stops or waits for it.
go c.ConsumeResults()
return &c
}
// type State struct {
// Commander Commander
// Logger logger.Logger
// Queue queue.Queue
// Storage storage.Storage
// Runner agent.Runner
// }
// Visibles bundles the components handed to the commander (see
// NewCommanderSingle): a queue plus the state and storage backends listed
// below.
type Visibles struct {
Logger logger.Logger
Queue queue.Queue
ServerStore storage.Storage
// TODO extra package for query pack storage
QueryPackStore qpstore.Storage
// TODO extra package for ql db storage
QLDBStore qldbstore.Storage
Queue queue.Queue
State state.ServerState
Artifacts artifactstore.Store
CodeQLDBStore qldbstore.Store
}

41
pkg/state/interfaces.go Normal file
View File

@@ -0,0 +1,41 @@
package state
import (
"mrvacommander/pkg/common"
"mrvacommander/pkg/queue"
)
// ServerState defines the methods required for managing server state:
// session IDs, per-session job lists, and the info, status, and result
// recorded for each job specification.
type ServerState interface {
// NextID increments and returns the next unique ID for a session.
NextID() int
// GetResult retrieves the analysis result for the specified job specification.
GetResult(js common.JobSpec) (queue.AnalyzeResult, error)
// GetJobSpecByRepoId retrieves the JobSpec for the specified job Repo ID
// within the given session.
// TODO: fix this hacky logic
GetJobSpecByRepoId(sessionId int, jobRepoId int) (common.JobSpec, error)
// SetResult stores the analysis result for the specified job specification.
SetResult(js common.JobSpec, ar queue.AnalyzeResult)
// GetJobList retrieves the list of analysis jobs for the specified session ID.
GetJobList(sessionId int) ([]queue.AnalyzeJob, error)
// GetJobInfo retrieves the job information for the specified job specification.
GetJobInfo(js common.JobSpec) (common.JobInfo, error)
// SetJobInfo stores the job information for the specified job specification.
SetJobInfo(js common.JobSpec, ji common.JobInfo)
// GetStatus retrieves the status of the job with the specified job specification.
GetStatus(js common.JobSpec) (common.Status, error)
// SetStatus stores the status of the job with the specified job specification.
SetStatus(js common.JobSpec, status common.Status)
// AddJob adds an analysis job to the list of jobs. No session ID is passed
// separately; it has to come from the job itself.
AddJob(job queue.AnalyzeJob)
}

126
pkg/state/state_local.go Normal file
View File

@@ -0,0 +1,126 @@
package state
import (
"fmt"
"log/slog"
"mrvacommander/pkg/common"
"mrvacommander/pkg/queue"
"sync"
)
// LocalState is a map-backed, in-process store of server state. Its methods
// lock mutex around every access to the maps and to currentID.
type LocalState struct {
// jobs holds the list of analysis jobs per session ID.
jobs map[int][]queue.AnalyzeJob
// info, status and result hold per-job data, keyed by job specification.
info map[common.JobSpec]common.JobInfo
status map[common.JobSpec]common.Status
result map[common.JobSpec]queue.AnalyzeResult
// sessionToJobIdToSpec maps session ID -> job index -> JobSpec; AddJob
// assigns indexes in insertion order starting at 0.
sessionToJobIdToSpec map[int]map[int]common.JobSpec
// mutex guards all fields of this struct.
mutex sync.Mutex
// currentID is the most recent session ID; NextID increments it.
currentID int
}
// NewLocalState returns an empty LocalState whose session counter starts at
// startingID. The job list and the job-index map for startingID itself are
// created up front (both empty).
func NewLocalState(startingID int) *LocalState {
	ls := new(LocalState)
	ls.currentID = startingID
	ls.info = map[common.JobSpec]common.JobInfo{}
	ls.status = map[common.JobSpec]common.Status{}
	ls.result = map[common.JobSpec]queue.AnalyzeResult{}
	ls.jobs = map[int][]queue.AnalyzeJob{
		startingID: {},
	}
	ls.sessionToJobIdToSpec = map[int]map[int]common.JobSpec{
		startingID: {},
	}
	return ls
}
// NextID advances the session counter by one, creates an empty job list for
// the new session ID, and returns that ID.
func (s *LocalState) NextID() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	next := s.currentID + 1
	s.currentID = next
	s.jobs[next] = make([]queue.AnalyzeJob, 0)
	return next
}
// GetResult returns the analysis result stored for js, or an error when no
// result has been stored for that job specification.
func (s *LocalState) GetResult(js common.JobSpec) (queue.AnalyzeResult, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if stored, found := s.result[js]; found {
		return stored, nil
	}
	return queue.AnalyzeResult{}, fmt.Errorf("result not found for job spec %v", js)
}
// GetJobSpecByRepoId returns the JobSpec registered under job index jobRepoId
// within session sessionId. It fails when the session has no index map or
// when the index is unknown for that session.
func (s *LocalState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	specsByIndex, haveSession := s.sessionToJobIdToSpec[sessionId]
	if !haveSession {
		return common.JobSpec{}, fmt.Errorf("job ids not found for session %v", sessionId)
	}

	if js, haveSpec := specsByIndex[jobRepoId]; haveSpec {
		return js, nil
	}
	return common.JobSpec{}, fmt.Errorf("job spec not found for job repo id %v", jobRepoId)
}
// SetResult stores analyzeResult as the result for js, replacing any previous
// entry.
func (s *LocalState) SetResult(js common.JobSpec, analyzeResult queue.AnalyzeResult) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.result[js] = analyzeResult
}
// GetJobList returns the job list recorded for sessionID, or an error when
// the session has no job list. The returned slice is the stored one, not a
// copy.
func (s *LocalState) GetJobList(sessionID int) ([]queue.AnalyzeJob, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if sessionJobs, found := s.jobs[sessionID]; found {
		return sessionJobs, nil
	}
	return nil, fmt.Errorf("job list not found for session %v", sessionID)
}
// GetJobInfo returns the job information stored for js, or an error when
// none has been stored.
func (s *LocalState) GetJobInfo(js common.JobSpec) (common.JobInfo, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if stored, found := s.info[js]; found {
		return stored, nil
	}
	return common.JobInfo{}, fmt.Errorf("job info not found for job spec %v", js)
}
// SetJobInfo stores ji as the job information for js, replacing any previous
// entry.
func (s *LocalState) SetJobInfo(js common.JobSpec, ji common.JobInfo) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.info[js] = ji
}
// GetStatus returns the status stored for js. When no status has been stored
// it returns common.StatusError together with a non-nil error.
func (s *LocalState) GetStatus(js common.JobSpec) (common.Status, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if stored, found := s.status[js]; found {
		return stored, nil
	}
	return common.StatusError, fmt.Errorf("status not found for job spec %v", js)
}
// SetStatus stores status as the status for js, replacing any previous entry.
func (s *LocalState) SetStatus(js common.JobSpec, status common.Status) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.status[js] = status
}
// AddJob appends job to the job list of its session (job.Spec.SessionID) and
// registers job.Spec under the next free job index of that session, so the
// spec can later be found via GetJobSpecByRepoId.
//
// It panics if the session's job list and job-index map end up with different
// lengths, since that means the internal bookkeeping is corrupt.
func (s *LocalState) AddJob(job queue.AnalyzeJob) {
	s.mutex.Lock()
	// Deferred so the mutex is released even when the invariant check below
	// panics; otherwise a recovered panic would leave the state locked forever.
	defer s.mutex.Unlock()

	sessionID := job.Spec.SessionID
	s.jobs[sessionID] = append(s.jobs[sessionID], job)

	// Map the job index to JobSpec for quick result lookup
	idToSpec, ok := s.sessionToJobIdToSpec[sessionID]
	if !ok {
		idToSpec = make(map[int]common.JobSpec)
		s.sessionToJobIdToSpec[sessionID] = idToSpec
	}
	// Indexes are assigned in insertion order: the current size is the next index.
	idToSpec[len(idToSpec)] = job.Spec

	if len(s.jobs[sessionID]) != len(idToSpec) {
		msg := fmt.Sprintf("Unequal job list and job id map length. Session ID: %v", sessionID)
		slog.Error(msg)
		panic(msg)
	}
}

View File

@@ -1,10 +0,0 @@
package storage
import "mrvacommander/pkg/common"
// Storage is the storage interface of this package: it hands out IDs, saves
// query packs, and determines which requested repositories have a database
// available.
type Storage interface {
	// NextID returns the next ID.
	NextID() int
	// SaveQueryPack saves the query pack archive tgz for sessionID and returns
	// the path it was stored at.
	SaveQueryPack(tgz []byte, sessionID int) (storagePath string, err error)
	// FindAvailableDBs splits analysisReposRequested into the repositories for
	// which no database was found and a map from the remaining repositories
	// to the location of their database.
	FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (notFoundRepos []common.NameWithOwner,
		analysisRepos *map[common.NameWithOwner]DBLocation)
}

View File

@@ -1,244 +0,0 @@
package storage
import (
"archive/zip"
"errors"
"fmt"
"io/fs"
"log/slog"
"os"
"path"
"path/filepath"
"sync"
"mrvacommander/pkg/common"
)
// Package-level, in-memory job bookkeeping. The accessor functions in this
// file lock mutex around every read and write of these maps.
var (
	// jobs holds the analysis jobs per session ID.
	jobs = map[int][]common.AnalyzeJob{}
	// info, status and result hold per-job data keyed by job specification.
	info   = map[common.JobSpec]common.JobInfo{}
	status = map[common.JobSpec]common.Status{}
	result = map[common.JobSpec]common.AnalyzeResult{}
	mutex  sync.Mutex
)
// NewStorageSingle returns a StorageSingle whose ID counter starts at
// startingID and whose modules field is set to v.
func NewStorageSingle(startingID int, v *Visibles) *StorageSingle {
	return &StorageSingle{
		currentID: startingID,
		modules:   v,
	}
}
// NextID increments the ID counter and returns its new value. The counter is
// not protected by any lock here.
func (s *StorageSingle) NextID() int {
	s.currentID++
	return s.currentID
}
// SaveQueryPack writes the query pack archive tgz to
// <cwd>/var/codeql/querypacks/qp-<sessionId>.tgz, creating the directory if
// necessary, and returns the path of the written file.
//
// Every failure (working directory lookup, directory creation, file write) is
// logged and returned as the error, with an empty path.
func (s *StorageSingle) SaveQueryPack(tgz []byte, sessionId int) (string, error) {
	cwd, err := os.Getwd()
	if err != nil {
		// Report the failure through the error result instead of panicking.
		slog.Error("No working directory", "error", err)
		return "", err
	}

	dirpath := path.Join(cwd, "var", "codeql", "querypacks")
	if err := os.MkdirAll(dirpath, 0755); err != nil {
		slog.Error("Unable to create query pack output directory",
			"dir", dirpath, "error", err)
		return "", err
	}

	// Save the tar.gz body
	fpath := path.Join(dirpath, fmt.Sprintf("qp-%d.tgz", sessionId))
	if err := os.WriteFile(fpath, tgz, 0644); err != nil {
		slog.Error("Unable to save query pack", "path", fpath, "error", err)
		return "", err
	}

	slog.Info("Query pack saved to ", "path", fpath)
	return fpath, nil
}
// FindAvailableDBs determines for which repositories codeql databases are
// available.
//
// For every requested repository it looks for the archive
// <cwd>/codeql/dbs/<owner>/<repo>/<owner>_<repo>_db.zip. Repositories whose
// archive can be stat'ed are returned in analysisRepos (those will be the
// analysis repos); the rest are returned in notFoundRepos and will be skipped.
//
// Both results are always non-nil. If the working directory cannot be
// determined, every requested repository is reported as not found.
func (s *StorageSingle) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (notFoundRepos []common.NameWithOwner,
	analysisRepos *map[common.NameWithOwner]DBLocation) {
	slog.Debug("Looking for available CodeQL databases")

	// Set up the results before anything can fail, so callers never receive a
	// nil map pointer.
	analysisRepos = &map[common.NameWithOwner]DBLocation{}
	notFoundRepos = []common.NameWithOwner{}

	cwd, err := os.Getwd()
	if err != nil {
		slog.Error("No working directory", "error", err)
		notFoundRepos = append(notFoundRepos, analysisReposRequested...)
		return notFoundRepos, analysisRepos
	}

	for _, rep := range analysisReposRequested {
		dbPrefix := filepath.Join(cwd, "codeql", "dbs", rep.Owner, rep.Repo)
		dbName := fmt.Sprintf("%s_%s_db.zip", rep.Owner, rep.Repo)
		dbPath := filepath.Join(dbPrefix, dbName)

		_, statErr := os.Stat(dbPath)
		switch {
		case statErr == nil:
			slog.Info("Found database for ", "owner/repo", rep, "path", dbPath)
			(*analysisRepos)[rep] = DBLocation{Prefix: dbPrefix, File: dbName}
		case errors.Is(statErr, fs.ErrNotExist):
			slog.Info("Database does not exist for repository ", "owner/repo", rep,
				"path", dbPath)
			notFoundRepos = append(notFoundRepos, rep)
		default:
			// Any other stat failure (e.g. permissions) means the database
			// cannot be used, so do not report it as available.
			slog.Warn("Unable to access database for repository", "owner/repo", rep,
				"path", dbPath, "error", statErr)
			notFoundRepos = append(notFoundRepos, rep)
		}
	}
	return notFoundRepos, analysisRepos
}
// ArtifactURL packages the stored result for js into a zip file (via
// PackageResults) and returns a download URL for it of the form
// http://localhost:8080/download-server/<zip path>.
//
// It returns a non-nil error when the host name cannot be determined or when
// packaging fails.
func ArtifactURL(js common.JobSpec, vaid int) (string, error) {
	// We're looking for paths like
	// codeql/sarif/google/flatbuffers/google_flatbuffers.sarif
	ar := GetResult(js)

	if _, err := os.Hostname(); err != nil {
		// Return the error so the caller does not mistake the empty URL for success.
		slog.Error("No host name found", "error", err)
		return "", err
	}

	zfpath, err := PackageResults(ar, js.NameWithOwner, vaid)
	if err != nil {
		slog.Error("Error packaging results:", "error", err)
		return "", err
	}

	// TODO Need url valid in container network and externally
	// For now, we assume the container port 8080 is port 8080 on user's machine
	hostname := "localhost"
	au := fmt.Sprintf("http://%s:8080/download-server/%s", hostname, zfpath)
	return au, nil
}
// GetResult returns the analysis result stored for js; the map lookup yields
// the zero value when nothing is stored.
func GetResult(js common.JobSpec) common.AnalyzeResult {
	mutex.Lock()
	defer mutex.Unlock()

	return result[js]
}
// SetResult stores ar as the result for the job spec built from sessionid
// (as JobID) and nwo.
func SetResult(sessionid int, nwo common.NameWithOwner, ar common.AnalyzeResult) {
	key := common.JobSpec{JobID: sessionid, NameWithOwner: nwo}

	mutex.Lock()
	result[key] = ar
	mutex.Unlock()
}
// PackageResults creates the zip file
// <cwd>/var/codeql/localrun/results/results-<owner>-<repo>-<vaid>.zip and
// returns its path. The code that would add the result files to the archive
// is currently commented out, so the archive is written empty.
//
// Failures to determine the working directory, create the directory or file,
// or close the archive are returned as the error, with an empty path.
func PackageResults(ar common.AnalyzeResult, owre common.NameWithOwner, vaid int) (zipPath string, e error) {
	slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar)

	cwd, err := os.Getwd()
	if err != nil {
		// Report the failure through the error result instead of panicking.
		slog.Error("No working directory", "error", err)
		return "", err
	}

	// Ensure the output directory exists
	dirpath := path.Join(cwd, "var", "codeql", "localrun", "results")
	if err := os.MkdirAll(dirpath, 0755); err != nil {
		slog.Error("Unable to create results output directory",
			"dir", dirpath, "error", err)
		return "", err
	}

	// Create a new zip file
	zpath := path.Join(dirpath, fmt.Sprintf("results-%s-%s-%d.zip", owre.Owner, owre.Repo, vaid))
	zfile, err := os.Create(zpath)
	if err != nil {
		return "", err
	}

	// Create a new zip writer
	zwriter := zip.NewWriter(zfile)
	defer func() {
		// Close the zip writer first (it writes the central directory), then
		// the file. A close failure means the archive is incomplete, so
		// surface it unless an earlier error is already being returned.
		closeErr := errors.Join(zwriter.Close(), zfile.Close())
		if closeErr != nil && e == nil {
			zipPath, e = "", closeErr
		}
	}()

	// Add each result file to the zip archive
	/*
		names := []([]string){{ar.RunAnalysisSARIF, "results.sarif"}}
		for _, fpath := range names {
		file, err := os.Open(fpath[0])
		if err != nil {
		return "", err
		}
		defer file.Close()
		// Create a new file in the zip archive with custom name
		// The client is very specific:
		// if zf.Name != "results.sarif" && zf.Name != "results.bqrs" { continue }
		zipEntry, err := zwriter.Create(fpath[1])
		if err != nil {
		return "", err
		}
		// Copy the contents of the file to the zip entry
		_, err = io.Copy(zipEntry, file)
		if err != nil {
		return "", err
		}
		}
	*/
	return zpath, nil
}
// GetJobList returns the jobs recorded for sessionid; the result is nil when
// none were recorded.
func GetJobList(sessionid int) []common.AnalyzeJob {
	mutex.Lock()
	sessionJobs := jobs[sessionid]
	mutex.Unlock()

	return sessionJobs
}
// GetJobInfo returns the job information stored for js; the map lookup yields
// the zero value when nothing is stored.
func GetJobInfo(js common.JobSpec) common.JobInfo {
	mutex.Lock()
	stored := info[js]
	mutex.Unlock()

	return stored
}
// SetJobInfo stores ji as the job information for js, replacing any previous
// entry.
func SetJobInfo(js common.JobSpec, ji common.JobInfo) {
	mutex.Lock()
	info[js] = ji
	mutex.Unlock()
}
// GetStatus returns the status stored for the job spec built from sessionid
// (as JobID) and nwo; the map lookup yields the zero value when nothing is
// stored.
func GetStatus(sessionid int, nwo common.NameWithOwner) common.Status {
	key := common.JobSpec{JobID: sessionid, NameWithOwner: nwo}

	mutex.Lock()
	stored := status[key]
	mutex.Unlock()

	return stored
}
// ResultAsFile reads the results file at path and returns the path that was
// actually read together with the file contents.
//
// A path that is not absolute is turned into one by prefixing "/". On a read
// failure a warning is logged and ("", nil, err) is returned.
func ResultAsFile(path string) (string, []byte, error) {
	fpath := path
	if !filepath.IsAbs(path) {
		fpath = "/" + path
	}

	file, err := os.ReadFile(fpath)
	if err != nil {
		// slog takes alternating key/value pairs after the message.
		slog.Warn("Failed to read results file", "path", fpath, "error", err)
		return "", nil, err
	}
	return fpath, file, nil
}
// SetStatus stores s as the status for the job spec built from sessionid (as
// JobID) and nwo.
func SetStatus(sessionid int, nwo common.NameWithOwner, s common.Status) {
	key := common.JobSpec{JobID: sessionid, NameWithOwner: nwo}

	mutex.Lock()
	status[key] = s
	mutex.Unlock()
}
// AddJob appends job to the list of jobs recorded for sessionid.
func AddJob(sessionid int, job common.AnalyzeJob) {
	mutex.Lock()
	sessionJobs := jobs[sessionid]
	jobs[sessionid] = append(sessionJobs, job)
	mutex.Unlock()
}

View File

@@ -1,66 +0,0 @@
package storage
import (
"mrvacommander/pkg/common"
"gorm.io/gorm"
)
// DBLocation identifies a database archive on disk. FindAvailableDBs fills it
// with the directory containing the archive (Prefix) and the archive's file
// name (File).
type DBLocation struct {
Prefix string
File string
}
// StorageSingle is the storage type created by NewStorageSingle.
type StorageSingle struct {
// currentID is the ID counter; NextID increments and returns it.
currentID int
// modules is the Visibles value passed to NewStorageSingle.
modules *Visibles
}
// DBSpec groups a host, port, user, password and database name.
// NOTE(review): presumably the connection parameters for the database behind
// StorageContainer — no use of this type is visible here, confirm.
type DBSpec struct {
Host string
Port int
User string
Password string
DBname string
}
// DBInfo is the database version of the in-memory map
//
//	info map[common.JobSpec]common.JobInfo
//
// One record pairs a JobSpec with its JobInfo; both fields carry a
// `type:jsonb` gorm tag.
type DBInfo struct {
// Database version of
// info map[common.JobSpec]common.JobInfo = make(map[common.JobSpec]common.JobInfo)
gorm.Model
JobSpec common.JobSpec `gorm:"type:jsonb"`
JobInfo common.JobInfo `gorm:"type:jsonb"`
}
// DBJobs is the database version of the in-memory map
//
//	jobs map[int][]common.AnalyzeJob
//
// One record holds a single AnalyzeJob (tagged `type:jsonb`) together with
// an integer JobKey.
// NOTE(review): JobKey presumably corresponds to the map's int key — confirm.
type DBJobs struct {
// Database version of
// jobs map[int][]common.AnalyzeJob = make(map[int][]common.AnalyzeJob)
gorm.Model
JobKey int
AnalyzeJob common.AnalyzeJob `gorm:"type:jsonb"`
}
// DBResult is the database version of the in-memory map
//
//	result map[common.JobSpec]common.AnalyzeResult
//
// One record pairs a JobSpec with its AnalyzeResult; both fields carry a
// `type:jsonb` gorm tag.
type DBResult struct {
// Database version of
// result map[common.JobSpec]common.AnalyzeResult = make(map[common.JobSpec]common.AnalyzeResult)
gorm.Model
JobSpec common.JobSpec `gorm:"type:jsonb"`
AnalyzeResult common.AnalyzeResult `gorm:"type:jsonb"`
}
// DBStatus is the database version of the in-memory map
//
//	status map[common.JobSpec]common.Status
//
// One record pairs a JobSpec with its Status; both fields carry a
// `type:jsonb` gorm tag.
type DBStatus struct {
// Database version of
// status map[common.JobSpec]common.Status = make(map[common.JobSpec]common.Status)
gorm.Model
JobSpec common.JobSpec `gorm:"type:jsonb"`
Status common.Status `gorm:"type:jsonb"`
}
// StorageContainer is the database version of StorageSingle: it holds a
// gorm database handle in place of purely in-memory state.
type StorageContainer struct {
// Database version of StorageSingle
RequestID int
DB *gorm.DB
modules *Visibles
}
// Visibles is an empty struct in this package; StorageSingle and
// StorageContainer each keep a pointer to one in their modules field.
type Visibles struct{}