Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e44c8dfe1 | ||
|
|
1a009ccde0 | ||
|
|
5dfca00fa5 | ||
|
|
46052cd20f | ||
|
|
8f318c114f | ||
|
|
1633245444 | ||
|
|
02acf3eeaf | ||
|
|
30f2d22a71 | ||
|
|
95e42ae85a |
@@ -53,6 +53,11 @@ These are simple steps using a single container.
|
||||
cd /mrva/mrvacommander/cmd/server/
|
||||
./server -loglevel=debug -mode=container
|
||||
|
||||
1. Test server from the host via
|
||||
|
||||
cd ~/work-gh/mrva/mrvacommander/tools
|
||||
sh ./request_16-Jun-2024_11-33-16.curl
|
||||
|
||||
1. Test server via remote client by following the steps in [gh-mrva](https://github.com/hohn/gh-mrva/blob/connection-redirect/README.org#compacted-edit-run-debug-cycle)
|
||||
|
||||
### Some general docker-compose commands
|
||||
|
||||
@@ -113,6 +113,7 @@ func main() {
|
||||
slog.Info("Starting agent")
|
||||
|
||||
workerCount := flag.Int("workers", 0, "number of workers")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
requiredEnvVars := []string{
|
||||
@@ -144,7 +145,7 @@ func main() {
|
||||
|
||||
slog.Info("Initializing RabbitMQ queue")
|
||||
|
||||
rabbitMQQueue, err := queue.InitializeRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, false)
|
||||
rabbitMQQueue, err := queue.NewRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, false)
|
||||
if err != nil {
|
||||
slog.Error("failed to initialize RabbitMQ", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
|
||||
BIN
cmd/server/codeql/dbs/psycopg/psycopg2/psycopg_psycopg2_db.zip
(Stored with Git LFS)
BIN
cmd/server/codeql/dbs/psycopg/psycopg2/psycopg_psycopg2_db.zip
(Stored with Git LFS)
Binary file not shown.
@@ -8,16 +8,16 @@ import (
|
||||
"log"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"mrvacommander/config/mcc"
|
||||
|
||||
"mrvacommander/pkg/agent"
|
||||
"mrvacommander/pkg/logger"
|
||||
"mrvacommander/pkg/artifactstore"
|
||||
"mrvacommander/pkg/qldbstore"
|
||||
"mrvacommander/pkg/qpstore"
|
||||
"mrvacommander/pkg/queue"
|
||||
"mrvacommander/pkg/server"
|
||||
"mrvacommander/pkg/storage"
|
||||
"mrvacommander/pkg/state"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -70,80 +70,64 @@ func main() {
|
||||
switch *mode {
|
||||
case "standalone":
|
||||
// Assemble single-process version
|
||||
|
||||
sl := logger.NewLoggerSingle(&logger.Visibles{})
|
||||
|
||||
// FIXME take value from configuration
|
||||
sq := queue.NewQueueSingle(2, &queue.Visibles{
|
||||
Logger: sl,
|
||||
})
|
||||
|
||||
ss := storage.NewStorageSingle(config.Storage.StartingID, &storage.Visibles{})
|
||||
|
||||
qp, err := qpstore.NewStore(&qpstore.Visibles{})
|
||||
if err != nil {
|
||||
slog.Error("Unable to initialize query pack storage")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ql, err := qldbstore.NewStore(&qldbstore.Visibles{})
|
||||
if err != nil {
|
||||
slog.Error("Unable to initialize ql database storage")
|
||||
os.Exit(1)
|
||||
}
|
||||
sq := queue.NewQueueSingle(2)
|
||||
ss := state.NewLocalState(config.Storage.StartingID)
|
||||
as := artifactstore.NewInMemoryArtifactStore()
|
||||
ql := qldbstore.NewLocalFilesystemCodeQLDatabaseStore("")
|
||||
|
||||
server.NewCommanderSingle(&server.Visibles{
|
||||
Logger: sl,
|
||||
Queue: sq,
|
||||
ServerStore: ss,
|
||||
QueryPackStore: qp,
|
||||
QLDBStore: ql,
|
||||
Queue: sq,
|
||||
State: ss,
|
||||
Artifacts: as,
|
||||
CodeQLDBStore: ql,
|
||||
})
|
||||
|
||||
// FIXME take value from configuration
|
||||
agent.NewAgentSingle(2, &agent.Visibles{
|
||||
Logger: sl,
|
||||
Queue: sq,
|
||||
QueryPackStore: qp,
|
||||
QLDBStore: ql,
|
||||
Queue: sq,
|
||||
Artifacts: as,
|
||||
CodeQLDBStore: ql,
|
||||
})
|
||||
|
||||
case "container":
|
||||
// Assemble container version
|
||||
sl := logger.NewLoggerSingle(&logger.Visibles{})
|
||||
rmqHost := os.Getenv("MRVA_RABBITMQ_HOST")
|
||||
rmqPort := os.Getenv("MRVA_RABBITMQ_PORT")
|
||||
rmqUser := os.Getenv("MRVA_RABBITMQ_USER")
|
||||
rmqPass := os.Getenv("MRVA_RABBITMQ_PASSWORD")
|
||||
|
||||
// FIXME take value from configuration
|
||||
sq := queue.NewQueueSingle(2, &queue.Visibles{
|
||||
Logger: sl,
|
||||
})
|
||||
|
||||
ss := storage.NewStorageSingle(config.Storage.StartingID, &storage.Visibles{})
|
||||
|
||||
qp, err := qpstore.NewStore(&qpstore.Visibles{})
|
||||
rmqPortAsInt, err := strconv.ParseInt(rmqPort, 10, 16)
|
||||
if err != nil {
|
||||
slog.Error("Unable to initialize query pack storage")
|
||||
slog.Error("Failed to parse RabbitMQ port", slog.Any("error", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ql, err := qldbstore.NewStore(&qldbstore.Visibles{})
|
||||
sq, err := queue.NewRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, false)
|
||||
if err != nil {
|
||||
slog.Error("Unable to initialize RabbitMQ queue")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ss := state.NewContainerState(config.Storage.StartingID)
|
||||
|
||||
// TODO: add arguments
|
||||
as, err := artifactstore.NewMinIOArtifactStore("", "", "")
|
||||
if err != nil {
|
||||
slog.Error("Unable to initialize artifact store")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// TODO: add arguments
|
||||
ql, err := qldbstore.NewMinIOCodeQLDatabaseStore("", "", "", "")
|
||||
if err != nil {
|
||||
slog.Error("Unable to initialize ql database storage")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
agent.NewAgentSingle(2, &agent.Visibles{
|
||||
Logger: sl,
|
||||
Queue: sq,
|
||||
QueryPackStore: qp,
|
||||
QLDBStore: ql,
|
||||
})
|
||||
|
||||
server.NewCommanderSingle(&server.Visibles{
|
||||
Logger: sl,
|
||||
Queue: sq,
|
||||
ServerStore: ss,
|
||||
QueryPackStore: qp,
|
||||
QLDBStore: ql,
|
||||
server.NewCommanderContainer(&server.Visibles{
|
||||
Queue: sq,
|
||||
State: ss,
|
||||
Artifacts: as,
|
||||
CodeQLDBStore: ql,
|
||||
})
|
||||
|
||||
case "cluster":
|
||||
|
||||
@@ -26,12 +26,12 @@ services:
|
||||
context: ./cmd/server
|
||||
dockerfile: Dockerfile
|
||||
container_name: server
|
||||
stop_grace_period: 1s # Reduce the timeout period for testing
|
||||
stop_grace_period: 1s # Reduce the timeout period for testing
|
||||
environment:
|
||||
- MRVA_SERVER_ROOT=/mrva/mrvacommander/cmd/server
|
||||
command: sh -c "tail -f /dev/null"
|
||||
ports:
|
||||
- "8080:8080"
|
||||
- "8080:8080"
|
||||
volumes:
|
||||
- ./:/mrva/mrvacommander
|
||||
depends_on:
|
||||
@@ -53,7 +53,6 @@ services:
|
||||
volumes:
|
||||
- ./dbstore-data:/data
|
||||
|
||||
|
||||
qpstore:
|
||||
image: minio/minio:RELEASE.2024-06-11T03-13-30Z
|
||||
container_name: qpstore
|
||||
@@ -67,7 +66,6 @@ services:
|
||||
command: server /data --console-address ":9001"
|
||||
volumes:
|
||||
- ./qpstore-data:/data
|
||||
|
||||
|
||||
agent:
|
||||
build:
|
||||
@@ -76,7 +74,8 @@ services:
|
||||
container_name: agent
|
||||
depends_on:
|
||||
- rabbitmq
|
||||
- minio
|
||||
- dbstore
|
||||
- qpstore
|
||||
environment:
|
||||
MRVA_RABBITMQ_HOST: rabbitmq
|
||||
MRVA_RABBITMQ_PORT: 5672
|
||||
@@ -85,7 +84,6 @@ services:
|
||||
networks:
|
||||
- backend
|
||||
|
||||
|
||||
networks:
|
||||
backend:
|
||||
driver: bridge
|
||||
@@ -93,4 +91,3 @@ networks:
|
||||
# Remove named volumes to use bind mounts
|
||||
# volumes:
|
||||
# minio-data:
|
||||
|
||||
|
||||
@@ -4,11 +4,9 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"mrvacommander/pkg/artifactstore"
|
||||
"mrvacommander/pkg/codeql"
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/logger"
|
||||
"mrvacommander/pkg/qldbstore"
|
||||
"mrvacommander/pkg/qpstore"
|
||||
"mrvacommander/pkg/queue"
|
||||
"mrvacommander/utils"
|
||||
"os"
|
||||
@@ -22,8 +20,8 @@ type RunnerSingle struct {
|
||||
queue queue.Queue
|
||||
}
|
||||
|
||||
func NewAgentSingle(numWorkers int, av *Visibles) *RunnerSingle {
|
||||
r := RunnerSingle{queue: av.Queue}
|
||||
func NewAgentSingle(numWorkers int, v *Visibles) *RunnerSingle {
|
||||
r := RunnerSingle{queue: v.Queue}
|
||||
|
||||
for id := 1; id <= numWorkers; id++ {
|
||||
go r.worker(id)
|
||||
@@ -31,15 +29,6 @@ func NewAgentSingle(numWorkers int, av *Visibles) *RunnerSingle {
|
||||
return &r
|
||||
}
|
||||
|
||||
type Visibles struct {
|
||||
Logger logger.Logger
|
||||
Queue queue.Queue
|
||||
// TODO extra package for query pack storage
|
||||
QueryPackStore qpstore.Storage
|
||||
// TODO extra package for ql db storage
|
||||
QLDBStore qldbstore.Storage
|
||||
}
|
||||
|
||||
func (r *RunnerSingle) worker(wid int) {
|
||||
// TODO: reimplement this later
|
||||
/*
|
||||
@@ -76,10 +65,10 @@ func (r *RunnerSingle) worker(wid int) {
|
||||
// RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult
|
||||
func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
|
||||
var result = common.AnalyzeResult{
|
||||
RequestId: job.RequestId,
|
||||
ResultCount: 0,
|
||||
ResultArchiveURL: "",
|
||||
Status: common.StatusError,
|
||||
RequestId: job.RequestId,
|
||||
ResultCount: 0,
|
||||
ResultLocation: artifactstore.ArtifactLocation{},
|
||||
Status: common.StatusError,
|
||||
}
|
||||
|
||||
// Create a temporary directory
|
||||
@@ -111,10 +100,10 @@ func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
|
||||
slog.Debug("Results archive size", slog.Int("size", len(resultsArchive)))
|
||||
|
||||
result = common.AnalyzeResult{
|
||||
RequestId: job.RequestId,
|
||||
ResultCount: runResult.ResultCount,
|
||||
ResultArchiveURL: "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE", // TODO
|
||||
Status: common.StatusSuccess,
|
||||
RequestId: job.RequestId,
|
||||
ResultCount: runResult.ResultCount,
|
||||
ResultLocation: artifactstore.ArtifactLocation{}, // TODO "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE"
|
||||
Status: common.StatusSuccess,
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
||||
@@ -1,4 +1,13 @@
|
||||
package agent
|
||||
|
||||
type Runner interface {
|
||||
import (
|
||||
"mrvacommander/pkg/artifactstore"
|
||||
"mrvacommander/pkg/qldbstore"
|
||||
"mrvacommander/pkg/queue"
|
||||
)
|
||||
|
||||
type Visibles struct {
|
||||
Queue queue.Queue
|
||||
Artifacts artifactstore.ArtifactStore
|
||||
CodeQLDBStore qldbstore.CodeQLDatabaseStore
|
||||
}
|
||||
|
||||
24
pkg/artifactstore/interfaces.go
Normal file
24
pkg/artifactstore/interfaces.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package artifactstore
|
||||
|
||||
type ArtifactLocation struct {
|
||||
// Data is a map of key-value pairs that describe the location of the artifact.
|
||||
// For example, a simple key-value pair could be "path" -> "/path/to/artifact.tgz".
|
||||
// Alternatively, a more complex example could be "bucket" -> "example", "key" -> "UNIQUE_ARTIFACT_IDENTIFIER".
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
type ArtifactStore interface {
|
||||
// GetQueryPack returns the query pack as a byte slice for the specified location.
|
||||
GetQueryPack(location ArtifactLocation) ([]byte, error)
|
||||
|
||||
// SaveQueryPack saves the query pack from the provided byte slice and session ID.
|
||||
// It returns the location of the saved query pack.
|
||||
SaveQueryPack(sessionID int, data []byte) (ArtifactLocation, error)
|
||||
|
||||
// GetResult returns the result archive as a byte slice for the specified location.
|
||||
GetResult(location ArtifactLocation) ([]byte, error)
|
||||
|
||||
// SaveResult saves the result archive from the provided byte slice and session ID.
|
||||
// It returns the location of the saved result archive.
|
||||
SaveResult(sessionID int, data []byte) (ArtifactLocation, error)
|
||||
}
|
||||
75
pkg/artifactstore/store_memory.go
Normal file
75
pkg/artifactstore/store_memory.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package artifactstore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type InMemoryArtifactStore struct {
|
||||
mu sync.Mutex
|
||||
packs map[string][]byte
|
||||
results map[string][]byte
|
||||
}
|
||||
|
||||
func NewInMemoryArtifactStore() *InMemoryArtifactStore {
|
||||
return &InMemoryArtifactStore{
|
||||
packs: make(map[string][]byte),
|
||||
results: make(map[string][]byte),
|
||||
}
|
||||
}
|
||||
|
||||
func (store *InMemoryArtifactStore) GetQueryPack(location ArtifactLocation) ([]byte, error) {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
|
||||
key := location.data["key"]
|
||||
data, exists := store.packs[key]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("query pack not found: %s", key)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (store *InMemoryArtifactStore) SaveQueryPack(sessionID int, data []byte) (ArtifactLocation, error) {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
|
||||
key := fmt.Sprintf("%d.tgz", sessionID)
|
||||
store.packs[key] = data
|
||||
|
||||
location := ArtifactLocation{
|
||||
data: map[string]string{
|
||||
"bucket": "packs",
|
||||
"key": key,
|
||||
},
|
||||
}
|
||||
return location, nil
|
||||
}
|
||||
|
||||
func (store *InMemoryArtifactStore) GetResult(location ArtifactLocation) ([]byte, error) {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
|
||||
key := location.data["key"]
|
||||
data, exists := store.results[key]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("result not found: %s", key)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (store *InMemoryArtifactStore) SaveResult(sessionID int, data []byte) (ArtifactLocation, error) {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
|
||||
key := fmt.Sprintf("%d.tgz", sessionID)
|
||||
store.results[key] = data
|
||||
|
||||
location := ArtifactLocation{
|
||||
data: map[string]string{
|
||||
"bucket": "results",
|
||||
"key": key,
|
||||
},
|
||||
}
|
||||
return location, nil
|
||||
}
|
||||
82
pkg/artifactstore/store_minio.go
Normal file
82
pkg/artifactstore/store_minio.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package artifactstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
||||
type MinIOArtifactStore struct {
|
||||
client *minio.Client
|
||||
}
|
||||
|
||||
func NewMinIOArtifactStore(endpoint, id, secret string) (*MinIOArtifactStore, error) {
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(id, secret, ""),
|
||||
Secure: false,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &MinIOArtifactStore{
|
||||
client: minioClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (store *MinIOArtifactStore) GetQueryPack(location ArtifactLocation) ([]byte, error) {
|
||||
return store.getArtifact(location)
|
||||
}
|
||||
|
||||
func (store *MinIOArtifactStore) SaveQueryPack(sessionID int, data []byte) (ArtifactLocation, error) {
|
||||
return store.saveArtifact("packs", sessionID, data)
|
||||
}
|
||||
|
||||
func (store *MinIOArtifactStore) GetResult(location ArtifactLocation) ([]byte, error) {
|
||||
return store.getArtifact(location)
|
||||
}
|
||||
|
||||
func (store *MinIOArtifactStore) SaveResult(sessionID int, data []byte) (ArtifactLocation, error) {
|
||||
return store.saveArtifact("results", sessionID, data)
|
||||
}
|
||||
|
||||
func (store *MinIOArtifactStore) getArtifact(location ArtifactLocation) ([]byte, error) {
|
||||
bucket := location.data["bucket"]
|
||||
key := location.data["key"]
|
||||
|
||||
object, err := store.client.GetObject(context.Background(), bucket, key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer object.Close()
|
||||
|
||||
data, err := io.ReadAll(object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (store *MinIOArtifactStore) saveArtifact(bucket string, sessionID int, data []byte) (ArtifactLocation, error) {
|
||||
key := fmt.Sprintf("%d.tgz", sessionID)
|
||||
_, err := store.client.PutObject(context.Background(), bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
|
||||
ContentType: "application/gzip",
|
||||
})
|
||||
if err != nil {
|
||||
return ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
location := ArtifactLocation{
|
||||
data: map[string]string{
|
||||
"bucket": bucket,
|
||||
"key": key,
|
||||
},
|
||||
}
|
||||
|
||||
return location, nil
|
||||
}
|
||||
@@ -1,10 +1,9 @@
|
||||
package common
|
||||
|
||||
type JobInfo struct {
|
||||
QueryLanguage string
|
||||
CreatedAt string
|
||||
UpdatedAt string
|
||||
|
||||
QueryLanguage string
|
||||
CreatedAt string
|
||||
UpdatedAt string
|
||||
SkippedRepositories SkippedRepositories
|
||||
}
|
||||
|
||||
@@ -16,8 +15,8 @@ type SkippedRepositories struct {
|
||||
}
|
||||
|
||||
type AccessMismatchRepos struct {
|
||||
RepositoryCount int `json:"repository_count"`
|
||||
Repositories []string `json:"repositories"`
|
||||
RepositoryCount int `json:"repository_count"`
|
||||
Repositories []Repository `json:"repositories"`
|
||||
}
|
||||
|
||||
type NotFoundRepos struct {
|
||||
@@ -26,13 +25,13 @@ type NotFoundRepos struct {
|
||||
}
|
||||
|
||||
type NoCodeqlDBRepos struct {
|
||||
RepositoryCount int `json:"repository_count"`
|
||||
Repositories []string `json:"repositories"`
|
||||
RepositoryCount int `json:"repository_count"`
|
||||
Repositories []Repository `json:"repositories"`
|
||||
}
|
||||
|
||||
type OverLimitRepos struct {
|
||||
RepositoryCount int `json:"repository_count"`
|
||||
Repositories []string `json:"repositories"`
|
||||
RepositoryCount int `json:"repository_count"`
|
||||
Repositories []Repository `json:"repositories"`
|
||||
}
|
||||
|
||||
type StatusResponse struct {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package common
|
||||
|
||||
import "mrvacommander/pkg/artifactstore"
|
||||
|
||||
// NameWithOwner represents a repository name and its owner name.
|
||||
type NameWithOwner struct {
|
||||
Owner string
|
||||
@@ -8,22 +10,24 @@ type NameWithOwner struct {
|
||||
|
||||
// AnalyzeJob represents a job specifying a repository and a query pack to analyze it with.
|
||||
// This is the message format that the agent receives from the queue.
|
||||
// TODO: make query_pack_location query_pack_url with a presigned URL
|
||||
type AnalyzeJob struct {
|
||||
RequestId int // json:"request_id"
|
||||
QueryPackId int // json:"query_pack_id"
|
||||
QueryPackURL string // json:"query_pack_url"
|
||||
QueryLanguage string // json:"query_language"
|
||||
NWO NameWithOwner // json:"nwo"
|
||||
RequestId int // json:"request_id"
|
||||
QueryPackId int // json:"query_pack_id"
|
||||
QueryPackLocation artifactstore.ArtifactLocation // json:"query_pack_location"
|
||||
QueryLanguage string // json:"query_language"
|
||||
NWO NameWithOwner // json:"nwo"
|
||||
}
|
||||
|
||||
// AnalyzeResult represents the result of an analysis job.
|
||||
// This is the message format that the agent sends to the queue.
|
||||
// Status will only ever be StatusSuccess or StatusError when sent in a result.
|
||||
// TODO: make result_location result_archive_url with a presigned URL
|
||||
type AnalyzeResult struct {
|
||||
Status Status // json:"status"
|
||||
RequestId int // json:"request_id"
|
||||
ResultCount int // json:"result_count"
|
||||
ResultArchiveURL string // json:"result_archive_url"
|
||||
Status Status // json:"status"
|
||||
RequestId int // json:"request_id"
|
||||
ResultCount int // json:"result_count"
|
||||
ResultLocation artifactstore.ArtifactLocation // json:"result_location"
|
||||
}
|
||||
|
||||
// Status represents the status of a job.
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
package logger
|
||||
|
||||
type Logger interface {
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
package logger
|
||||
|
||||
type LoggerSingle struct {
|
||||
modules *Visibles
|
||||
}
|
||||
|
||||
func NewLoggerSingle(v *Visibles) *LoggerSingle {
|
||||
l := LoggerSingle{}
|
||||
|
||||
l.modules = v
|
||||
return &l
|
||||
}
|
||||
|
||||
type Visibles struct{}
|
||||
@@ -4,29 +4,25 @@ import (
|
||||
"mrvacommander/pkg/common"
|
||||
)
|
||||
|
||||
type DBLocation struct {
|
||||
Prefix string
|
||||
File string
|
||||
type CodeQLDatabaseLocation struct {
|
||||
// Data is a map of key-value pairs that describe the location of the database.
|
||||
// For example, a simple key-value pair could be "path" -> "/path/to/database.zip".
|
||||
// Alternatively, a more complex example could be "bucket" -> "example", "key" -> "UNIQUE_DB_IDENTIFIER".
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
type Storage interface {
|
||||
FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner,
|
||||
analysisRepos *map[common.NameWithOwner]DBLocation)
|
||||
}
|
||||
|
||||
type Visibles struct{}
|
||||
|
||||
type StorageQLDB struct{}
|
||||
|
||||
func NewStore(v *Visibles) (Storage, error) {
|
||||
s := StorageQLDB{}
|
||||
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
func (s *StorageQLDB) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
|
||||
not_found_repos []common.NameWithOwner,
|
||||
analysisRepos *map[common.NameWithOwner]DBLocation) {
|
||||
// TODO implement
|
||||
return nil, nil
|
||||
type CodeQLDatabaseStore interface {
|
||||
// FindAvailableDBs returns a map of available databases for the requested analysisReposRequested.
|
||||
// It also returns a list of repository NWOs that do not have available databases.
|
||||
FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
|
||||
notFoundRepos []common.NameWithOwner,
|
||||
foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation)
|
||||
|
||||
// GetDatabase returns the database as a byte slice for the specified repository.
|
||||
// A CodeQL database is a zip archive to be processed by the CodeQL CLI.
|
||||
GetDatabase(location CodeQLDatabaseLocation) ([]byte, error)
|
||||
|
||||
// GetDatabaseByNWO returns the database location for the specified repository.
|
||||
// FindAvailableDBs should be used in lieu of this method for checking database availability.
|
||||
GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error)
|
||||
}
|
||||
|
||||
66
pkg/qldbstore/qldbstore_local.go
Normal file
66
pkg/qldbstore/qldbstore_local.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package qldbstore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"mrvacommander/pkg/common"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
type FilesystemCodeQLDatabaseStore struct {
|
||||
basePath string
|
||||
}
|
||||
|
||||
func NewLocalFilesystemCodeQLDatabaseStore(basePath string) *FilesystemCodeQLDatabaseStore {
|
||||
return &FilesystemCodeQLDatabaseStore{
|
||||
basePath: basePath,
|
||||
}
|
||||
}
|
||||
|
||||
func (store *FilesystemCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
|
||||
notFoundRepos []common.NameWithOwner,
|
||||
foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) {
|
||||
|
||||
foundReposMap := make(map[common.NameWithOwner]CodeQLDatabaseLocation)
|
||||
for _, repo := range analysisReposRequested {
|
||||
location, err := store.GetDatabaseLocationByNWO(repo)
|
||||
if err != nil {
|
||||
notFoundRepos = append(notFoundRepos, repo)
|
||||
} else {
|
||||
foundReposMap[repo] = location
|
||||
}
|
||||
}
|
||||
|
||||
return notFoundRepos, &foundReposMap
|
||||
}
|
||||
|
||||
func (store *FilesystemCodeQLDatabaseStore) GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) {
|
||||
path, exists := location.data["path"]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("path not specified in location")
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (store *FilesystemCodeQLDatabaseStore) GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) {
|
||||
filePath := filepath.Join(store.basePath, fmt.Sprintf("%s_%s.zip", nwo.Owner, nwo.Repo))
|
||||
|
||||
// Check if the file exists
|
||||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||||
return CodeQLDatabaseLocation{}, fmt.Errorf("database not found for %s", nwo)
|
||||
}
|
||||
|
||||
location := CodeQLDatabaseLocation{
|
||||
data: map[string]string{
|
||||
"path": filePath,
|
||||
},
|
||||
}
|
||||
|
||||
return location, nil
|
||||
}
|
||||
88
pkg/qldbstore/qldbstore_minio.go
Normal file
88
pkg/qldbstore/qldbstore_minio.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package qldbstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"mrvacommander/pkg/common"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
||||
type MinIOCodeQLDatabaseStore struct {
|
||||
client *minio.Client
|
||||
bucketName string
|
||||
}
|
||||
|
||||
func NewMinIOCodeQLDatabaseStore(endpoint, id, secret, bucketName string) (*MinIOCodeQLDatabaseStore, error) {
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(id, secret, ""),
|
||||
Secure: false,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &MinIOCodeQLDatabaseStore{
|
||||
client: minioClient,
|
||||
bucketName: bucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (store *MinIOCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
|
||||
notFoundRepos []common.NameWithOwner,
|
||||
foundRepos *map[common.NameWithOwner]CodeQLDatabaseLocation) {
|
||||
|
||||
foundReposMap := make(map[common.NameWithOwner]CodeQLDatabaseLocation)
|
||||
for _, repo := range analysisReposRequested {
|
||||
location, err := store.GetDatabaseLocationByNWO(repo)
|
||||
if err != nil {
|
||||
notFoundRepos = append(notFoundRepos, repo)
|
||||
} else {
|
||||
foundReposMap[repo] = location
|
||||
}
|
||||
}
|
||||
|
||||
return notFoundRepos, &foundReposMap
|
||||
}
|
||||
|
||||
func (store *MinIOCodeQLDatabaseStore) GetDatabase(location CodeQLDatabaseLocation) ([]byte, error) {
|
||||
bucket := location.data["bucket"]
|
||||
key := location.data["key"]
|
||||
|
||||
object, err := store.client.GetObject(context.Background(), bucket, key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer object.Close()
|
||||
|
||||
data, err := io.ReadAll(object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (store *MinIOCodeQLDatabaseStore) GetDatabaseLocationByNWO(nwo common.NameWithOwner) (CodeQLDatabaseLocation, error) {
|
||||
objectName := fmt.Sprintf("%s/%s.zip", nwo.Owner, nwo.Repo)
|
||||
|
||||
// Check if the object exists
|
||||
_, err := store.client.StatObject(context.Background(), store.bucketName, objectName, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
if minio.ToErrorResponse(err).Code == "NoSuchKey" {
|
||||
return CodeQLDatabaseLocation{}, fmt.Errorf("database not found for %s", nwo)
|
||||
}
|
||||
return CodeQLDatabaseLocation{}, err
|
||||
}
|
||||
|
||||
location := CodeQLDatabaseLocation{
|
||||
data: map[string]string{
|
||||
"bucket": store.bucketName,
|
||||
"key": objectName,
|
||||
},
|
||||
}
|
||||
|
||||
return location, nil
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
package qpstore
|
||||
|
||||
type Storage interface {
|
||||
SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error)
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
package qpstore
|
||||
|
||||
type Visibles struct{}
|
||||
|
||||
type StorageQP struct{}
|
||||
|
||||
func NewStore(v *Visibles) (Storage, error) {
|
||||
s := StorageQP{}
|
||||
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
func (s *StorageQP) SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error) {
|
||||
// TODO implement
|
||||
return "", nil
|
||||
}
|
||||
@@ -2,13 +2,9 @@ package queue
|
||||
|
||||
import (
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/storage"
|
||||
)
|
||||
|
||||
type Queue interface {
|
||||
Jobs() chan common.AnalyzeJob
|
||||
Results() chan common.AnalyzeResult
|
||||
StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation,
|
||||
session_id int,
|
||||
session_language string)
|
||||
}
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/storage"
|
||||
)
|
||||
|
||||
func (q *QueueSingle) Jobs() chan common.AnalyzeJob {
|
||||
return q.jobs
|
||||
}
|
||||
|
||||
func (q *QueueSingle) Results() chan common.AnalyzeResult {
|
||||
return q.results
|
||||
}
|
||||
|
||||
func (q *QueueSingle) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int,
|
||||
session_language string) {
|
||||
slog.Debug("Queueing codeql database analyze jobs")
|
||||
|
||||
for nwo := range *analysis_repos {
|
||||
info := common.AnalyzeJob{
|
||||
QueryPackId: session_id,
|
||||
QueryLanguage: session_language,
|
||||
NWO: nwo,
|
||||
}
|
||||
q.jobs <- info
|
||||
storage.SetStatus(session_id, nwo, common.StatusQueued)
|
||||
storage.AddJob(session_id, info)
|
||||
}
|
||||
}
|
||||
@@ -2,12 +2,10 @@ package queue
|
||||
|
||||
import (
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/storage"
|
||||
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
@@ -21,14 +19,14 @@ type RabbitMQQueue struct {
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
// InitializeRabbitMQQueue initializes a RabbitMQ queue.
|
||||
// NewRabbitMQQueue initializes a RabbitMQ queue.
|
||||
// It returns a pointer to a RabbitMQQueue and an error.
|
||||
//
|
||||
// If isAgent is true, the queue is initialized to be used by an agent.
|
||||
// Otherwise, the queue is initialized to be used by the server.
|
||||
// The difference in behaviour is that the agent consumes jobs and publishes results,
|
||||
// while the server publishes jobs and consumes results.
|
||||
func InitializeRabbitMQQueue(
|
||||
func NewRabbitMQQueue(
|
||||
host string,
|
||||
port int16,
|
||||
user string,
|
||||
@@ -123,11 +121,6 @@ func (q *RabbitMQQueue) Results() chan common.AnalyzeResult {
|
||||
return q.results
|
||||
}
|
||||
|
||||
func (q *RabbitMQQueue) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, session_language string) {
|
||||
// TODO: Implement
|
||||
log.Fatal("unimplemented")
|
||||
}
|
||||
|
||||
func (q *RabbitMQQueue) Close() {
|
||||
q.channel.Close()
|
||||
q.conn.Close()
|
||||
28
pkg/queue/queue_single.go
Normal file
28
pkg/queue/queue_single.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"mrvacommander/pkg/common"
|
||||
)
|
||||
|
||||
type QueueSingle struct {
|
||||
NumWorkers int
|
||||
jobs chan common.AnalyzeJob
|
||||
results chan common.AnalyzeResult
|
||||
}
|
||||
|
||||
func NewQueueSingle(numWorkers int) *QueueSingle {
|
||||
q := QueueSingle{
|
||||
NumWorkers: numWorkers,
|
||||
jobs: make(chan common.AnalyzeJob, 10),
|
||||
results: make(chan common.AnalyzeResult, 10),
|
||||
}
|
||||
return &q
|
||||
}
|
||||
|
||||
func (q *QueueSingle) Jobs() chan common.AnalyzeJob {
|
||||
return q.jobs
|
||||
}
|
||||
|
||||
func (q *QueueSingle) Results() chan common.AnalyzeResult {
|
||||
return q.results
|
||||
}
|
||||
@@ -1,28 +0,0 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/logger"
|
||||
)
|
||||
|
||||
type QueueSingle struct {
|
||||
NumWorkers int
|
||||
jobs chan common.AnalyzeJob
|
||||
results chan common.AnalyzeResult
|
||||
modules *Visibles
|
||||
}
|
||||
|
||||
type Visibles struct {
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
func NewQueueSingle(numWorkers int, v *Visibles) *QueueSingle {
|
||||
q := QueueSingle{}
|
||||
q.jobs = make(chan common.AnalyzeJob, 10)
|
||||
q.results = make(chan common.AnalyzeResult, 10)
|
||||
q.NumWorkers = numWorkers
|
||||
|
||||
q.modules = v
|
||||
|
||||
return &q
|
||||
}
|
||||
417
pkg/server/container.go
Normal file
417
pkg/server/container.go
Normal file
@@ -0,0 +1,417 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"mrvacommander/pkg/artifactstore"
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/qldbstore"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (c *CommanderContainer) startAnalyses(
|
||||
// X1: check
|
||||
analysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation,
|
||||
jobID int,
|
||||
queryLanguage string) {
|
||||
slog.Debug("Queueing analysis jobs")
|
||||
|
||||
for nwo := range *analysisRepos {
|
||||
info := common.AnalyzeJob{
|
||||
QueryPackId: jobID,
|
||||
QueryLanguage: queryLanguage,
|
||||
NWO: nwo,
|
||||
}
|
||||
c.v.Queue.Jobs() <- info
|
||||
c.v.State.SetStatus(jobID, nwo, common.StatusQueued)
|
||||
c.v.State.AddJob(jobID, info)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) StatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo, vaid int) {
|
||||
slog.Debug("Submitting status response", "session", vaid)
|
||||
|
||||
all_scanned := []common.ScannedRepo{}
|
||||
jobs := c.v.State.GetJobList(js.JobID)
|
||||
for _, job := range jobs {
|
||||
astat := c.v.State.GetStatus(js.JobID, job.NWO).ToExternalString()
|
||||
all_scanned = append(all_scanned,
|
||||
common.ScannedRepo{
|
||||
Repository: common.Repository{
|
||||
ID: 0,
|
||||
Name: job.NWO.Repo,
|
||||
FullName: fmt.Sprintf("%s/%s", job.NWO.Owner, job.NWO.Repo),
|
||||
Private: false,
|
||||
StargazersCount: 0,
|
||||
UpdatedAt: ji.UpdatedAt,
|
||||
},
|
||||
AnalysisStatus: astat,
|
||||
ResultCount: 123, // FIXME 123 is a lie so the client downloads
|
||||
ArtifactSizeBytes: 123, // FIXME
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
astat := c.v.State.GetStatus(js.JobID, js.NameWithOwner).ToExternalString()
|
||||
|
||||
status := common.StatusResponse{
|
||||
SessionId: js.JobID,
|
||||
ControllerRepo: common.ControllerRepo{},
|
||||
Actor: common.Actor{},
|
||||
QueryLanguage: ji.QueryLanguage,
|
||||
QueryPackURL: "", // FIXME
|
||||
CreatedAt: ji.CreatedAt,
|
||||
UpdatedAt: ji.UpdatedAt,
|
||||
ActionsWorkflowRunID: 0, // FIXME
|
||||
Status: astat,
|
||||
ScannedRepositories: all_scanned,
|
||||
SkippedRepositories: ji.SkippedRepositories,
|
||||
}
|
||||
|
||||
// Encode the response as JSON
|
||||
submitStatus, err := json.Marshal(status)
|
||||
if err != nil {
|
||||
slog.Error("Error encoding response as JSON:",
|
||||
"error", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Send analysisReposJSON via ResponseWriter
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(submitStatus)
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) RootHandler(w http.ResponseWriter, r *http.Request) {
|
||||
slog.Info("Request on /")
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) MRVAStatus(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("MRVA status request for ",
|
||||
"owner", vars["owner"],
|
||||
"repo", vars["repo"],
|
||||
"codeql_variant_analysis_id", vars["codeql_variant_analysis_id"])
|
||||
id, err := strconv.Atoi(vars["codeql_variant_analysis_id"])
|
||||
if err != nil {
|
||||
slog.Error("Variant analysis is is not integer", "id",
|
||||
vars["codeql_variant_analysis_id"])
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
// The status reports one status for all jobs belonging to an id.
|
||||
// So we simply report the status of a job as the status of all.
|
||||
spec := c.v.State.GetJobList(id)
|
||||
if spec == nil {
|
||||
msg := "No jobs found for given job id"
|
||||
slog.Error(msg, "id", vars["codeql_variant_analysis_id"])
|
||||
http.Error(w, msg, http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
|
||||
job := spec[0]
|
||||
|
||||
js := common.JobSpec{
|
||||
JobID: job.QueryPackId,
|
||||
NameWithOwner: job.NWO,
|
||||
}
|
||||
|
||||
ji := c.v.State.GetJobInfo(js)
|
||||
|
||||
c.StatusResponse(w, js, ji, id)
|
||||
}
|
||||
|
||||
// Download artifacts
|
||||
func (c *CommanderContainer) MRVADownloadArtifact(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("MRVA artifact download",
|
||||
"controller_owner", vars["controller_owner"],
|
||||
"controller_repo", vars["controller_repo"],
|
||||
"codeql_variant_analysis_id", vars["codeql_variant_analysis_id"],
|
||||
"repo_owner", vars["repo_owner"],
|
||||
"repo_name", vars["repo_name"],
|
||||
)
|
||||
vaid, err := strconv.Atoi(vars["codeql_variant_analysis_id"])
|
||||
if err != nil {
|
||||
slog.Error("Variant analysis is is not integer", "id",
|
||||
vars["codeql_variant_analysis_id"])
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
js := common.JobSpec{
|
||||
JobID: vaid,
|
||||
NameWithOwner: common.NameWithOwner{
|
||||
Owner: vars["repo_owner"],
|
||||
Repo: vars["repo_name"],
|
||||
},
|
||||
}
|
||||
c.DownloadResponse(w, js, vaid)
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) DownloadResponse(w http.ResponseWriter, js common.JobSpec, jobID int) {
|
||||
var response common.DownloadResponse
|
||||
|
||||
slog.Debug("Forming download response", "id", jobID, "job", js)
|
||||
|
||||
jobStatus := c.v.State.GetStatus(jobID, js.NameWithOwner)
|
||||
|
||||
if jobStatus == common.StatusSuccess {
|
||||
jobResult := c.v.State.GetResult(js)
|
||||
// TODO: return this as a URL @hohn
|
||||
jobResultData, err := c.v.Artifacts.GetResult(jobResult.ResultLocation)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response = common.DownloadResponse{
|
||||
Repository: common.DownloadRepo{
|
||||
Name: js.Repo,
|
||||
FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo),
|
||||
},
|
||||
AnalysisStatus: jobStatus.ToExternalString(),
|
||||
ResultCount: jobResult.ResultCount,
|
||||
ArtifactSizeBytes: len(jobResultData),
|
||||
DatabaseCommitSha: "do-we-use-dcs-p",
|
||||
SourceLocationPrefix: "do-we-use-slp-p",
|
||||
ArtifactURL: "TODO", // @hohn
|
||||
}
|
||||
} else {
|
||||
response = common.DownloadResponse{
|
||||
Repository: common.DownloadRepo{
|
||||
Name: js.Repo,
|
||||
FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo),
|
||||
},
|
||||
AnalysisStatus: jobStatus.ToExternalString(),
|
||||
ResultCount: 0,
|
||||
ArtifactSizeBytes: 0,
|
||||
DatabaseCommitSha: "",
|
||||
SourceLocationPrefix: "/not/relevant/here",
|
||||
ArtifactURL: "",
|
||||
}
|
||||
}
|
||||
|
||||
// Encode the response as JSON
|
||||
responseJson, err := json.Marshal(response)
|
||||
if err != nil {
|
||||
slog.Error("Error encoding response as JSON:",
|
||||
"error", err)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Send analysisReposJSON via ResponseWriter
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(responseJson)
|
||||
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) MRVADownloadServe(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("File download request", "local_path", vars["local_path"])
|
||||
|
||||
FileDownload(w, vars["local_path"])
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) MRVARequestID(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("New mrva run using repository ID", "id", vars["repository_id"])
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) MRVARequest(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("New mrva run", "owner", vars["owner"], "repo", vars["repo"])
|
||||
|
||||
session_id := c.v.State.NextID()
|
||||
session_owner := vars["owner"]
|
||||
session_controller_repo := vars["repo"]
|
||||
slog.Info("new run", "id: ", fmt.Sprint(session_id), session_owner, session_controller_repo)
|
||||
session_language, session_repositories, session_tgz_ref, err := c.collectRequestInfo(w, r, session_id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
not_found_repos, analysisRepos := c.v.CodeQLDBStore.FindAvailableDBs(session_repositories)
|
||||
|
||||
c.startAnalyses(analysisRepos, session_id, session_language)
|
||||
|
||||
si := SessionInfo{
|
||||
ID: session_id,
|
||||
Owner: session_owner,
|
||||
ControllerRepo: session_controller_repo,
|
||||
|
||||
QueryPack: session_tgz_ref,
|
||||
Language: session_language,
|
||||
Repositories: session_repositories,
|
||||
|
||||
AccessMismatchRepos: nil, /* FIXME */
|
||||
NotFoundRepos: not_found_repos,
|
||||
NoCodeqlDBRepos: nil, /* FIXME */
|
||||
OverLimitRepos: nil, /* FIXME */
|
||||
|
||||
AnalysisRepos: analysisRepos,
|
||||
}
|
||||
|
||||
slog.Debug("Forming and sending response for submitted analysis job", "id", si.ID)
|
||||
submit_response, err := c.submitResponse(si)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(submit_response)
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) submitResponse(si SessionInfo) ([]byte, error) {
|
||||
// Construct the response bottom-up
|
||||
var m_cr common.ControllerRepo
|
||||
var m_ac common.Actor
|
||||
|
||||
repos, count := nwoToNwoStringArray(si.NotFoundRepos)
|
||||
r_nfr := common.NotFoundRepos{RepositoryCount: count, RepositoryFullNames: repos}
|
||||
|
||||
ra, rac := nwoToDummyRepositoryArray(si.AccessMismatchRepos)
|
||||
r_amr := common.AccessMismatchRepos{RepositoryCount: rac, Repositories: ra}
|
||||
|
||||
ra, rac = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos)
|
||||
r_ncd := common.NoCodeqlDBRepos{RepositoryCount: rac, Repositories: ra}
|
||||
|
||||
// TODO fill these with real values?
|
||||
ra, rac = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos)
|
||||
r_olr := common.OverLimitRepos{RepositoryCount: rac, Repositories: ra}
|
||||
|
||||
m_skip := common.SkippedRepositories{
|
||||
AccessMismatchRepos: r_amr,
|
||||
NotFoundRepos: r_nfr,
|
||||
NoCodeqlDBRepos: r_ncd,
|
||||
OverLimitRepos: r_olr}
|
||||
|
||||
m_sr := common.SubmitResponse{
|
||||
Actor: m_ac,
|
||||
ControllerRepo: m_cr,
|
||||
ID: si.ID,
|
||||
QueryLanguage: si.Language,
|
||||
// TODO: broken, need proper URL using si.data
|
||||
QueryPackURL: "broken-for-now",
|
||||
CreatedAt: time.Now().Format(time.RFC3339),
|
||||
UpdatedAt: time.Now().Format(time.RFC3339),
|
||||
Status: "in_progress",
|
||||
SkippedRepositories: m_skip,
|
||||
}
|
||||
|
||||
// Store data needed later
|
||||
// joblist := state.GetJobList(si.ID)
|
||||
// (si.JobID)?
|
||||
joblist := c.v.State.GetJobList(si.ID)
|
||||
|
||||
for _, job := range joblist {
|
||||
c.v.State.SetJobInfo(common.JobSpec{
|
||||
JobID: si.ID,
|
||||
NameWithOwner: job.NWO,
|
||||
}, common.JobInfo{
|
||||
QueryLanguage: si.Language,
|
||||
CreatedAt: m_sr.CreatedAt,
|
||||
UpdatedAt: m_sr.UpdatedAt,
|
||||
SkippedRepositories: m_skip,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// Encode the response as JSON
|
||||
submit_response, err := json.Marshal(m_sr)
|
||||
if err != nil {
|
||||
slog.Warn("Error encoding response as JSON:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return submit_response, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, artifactstore.ArtifactLocation, error) {
|
||||
slog.Debug("Collecting session info")
|
||||
|
||||
if r.Body == nil {
|
||||
err := errors.New("missing request body")
|
||||
log.Println(err)
|
||||
http.Error(w, err.Error(), http.StatusNoContent)
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
buf, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
var w http.ResponseWriter
|
||||
slog.Error("Error reading MRVA submission body", "error", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
msg, err := TrySubmitMsg(buf)
|
||||
if err != nil {
|
||||
// Unknown message
|
||||
slog.Error("Unknown MRVA submission body format")
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
// Decompose the SubmitMsg and keep information
|
||||
|
||||
// Save the query pack and keep the location
|
||||
if !isBase64Gzip([]byte(msg.QueryPack)) {
|
||||
slog.Error("MRVA submission body querypack has invalid format")
|
||||
err := errors.New("MRVA submission body querypack has invalid format")
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
session_tgz_ref, err := c.processQueryPackArchive(msg.QueryPack, sessionId)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
// 2. Save the language
|
||||
session_language := msg.Language
|
||||
|
||||
// 3. Save the repositories
|
||||
var session_repositories []common.NameWithOwner
|
||||
|
||||
for _, v := range msg.Repositories {
|
||||
t := strings.Split(v, "/")
|
||||
if len(t) != 2 {
|
||||
err := "Invalid owner / repository entry"
|
||||
slog.Error(err, "entry", t)
|
||||
http.Error(w, err, http.StatusBadRequest)
|
||||
}
|
||||
session_repositories = append(session_repositories,
|
||||
common.NameWithOwner{Owner: t[0], Repo: t[1]})
|
||||
}
|
||||
return session_language, session_repositories, session_tgz_ref, nil
|
||||
}
|
||||
|
||||
func (c *CommanderContainer) processQueryPackArchive(qp string, sessionID int) (artifactstore.ArtifactLocation, error) {
|
||||
// These are decoded manually via
|
||||
// base64 -d < foo1 | gunzip | tar t | head -20
|
||||
// base64 decode the body
|
||||
slog.Debug("Extracting query pack")
|
||||
|
||||
tgz, err := base64.StdEncoding.DecodeString(qp)
|
||||
if err != nil {
|
||||
slog.Error("querypack body decoding error:", err)
|
||||
return artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
session_query_pack_tgz_filepath, err := c.v.Artifacts.SaveQueryPack(sessionID, tgz)
|
||||
if err != nil {
|
||||
return artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
return session_query_pack_tgz_filepath, err
|
||||
}
|
||||
@@ -10,60 +10,81 @@ import (
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"mrvacommander/pkg/artifactstore"
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/storage"
|
||||
"mrvacommander/pkg/qldbstore"
|
||||
"mrvacommander/pkg/state"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (c *CommanderSingle) startAnalyses(
|
||||
analysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation,
|
||||
jobID int,
|
||||
queryLanguage string) {
|
||||
slog.Debug("Queueing analysis jobs")
|
||||
|
||||
for nwo := range *analysisRepos {
|
||||
info := common.AnalyzeJob{
|
||||
QueryPackId: jobID,
|
||||
QueryLanguage: queryLanguage,
|
||||
NWO: nwo,
|
||||
}
|
||||
c.v.Queue.Jobs() <- info
|
||||
c.v.State.SetStatus(jobID, nwo, common.StatusQueued)
|
||||
c.v.State.AddJob(jobID, info)
|
||||
}
|
||||
}
|
||||
|
||||
func setupEndpoints(c CommanderAPI) {
|
||||
r := mux.NewRouter()
|
||||
|
||||
//
|
||||
// First are the API endpoints that mirror those used in the github API
|
||||
//
|
||||
// API endpoints that mirror those used in the GitHub API
|
||||
r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses", c.MRVARequest)
|
||||
// /repos/hohn /mrva-controller/code-scanning/codeql/variant-analyses
|
||||
// Or via
|
||||
// Example: /repos/hohn/mrva-controller/code-scanning/codeql/variant-analyses
|
||||
|
||||
// Endpoint using repository ID
|
||||
r.HandleFunc("/{repository_id}/code-scanning/codeql/variant-analyses", c.MRVARequestID)
|
||||
|
||||
// Root handler
|
||||
r.HandleFunc("/", c.RootHandler)
|
||||
|
||||
// This is the standalone status request.
|
||||
// It's also the first request made when downloading; the difference is on the
|
||||
// client side's handling.
|
||||
// Standalone status request
|
||||
// This is also the first request made when downloading; the difference is in the client-side handling.
|
||||
r.HandleFunc("/repos/{owner}/{repo}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}", c.MRVAStatus)
|
||||
|
||||
// Endpoint for downloading artifacts
|
||||
r.HandleFunc("/repos/{controller_owner}/{controller_repo}/code-scanning/codeql/variant-analyses/{codeql_variant_analysis_id}/repos/{repo_owner}/{repo_name}", c.MRVADownloadArtifact)
|
||||
|
||||
// Not implemented:
|
||||
// r.HandleFunc("/codeql-query-console/codeql-variant-analysis-repo-tasks/{codeql_variant_analysis_id}/{repo_id}/{owner_id}/{controller_repo_id}", MRVADownLoad3)
|
||||
// r.HandleFunc("/github-codeql-query-console-prod/codeql-variant-analysis-repo-tasks/{codeql_variant_analysis_id}/{repo_id}", MRVADownLoad4)
|
||||
|
||||
//
|
||||
// Now some support API endpoints
|
||||
//
|
||||
// Support API endpoint
|
||||
r.HandleFunc("/download-server/{local_path:.*}", c.MRVADownloadServe)
|
||||
|
||||
//
|
||||
// Bind to a port and pass our router in
|
||||
//
|
||||
// TODO make this a configuration entry
|
||||
log.Fatal(http.ListenAndServe(":8080", r))
|
||||
// TODO: Make this a configuration entry
|
||||
err := http.ListenAndServe(":8080", r)
|
||||
if err != nil {
|
||||
slog.Error("Error starting server:", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo, vaid int) {
|
||||
slog.Debug("Submitting status response", "session", vaid)
|
||||
|
||||
all_scanned := []common.ScannedRepo{}
|
||||
jobs := storage.GetJobList(js.JobID)
|
||||
jobs := c.v.State.GetJobList(js.JobID)
|
||||
for _, job := range jobs {
|
||||
astat := storage.GetStatus(js.JobID, job.NWO).ToExternalString()
|
||||
astat := c.v.State.GetStatus(js.JobID, job.NWO).ToExternalString()
|
||||
all_scanned = append(all_scanned,
|
||||
common.ScannedRepo{
|
||||
Repository: common.Repository{
|
||||
@@ -81,7 +102,7 @@ func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpe
|
||||
)
|
||||
}
|
||||
|
||||
astat := storage.GetStatus(js.JobID, js.NameWithOwner).ToExternalString()
|
||||
astat := c.v.State.GetStatus(js.JobID, js.NameWithOwner).ToExternalString()
|
||||
|
||||
status := common.StatusResponse{
|
||||
SessionId: js.JobID,
|
||||
@@ -130,7 +151,7 @@ func (c *CommanderSingle) MRVAStatus(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
// The status reports one status for all jobs belonging to an id.
|
||||
// So we simply report the status of a job as the status of all.
|
||||
spec := storage.GetJobList(id)
|
||||
spec := c.v.State.GetJobList(id)
|
||||
if spec == nil {
|
||||
msg := "No jobs found for given job id"
|
||||
slog.Error(msg, "id", vars["codeql_variant_analysis_id"])
|
||||
@@ -145,7 +166,7 @@ func (c *CommanderSingle) MRVAStatus(w http.ResponseWriter, r *http.Request) {
|
||||
NameWithOwner: job.NWO,
|
||||
}
|
||||
|
||||
ji := storage.GetJobInfo(js)
|
||||
ji := c.v.State.GetJobInfo(js)
|
||||
|
||||
c.StatusResponse(w, js, ji, id)
|
||||
}
|
||||
@@ -177,39 +198,41 @@ func (c *CommanderSingle) MRVADownloadArtifact(w http.ResponseWriter, r *http.Re
|
||||
c.DownloadResponse(w, js, vaid)
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobSpec, vaid int) {
|
||||
slog.Debug("Forming download response", "session", vaid, "job", js)
|
||||
func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobSpec, jobID int) {
|
||||
var response common.DownloadResponse
|
||||
|
||||
astat := storage.GetStatus(vaid, js.NameWithOwner)
|
||||
slog.Debug("Forming download response", "id", jobID, "job", js)
|
||||
|
||||
var dlr common.DownloadResponse
|
||||
if astat == common.StatusSuccess {
|
||||
jobStatus := c.v.State.GetStatus(jobID, js.NameWithOwner)
|
||||
|
||||
au, err := storage.ArtifactURL(js, vaid)
|
||||
if jobStatus == common.StatusSuccess {
|
||||
jobResult := c.v.State.GetResult(js)
|
||||
// TODO: return this as a URL @hohn
|
||||
jobResultData, err := c.v.Artifacts.GetResult(jobResult.ResultLocation)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
dlr = common.DownloadResponse{
|
||||
response = common.DownloadResponse{
|
||||
Repository: common.DownloadRepo{
|
||||
Name: js.Repo,
|
||||
FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo),
|
||||
},
|
||||
AnalysisStatus: astat.ToExternalString(),
|
||||
ResultCount: 123, // FIXME
|
||||
ArtifactSizeBytes: 123, // FIXME
|
||||
AnalysisStatus: jobStatus.ToExternalString(),
|
||||
ResultCount: jobResult.ResultCount,
|
||||
ArtifactSizeBytes: len(jobResultData),
|
||||
DatabaseCommitSha: "do-we-use-dcs-p",
|
||||
SourceLocationPrefix: "do-we-use-slp-p",
|
||||
ArtifactURL: au,
|
||||
ArtifactURL: "TODO", // @hohn
|
||||
}
|
||||
} else {
|
||||
dlr = common.DownloadResponse{
|
||||
response = common.DownloadResponse{
|
||||
Repository: common.DownloadRepo{
|
||||
Name: js.Repo,
|
||||
FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo),
|
||||
},
|
||||
AnalysisStatus: astat.ToExternalString(),
|
||||
AnalysisStatus: jobStatus.ToExternalString(),
|
||||
ResultCount: 0,
|
||||
ArtifactSizeBytes: 0,
|
||||
DatabaseCommitSha: "",
|
||||
@@ -219,7 +242,7 @@ func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobS
|
||||
}
|
||||
|
||||
// Encode the response as JSON
|
||||
jdlr, err := json.Marshal(dlr)
|
||||
responseJson, err := json.Marshal(response)
|
||||
if err != nil {
|
||||
slog.Error("Error encoding response as JSON:",
|
||||
"error", err)
|
||||
@@ -229,7 +252,7 @@ func (c *CommanderSingle) DownloadResponse(w http.ResponseWriter, js common.JobS
|
||||
|
||||
// Send analysisReposJSON via ResponseWriter
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(jdlr)
|
||||
w.Write(responseJson)
|
||||
|
||||
}
|
||||
|
||||
@@ -243,7 +266,7 @@ func (c *CommanderSingle) MRVADownloadServe(w http.ResponseWriter, r *http.Reque
|
||||
func FileDownload(w http.ResponseWriter, path string) {
|
||||
slog.Debug("Sending zip file with .sarif/.bqrs", "path", path)
|
||||
|
||||
fpath, res, err := storage.ResultAsFile(path)
|
||||
fpath, res, err := state.ResultAsFile(path)
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to read results", http.StatusInternalServerError)
|
||||
return
|
||||
@@ -262,19 +285,18 @@ func FileDownload(w http.ResponseWriter, path string) {
|
||||
}
|
||||
|
||||
slog.Debug("Uploaded file", "path", fpath)
|
||||
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) MRVARequestID(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("New mrva using repository_id=%v\n", vars["repository_id"])
|
||||
slog.Info("New mrva run using repository ID", "id", vars["repository_id"])
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
slog.Info("New mrva run ", "owner", vars["owner"], "repo", vars["repo"])
|
||||
slog.Info("New mrva run", "owner", vars["owner"], "repo", vars["repo"])
|
||||
|
||||
session_id := c.vis.ServerStore.NextID()
|
||||
session_id := c.v.State.NextID()
|
||||
session_owner := vars["owner"]
|
||||
session_controller_repo := vars["repo"]
|
||||
slog.Info("new run", "id: ", fmt.Sprint(session_id), session_owner, session_controller_repo)
|
||||
@@ -283,9 +305,9 @@ func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
not_found_repos, analysisRepos := c.vis.ServerStore.FindAvailableDBs(session_repositories)
|
||||
not_found_repos, analysisRepos := c.v.CodeQLDBStore.FindAvailableDBs(session_repositories)
|
||||
|
||||
c.vis.Queue.StartAnalyses(analysisRepos, session_id, session_language)
|
||||
c.startAnalyses(analysisRepos, session_id, session_language)
|
||||
|
||||
si := SessionInfo{
|
||||
ID: session_id,
|
||||
@@ -305,7 +327,7 @@ func (c *CommanderSingle) MRVARequest(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
slog.Debug("Forming and sending response for submitted analysis job", "id", si.ID)
|
||||
submit_response, err := submit_response(si)
|
||||
submit_response, err := c.submitResponse(si)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
@@ -324,23 +346,40 @@ func nwoToNwoStringArray(nwo []common.NameWithOwner) ([]string, int) {
|
||||
return repos, count
|
||||
}
|
||||
|
||||
func submit_response(sn SessionInfo) ([]byte, error) {
|
||||
func nwoToDummyRepositoryArray(nwo []common.NameWithOwner) ([]common.Repository, int) {
|
||||
repos := []common.Repository{}
|
||||
for _, repo := range nwo {
|
||||
repos = append(repos, common.Repository{
|
||||
ID: 0,
|
||||
Name: repo.Repo,
|
||||
FullName: fmt.Sprintf("%s/%s", repo.Owner, repo.Repo),
|
||||
Private: false,
|
||||
StargazersCount: 0,
|
||||
UpdatedAt: time.Now().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
count := len(nwo)
|
||||
|
||||
return repos, count
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) submitResponse(si SessionInfo) ([]byte, error) {
|
||||
// Construct the response bottom-up
|
||||
var m_cr common.ControllerRepo
|
||||
var m_ac common.Actor
|
||||
|
||||
repos, count := nwoToNwoStringArray(sn.NotFoundRepos)
|
||||
repos, count := nwoToNwoStringArray(si.NotFoundRepos)
|
||||
r_nfr := common.NotFoundRepos{RepositoryCount: count, RepositoryFullNames: repos}
|
||||
|
||||
repos, count = nwoToNwoStringArray(sn.AccessMismatchRepos)
|
||||
r_amr := common.AccessMismatchRepos{RepositoryCount: count, Repositories: repos}
|
||||
ra, rac := nwoToDummyRepositoryArray(si.AccessMismatchRepos)
|
||||
r_amr := common.AccessMismatchRepos{RepositoryCount: rac, Repositories: ra}
|
||||
|
||||
repos, count = nwoToNwoStringArray(sn.NoCodeqlDBRepos)
|
||||
r_ncd := common.NoCodeqlDBRepos{RepositoryCount: count, Repositories: repos}
|
||||
ra, rac = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos)
|
||||
r_ncd := common.NoCodeqlDBRepos{RepositoryCount: rac, Repositories: ra}
|
||||
|
||||
// TODO fill these with real values?
|
||||
repos, count = nwoToNwoStringArray(sn.NoCodeqlDBRepos)
|
||||
r_olr := common.OverLimitRepos{RepositoryCount: count, Repositories: repos}
|
||||
ra, rac = nwoToDummyRepositoryArray(si.NoCodeqlDBRepos)
|
||||
r_olr := common.OverLimitRepos{RepositoryCount: rac, Repositories: ra}
|
||||
|
||||
m_skip := common.SkippedRepositories{
|
||||
AccessMismatchRepos: r_amr,
|
||||
@@ -349,11 +388,12 @@ func submit_response(sn SessionInfo) ([]byte, error) {
|
||||
OverLimitRepos: r_olr}
|
||||
|
||||
m_sr := common.SubmitResponse{
|
||||
Actor: m_ac,
|
||||
ControllerRepo: m_cr,
|
||||
ID: sn.ID,
|
||||
QueryLanguage: sn.Language,
|
||||
QueryPackURL: sn.QueryPack,
|
||||
Actor: m_ac,
|
||||
ControllerRepo: m_cr,
|
||||
ID: si.ID,
|
||||
QueryLanguage: si.Language,
|
||||
// TODO: broken, need proper URL using si.data
|
||||
QueryPackURL: "broken-for-now",
|
||||
CreatedAt: time.Now().Format(time.RFC3339),
|
||||
UpdatedAt: time.Now().Format(time.RFC3339),
|
||||
Status: "in_progress",
|
||||
@@ -361,14 +401,16 @@ func submit_response(sn SessionInfo) ([]byte, error) {
|
||||
}
|
||||
|
||||
// Store data needed later
|
||||
joblist := storage.GetJobList(sn.ID)
|
||||
// joblist := state.GetJobList(si.ID)
|
||||
// (si.JobID)?
|
||||
joblist := c.v.State.GetJobList(si.ID)
|
||||
|
||||
for _, job := range joblist {
|
||||
storage.SetJobInfo(common.JobSpec{
|
||||
JobID: sn.ID,
|
||||
c.v.State.SetJobInfo(common.JobSpec{
|
||||
JobID: si.ID,
|
||||
NameWithOwner: job.NWO,
|
||||
}, common.JobInfo{
|
||||
QueryLanguage: sn.Language,
|
||||
QueryLanguage: si.Language,
|
||||
CreatedAt: m_sr.CreatedAt,
|
||||
UpdatedAt: m_sr.UpdatedAt,
|
||||
SkippedRepositories: m_skip,
|
||||
@@ -379,35 +421,35 @@ func submit_response(sn SessionInfo) ([]byte, error) {
|
||||
// Encode the response as JSON
|
||||
submit_response, err := json.Marshal(m_sr)
|
||||
if err != nil {
|
||||
slog.Warn("Error encoding response as JSON:", err)
|
||||
slog.Warn("Error encoding response as JSON:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return submit_response, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, string, error) {
|
||||
func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.NameWithOwner, artifactstore.ArtifactLocation, error) {
|
||||
slog.Debug("Collecting session info")
|
||||
|
||||
if r.Body == nil {
|
||||
err := errors.New("missing request body")
|
||||
log.Println(err)
|
||||
http.Error(w, err.Error(), http.StatusNoContent)
|
||||
return "", []common.NameWithOwner{}, "", err
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
buf, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
var w http.ResponseWriter
|
||||
slog.Error("Error reading MRVA submission body", "error", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, "", err
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
msg, err := TrySubmitMsg(buf)
|
||||
if err != nil {
|
||||
// Unknown message
|
||||
slog.Error("Unknown MRVA submission body format")
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, "", err
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
// Decompose the SubmitMsg and keep information
|
||||
|
||||
@@ -416,12 +458,12 @@ func (c *CommanderSingle) collectRequestInfo(w http.ResponseWriter, r *http.Requ
|
||||
slog.Error("MRVA submission body querypack has invalid format")
|
||||
err := errors.New("MRVA submission body querypack has invalid format")
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, "", err
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
session_tgz_ref, err := c.extract_tgz(msg.QueryPack, sessionId)
|
||||
session_tgz_ref, err := c.processQueryPackArchive(msg.QueryPack, sessionId)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return "", []common.NameWithOwner{}, "", err
|
||||
return "", []common.NameWithOwner{}, artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
// 2. Save the language
|
||||
@@ -479,7 +521,7 @@ func isBase64Gzip(val []byte) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CommanderSingle) extract_tgz(qp string, sessionID int) (string, error) {
|
||||
func (c *CommanderSingle) processQueryPackArchive(qp string, sessionID int) (artifactstore.ArtifactLocation, error) {
|
||||
// These are decoded manually via
|
||||
// base64 -d < foo1 | gunzip | tar t | head -20
|
||||
// base64 decode the body
|
||||
@@ -488,12 +530,12 @@ func (c *CommanderSingle) extract_tgz(qp string, sessionID int) (string, error)
|
||||
tgz, err := base64.StdEncoding.DecodeString(qp)
|
||||
if err != nil {
|
||||
slog.Error("querypack body decoding error:", err)
|
||||
return "", err
|
||||
return artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
session_query_pack_tgz_filepath, err := c.vis.ServerStore.SaveQueryPack(tgz, sessionID)
|
||||
session_query_pack_tgz_filepath, err := c.v.Artifacts.SaveQueryPack(sessionID, tgz)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return artifactstore.ArtifactLocation{}, err
|
||||
}
|
||||
|
||||
return session_query_pack_tgz_filepath, err
|
||||
|
||||
@@ -1,57 +1,50 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"mrvacommander/pkg/artifactstore"
|
||||
"mrvacommander/pkg/common"
|
||||
"mrvacommander/pkg/logger"
|
||||
"mrvacommander/pkg/qldbstore"
|
||||
"mrvacommander/pkg/qpstore"
|
||||
"mrvacommander/pkg/queue"
|
||||
"mrvacommander/pkg/storage"
|
||||
"mrvacommander/pkg/state"
|
||||
)
|
||||
|
||||
type SessionInfo struct {
|
||||
ID int
|
||||
Owner string
|
||||
ControllerRepo string
|
||||
|
||||
QueryPack string
|
||||
Language string
|
||||
Repositories []common.NameWithOwner
|
||||
|
||||
ID int
|
||||
Owner string
|
||||
ControllerRepo string
|
||||
QueryPack artifactstore.ArtifactLocation
|
||||
Language string
|
||||
Repositories []common.NameWithOwner
|
||||
AccessMismatchRepos []common.NameWithOwner
|
||||
NotFoundRepos []common.NameWithOwner
|
||||
NoCodeqlDBRepos []common.NameWithOwner
|
||||
OverLimitRepos []common.NameWithOwner
|
||||
|
||||
AnalysisRepos *map[common.NameWithOwner]storage.DBLocation
|
||||
AnalysisRepos *map[common.NameWithOwner]qldbstore.CodeQLDatabaseLocation
|
||||
}
|
||||
|
||||
type CommanderSingle struct {
|
||||
vis *Visibles
|
||||
v *Visibles
|
||||
}
|
||||
|
||||
type CommanderContainer struct {
|
||||
v *Visibles
|
||||
}
|
||||
|
||||
func NewCommanderSingle(st *Visibles) *CommanderSingle {
|
||||
c := CommanderSingle{}
|
||||
|
||||
c := CommanderSingle{v: st}
|
||||
setupEndpoints(&c)
|
||||
|
||||
return &c
|
||||
}
|
||||
|
||||
// type State struct {
|
||||
// Commander Commander
|
||||
// Logger logger.Logger
|
||||
// Queue queue.Queue
|
||||
// Storage storage.Storage
|
||||
// Runner agent.Runner
|
||||
// }
|
||||
func NewCommanderContainer(st *Visibles) *CommanderContainer {
|
||||
c := CommanderContainer{v: st}
|
||||
setupEndpoints(&c)
|
||||
return &c
|
||||
}
|
||||
|
||||
type Visibles struct {
|
||||
Logger logger.Logger
|
||||
Queue queue.Queue
|
||||
ServerStore storage.Storage
|
||||
// TODO extra package for query pack storage
|
||||
QueryPackStore qpstore.Storage
|
||||
// TODO extra package for ql db storage
|
||||
QLDBStore qldbstore.Storage
|
||||
Queue queue.Queue
|
||||
State state.CommonState
|
||||
Artifacts artifactstore.ArtifactStore
|
||||
CodeQLDBStore qldbstore.CodeQLDatabaseStore
|
||||
}
|
||||
|
||||
86
pkg/state/containerstate.go
Normal file
86
pkg/state/containerstate.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"mrvacommander/pkg/common"
|
||||
)
|
||||
|
||||
type ContainerState struct {
|
||||
jobs map[int][]common.AnalyzeJob
|
||||
info map[common.JobSpec]common.JobInfo
|
||||
status map[common.JobSpec]common.Status
|
||||
result map[common.JobSpec]common.AnalyzeResult
|
||||
mutex sync.Mutex
|
||||
currentID int
|
||||
}
|
||||
|
||||
func NewContainerState(startingID int) *ContainerState {
|
||||
return &ContainerState{
|
||||
jobs: make(map[int][]common.AnalyzeJob),
|
||||
info: make(map[common.JobSpec]common.JobInfo),
|
||||
status: make(map[common.JobSpec]common.Status),
|
||||
result: make(map[common.JobSpec]common.AnalyzeResult),
|
||||
currentID: startingID,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ContainerState) NextID() int {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
s.currentID++
|
||||
return s.currentID
|
||||
}
|
||||
|
||||
func (s *ContainerState) GetArtifactURL(js common.JobSpec, vaid int) (string, error) {
|
||||
// TODO: have the server convert an artifact to a URL temporarily hosted on the
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (s *ContainerState) GetResult(js common.JobSpec) common.AnalyzeResult {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.result[js]
|
||||
}
|
||||
|
||||
func (s *ContainerState) SetResult(jobID int, nwo common.NameWithOwner, analyzeResult common.AnalyzeResult) {
|
||||
s.mutex.Lock()
|
||||
s.result[common.JobSpec{JobID: jobID, NameWithOwner: nwo}] = analyzeResult
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *ContainerState) GetJobList(jobID int) []common.AnalyzeJob {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.jobs[jobID]
|
||||
}
|
||||
|
||||
func (s *ContainerState) GetJobInfo(js common.JobSpec) common.JobInfo {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.info[js]
|
||||
}
|
||||
|
||||
func (s *ContainerState) SetJobInfo(js common.JobSpec, ji common.JobInfo) {
|
||||
s.mutex.Lock()
|
||||
s.info[js] = ji
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *ContainerState) GetStatus(jobID int, nwo common.NameWithOwner) common.Status {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.status[common.JobSpec{JobID: jobID, NameWithOwner: nwo}]
|
||||
}
|
||||
|
||||
func (s *ContainerState) SetStatus(jobID int, nwo common.NameWithOwner, status common.Status) {
|
||||
s.mutex.Lock()
|
||||
s.status[common.JobSpec{JobID: jobID, NameWithOwner: nwo}] = status
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *ContainerState) AddJob(jobID int, job common.AnalyzeJob) {
|
||||
s.mutex.Lock()
|
||||
s.jobs[jobID] = append(s.jobs[jobID], job)
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
50
pkg/state/interfaces.go
Normal file
50
pkg/state/interfaces.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package state
|
||||
|
||||
import "mrvacommander/pkg/common"
|
||||
|
||||
// StorageInterface defines the methods required for managing storage operations
|
||||
// related to server state, e.g. job status, results, and artifacts.
|
||||
type ServerState interface {
|
||||
// NextID increments and returns the next unique ID for a session.
|
||||
NextID() int
|
||||
|
||||
// GetResult retrieves the analysis result for the specified job.
|
||||
GetResult(js common.JobSpec) common.AnalyzeResult
|
||||
|
||||
// SetResult stores the analysis result for the specified session ID and repository.
|
||||
SetResult(jobID int, nwo common.NameWithOwner, ar common.AnalyzeResult)
|
||||
|
||||
// GetJobList retrieves the list of analysis jobs for the specified session ID.
|
||||
GetJobList(jobID int) []common.AnalyzeJob
|
||||
|
||||
// GetJobInfo retrieves the job information for the specified job specification.
|
||||
GetJobInfo(js common.JobSpec) common.JobInfo
|
||||
|
||||
// SetJobInfo stores the job information for the specified job specification.
|
||||
SetJobInfo(js common.JobSpec, ji common.JobInfo)
|
||||
|
||||
// GetStatus retrieves the status of a job for the specified session ID and repository.
|
||||
GetStatus(jobID int, nwo common.NameWithOwner) common.Status
|
||||
|
||||
// ResultAsFile reads and returns the content of a result file from the specified path.
|
||||
ResultAsFile(path string) (string, []byte, error)
|
||||
|
||||
// SetStatus stores the status of a job for the specified session ID and repository.
|
||||
SetStatus(jobID int, nwo common.NameWithOwner, status common.Status)
|
||||
|
||||
// AddJob adds an analysis job to the list of jobs for the specified session ID.
|
||||
AddJob(jobID int, job common.AnalyzeJob)
|
||||
}
|
||||
|
||||
type CommonState interface {
|
||||
NextID() int
|
||||
GetArtifactURL(js common.JobSpec, vaid int) (string, error)
|
||||
GetResult(js common.JobSpec) common.AnalyzeResult
|
||||
SetResult(jobID int, nwo common.NameWithOwner, analyzeResult common.AnalyzeResult)
|
||||
GetJobList(jobID int) []common.AnalyzeJob
|
||||
GetJobInfo(js common.JobSpec) common.JobInfo
|
||||
SetJobInfo(js common.JobSpec, ji common.JobInfo)
|
||||
GetStatus(jobID int, nwo common.NameWithOwner) common.Status
|
||||
SetStatus(jobID int, nwo common.NameWithOwner, status common.Status)
|
||||
AddJob(jobID int, job common.AnalyzeJob)
|
||||
}
|
||||
105
pkg/state/state_local.go
Normal file
105
pkg/state/state_local.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"mrvacommander/pkg/common"
|
||||
)
|
||||
|
||||
type LocalState struct {
|
||||
jobs map[int][]common.AnalyzeJob
|
||||
info map[common.JobSpec]common.JobInfo
|
||||
status map[common.JobSpec]common.Status
|
||||
result map[common.JobSpec]common.AnalyzeResult
|
||||
mutex sync.Mutex
|
||||
currentID int
|
||||
}
|
||||
|
||||
func NewLocalState(startingID int) *LocalState {
|
||||
return &LocalState{
|
||||
jobs: make(map[int][]common.AnalyzeJob),
|
||||
info: make(map[common.JobSpec]common.JobInfo),
|
||||
status: make(map[common.JobSpec]common.Status),
|
||||
result: make(map[common.JobSpec]common.AnalyzeResult),
|
||||
currentID: startingID,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalState) NextID() int {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
s.currentID++
|
||||
return s.currentID
|
||||
}
|
||||
|
||||
func (s *LocalState) GetArtifactURL(js common.JobSpec, vaid int) (string, error) {
|
||||
// TODO: have the server convert an artifact to a URL temporarily hosted on the
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (s *LocalState) GetResult(js common.JobSpec) common.AnalyzeResult {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.result[js]
|
||||
}
|
||||
|
||||
func (s *LocalState) SetResult(jobID int, nwo common.NameWithOwner, analyzeResult common.AnalyzeResult) {
|
||||
s.mutex.Lock()
|
||||
s.result[common.JobSpec{JobID: jobID, NameWithOwner: nwo}] = analyzeResult
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *LocalState) GetJobList(jobID int) []common.AnalyzeJob {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.jobs[jobID]
|
||||
}
|
||||
|
||||
func (s *LocalState) GetJobInfo(js common.JobSpec) common.JobInfo {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.info[js]
|
||||
}
|
||||
|
||||
func (s *LocalState) SetJobInfo(js common.JobSpec, ji common.JobInfo) {
|
||||
s.mutex.Lock()
|
||||
s.info[js] = ji
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *LocalState) GetStatus(jobID int, nwo common.NameWithOwner) common.Status {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return s.status[common.JobSpec{JobID: jobID, NameWithOwner: nwo}]
|
||||
}
|
||||
|
||||
func (s *LocalState) SetStatus(jobID int, nwo common.NameWithOwner, status common.Status) {
|
||||
s.mutex.Lock()
|
||||
s.status[common.JobSpec{JobID: jobID, NameWithOwner: nwo}] = status
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (s *LocalState) AddJob(jobID int, job common.AnalyzeJob) {
|
||||
s.mutex.Lock()
|
||||
s.jobs[jobID] = append(s.jobs[jobID], job)
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
// TODO: @hohn
|
||||
func ResultAsFile(path string) (string, []byte, error) {
|
||||
fpath := path
|
||||
if !filepath.IsAbs(path) {
|
||||
fpath = "/" + path
|
||||
}
|
||||
|
||||
file, err := os.ReadFile(fpath)
|
||||
if err != nil {
|
||||
slog.Warn("Failed to read results file", fpath, err)
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return fpath, file, nil
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
package storage
|
||||
|
||||
import "mrvacommander/pkg/common"
|
||||
|
||||
type Storage interface {
|
||||
NextID() int
|
||||
SaveQueryPack(tgz []byte, sessionID int) (storagePath string, error error)
|
||||
FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner,
|
||||
analysisRepos *map[common.NameWithOwner]DBLocation)
|
||||
}
|
||||
@@ -1,244 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"mrvacommander/pkg/common"
|
||||
)
|
||||
|
||||
var (
|
||||
jobs map[int][]common.AnalyzeJob = make(map[int][]common.AnalyzeJob)
|
||||
info map[common.JobSpec]common.JobInfo = make(map[common.JobSpec]common.JobInfo)
|
||||
status map[common.JobSpec]common.Status = make(map[common.JobSpec]common.Status)
|
||||
result map[common.JobSpec]common.AnalyzeResult = make(map[common.JobSpec]common.AnalyzeResult)
|
||||
mutex sync.Mutex
|
||||
)
|
||||
|
||||
func NewStorageSingle(startingID int, v *Visibles) *StorageSingle {
|
||||
s := StorageSingle{currentID: startingID}
|
||||
|
||||
s.modules = v
|
||||
|
||||
return &s
|
||||
}
|
||||
|
||||
func (s *StorageSingle) NextID() int {
|
||||
s.currentID += 1
|
||||
return s.currentID
|
||||
}
|
||||
|
||||
func (s *StorageSingle) SaveQueryPack(tgz []byte, sessionId int) (string, error) {
|
||||
// Save the tar.gz body
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
slog.Error("No working directory")
|
||||
panic(err)
|
||||
}
|
||||
|
||||
dirpath := path.Join(cwd, "var", "codeql", "querypacks")
|
||||
if err := os.MkdirAll(dirpath, 0755); err != nil {
|
||||
slog.Error("Unable to create query pack output directory",
|
||||
"dir", dirpath)
|
||||
return "", err
|
||||
}
|
||||
|
||||
fpath := path.Join(dirpath, fmt.Sprintf("qp-%d.tgz", sessionId))
|
||||
err = os.WriteFile(fpath, tgz, 0644)
|
||||
if err != nil {
|
||||
slog.Error("unable to save querypack body decoding error", "path", fpath)
|
||||
return "", err
|
||||
} else {
|
||||
slog.Info("Query pack saved to ", "path", fpath)
|
||||
}
|
||||
|
||||
return fpath, nil
|
||||
}
|
||||
|
||||
// Determine for which repositories codeql databases are available.
|
||||
//
|
||||
// Those will be the analysis_repos. The rest will be skipped.
|
||||
func (s *StorageSingle) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (not_found_repos []common.NameWithOwner,
|
||||
analysisRepos *map[common.NameWithOwner]DBLocation) {
|
||||
slog.Debug("Looking for available CodeQL databases")
|
||||
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
slog.Error("No working directory")
|
||||
return
|
||||
}
|
||||
|
||||
analysisRepos = &map[common.NameWithOwner]DBLocation{}
|
||||
|
||||
not_found_repos = []common.NameWithOwner{}
|
||||
|
||||
for _, rep := range analysisReposRequested {
|
||||
dbPrefix := filepath.Join(cwd, "codeql", "dbs", rep.Owner, rep.Repo)
|
||||
dbName := fmt.Sprintf("%s_%s_db.zip", rep.Owner, rep.Repo)
|
||||
dbPath := filepath.Join(dbPrefix, dbName)
|
||||
|
||||
if _, err := os.Stat(dbPath); errors.Is(err, fs.ErrNotExist) {
|
||||
slog.Info("Database does not exist for repository ", "owner/repo", rep,
|
||||
"path", dbPath)
|
||||
not_found_repos = append(not_found_repos, rep)
|
||||
} else {
|
||||
slog.Info("Found database for ", "owner/repo", rep, "path", dbPath)
|
||||
(*analysisRepos)[rep] = DBLocation{Prefix: dbPrefix, File: dbName}
|
||||
}
|
||||
}
|
||||
return not_found_repos, analysisRepos
|
||||
}
|
||||
|
||||
func ArtifactURL(js common.JobSpec, vaid int) (string, error) {
|
||||
// We're looking for paths like
|
||||
// codeql/sarif/google/flatbuffers/google_flatbuffers.sarif
|
||||
|
||||
ar := GetResult(js)
|
||||
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
slog.Error("No host name found")
|
||||
return "", nil
|
||||
}
|
||||
|
||||
zfpath, err := PackageResults(ar, js.NameWithOwner, vaid)
|
||||
if err != nil {
|
||||
slog.Error("Error packaging results:", "error", err)
|
||||
return "", err
|
||||
}
|
||||
// TODO Need url valid in container network and externally
|
||||
// For now, we assume the container port 8080 is port 8080 on user's machine
|
||||
hostname = "localhost"
|
||||
au := fmt.Sprintf("http://%s:8080/download-server/%s", hostname, zfpath)
|
||||
return au, nil
|
||||
}
|
||||
|
||||
func GetResult(js common.JobSpec) common.AnalyzeResult {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
ar := result[js]
|
||||
return ar
|
||||
}
|
||||
|
||||
func SetResult(sessionid int, nwo common.NameWithOwner, ar common.AnalyzeResult) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
result[common.JobSpec{JobID: sessionid, NameWithOwner: nwo}] = ar
|
||||
}
|
||||
|
||||
func PackageResults(ar common.AnalyzeResult, owre common.NameWithOwner, vaid int) (zipPath string, e error) {
|
||||
slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar)
|
||||
|
||||
cwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
slog.Error("No working directory")
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Ensure the output directory exists
|
||||
dirpath := path.Join(cwd, "var", "codeql", "localrun", "results")
|
||||
if err := os.MkdirAll(dirpath, 0755); err != nil {
|
||||
slog.Error("Unable to create results output directory",
|
||||
"dir", dirpath)
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Create a new zip file
|
||||
zpath := path.Join(dirpath, fmt.Sprintf("results-%s-%s-%d.zip", owre.Owner, owre.Repo, vaid))
|
||||
|
||||
zfile, err := os.Create(zpath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer zfile.Close()
|
||||
|
||||
// Create a new zip writer
|
||||
zwriter := zip.NewWriter(zfile)
|
||||
defer zwriter.Close()
|
||||
|
||||
// Add each result file to the zip archive
|
||||
/*
|
||||
names := []([]string){{ar.RunAnalysisSARIF, "results.sarif"}}
|
||||
for _, fpath := range names {
|
||||
file, err := os.Open(fpath[0])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Create a new file in the zip archive with custom name
|
||||
// The client is very specific:
|
||||
// if zf.Name != "results.sarif" && zf.Name != "results.bqrs" { continue }
|
||||
|
||||
zipEntry, err := zwriter.Create(fpath[1])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Copy the contents of the file to the zip entry
|
||||
_, err = io.Copy(zipEntry, file)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
*/
|
||||
return zpath, nil
|
||||
}
|
||||
|
||||
func GetJobList(sessionid int) []common.AnalyzeJob {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
return jobs[sessionid]
|
||||
}
|
||||
|
||||
func GetJobInfo(js common.JobSpec) common.JobInfo {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
return info[js]
|
||||
}
|
||||
|
||||
func SetJobInfo(js common.JobSpec, ji common.JobInfo) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
info[js] = ji
|
||||
}
|
||||
|
||||
func GetStatus(sessionid int, nwo common.NameWithOwner) common.Status {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
return status[common.JobSpec{JobID: sessionid, NameWithOwner: nwo}]
|
||||
}
|
||||
|
||||
func ResultAsFile(path string) (string, []byte, error) {
|
||||
fpath := path
|
||||
if !filepath.IsAbs(path) {
|
||||
fpath = "/" + path
|
||||
}
|
||||
|
||||
file, err := os.ReadFile(fpath)
|
||||
if err != nil {
|
||||
slog.Warn("Failed to read results file", fpath, err)
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return fpath, file, nil
|
||||
}
|
||||
|
||||
func SetStatus(sessionid int, nwo common.NameWithOwner, s common.Status) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
status[common.JobSpec{JobID: sessionid, NameWithOwner: nwo}] = s
|
||||
}
|
||||
|
||||
func AddJob(sessionid int, job common.AnalyzeJob) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
jobs[sessionid] = append(jobs[sessionid], job)
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"mrvacommander/pkg/common"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type DBLocation struct {
|
||||
Prefix string
|
||||
File string
|
||||
}
|
||||
|
||||
type StorageSingle struct {
|
||||
currentID int
|
||||
modules *Visibles
|
||||
}
|
||||
|
||||
type DBSpec struct {
|
||||
Host string
|
||||
Port int
|
||||
User string
|
||||
Password string
|
||||
DBname string
|
||||
}
|
||||
|
||||
type DBInfo struct {
|
||||
// Database version of
|
||||
// info map[common.JobSpec]common.JobInfo = make(map[common.JobSpec]common.JobInfo)
|
||||
gorm.Model
|
||||
JobSpec common.JobSpec `gorm:"type:jsonb"`
|
||||
JobInfo common.JobInfo `gorm:"type:jsonb"`
|
||||
}
|
||||
|
||||
type DBJobs struct {
|
||||
// Database version of
|
||||
// jobs map[int][]common.AnalyzeJob = make(map[int][]common.AnalyzeJob)
|
||||
gorm.Model
|
||||
JobKey int
|
||||
AnalyzeJob common.AnalyzeJob `gorm:"type:jsonb"`
|
||||
}
|
||||
|
||||
type DBResult struct {
|
||||
// Database version of
|
||||
// result map[common.JobSpec]common.AnalyzeResult = make(map[common.JobSpec]common.AnalyzeResult)
|
||||
gorm.Model
|
||||
JobSpec common.JobSpec `gorm:"type:jsonb"`
|
||||
AnalyzeResult common.AnalyzeResult `gorm:"type:jsonb"`
|
||||
}
|
||||
|
||||
type DBStatus struct {
|
||||
// Database version of
|
||||
// status map[common.JobSpec]common.Status = make(map[common.JobSpec]common.Status)
|
||||
gorm.Model
|
||||
JobSpec common.JobSpec `gorm:"type:jsonb"`
|
||||
Status common.Status `gorm:"type:jsonb"`
|
||||
}
|
||||
|
||||
type StorageContainer struct {
|
||||
// Database version of StorageSingle
|
||||
RequestID int
|
||||
DB *gorm.DB
|
||||
modules *Visibles
|
||||
}
|
||||
|
||||
type Visibles struct{}
|
||||
Reference in New Issue
Block a user