Add hepc integration
This commit is contained in:
committed by
=Michael Hohn
parent
4140eaafc4
commit
23e3ea9367
3
.gitignore
vendored
3
.gitignore
vendored
@@ -57,3 +57,6 @@ notes/*.html
|
|||||||
|
|
||||||
# Make timestamp files
|
# Make timestamp files
|
||||||
mk.*
|
mk.*
|
||||||
|
demo/containers/dbsdata/data/
|
||||||
|
demo/containers/dbsdata/tmp.dbsdata_backup.tar
|
||||||
|
client/qldbtools/db-collection-py-1/
|
||||||
|
|||||||
@@ -4,11 +4,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/hohn/mrvacommander/pkg/artifactstore"
|
"github.com/hohn/mrvacommander/pkg/artifactstore"
|
||||||
"github.com/hohn/mrvacommander/pkg/qldbstore"
|
"github.com/hohn/mrvacommander/pkg/qldbstore"
|
||||||
"github.com/hohn/mrvacommander/pkg/queue"
|
"github.com/hohn/mrvacommander/pkg/queue"
|
||||||
"os"
|
|
||||||
"strconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func validateEnvVars(requiredEnvVars []string) {
|
func validateEnvVars(requiredEnvVars []string) {
|
||||||
@@ -94,3 +95,17 @@ func InitMinIOCodeQLDatabaseStore() (qldbstore.Store, error) {
|
|||||||
|
|
||||||
return store, nil
|
return store, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func InitHEPCDatabaseStore() (qldbstore.Store, error) {
|
||||||
|
requiredEnvVars := []string{
|
||||||
|
"MRVA_HEPC_ENDPOINT",
|
||||||
|
"MRVA_HEPC_CACHE_DURATION",
|
||||||
|
}
|
||||||
|
validateEnvVars(requiredEnvVars)
|
||||||
|
|
||||||
|
endpoint := os.Getenv("MRVA_HEPC_ENDPOINT")
|
||||||
|
|
||||||
|
store := qldbstore.NewHepcStore(endpoint)
|
||||||
|
|
||||||
|
return store, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,13 +4,25 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hohn/mrvacommander/pkg/common"
|
"github.com/hohn/mrvacommander/pkg/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultCacheDurationMinutes = 60
|
||||||
|
|
||||||
type HepcStore struct {
|
type HepcStore struct {
|
||||||
Endpoint string
|
Endpoint string
|
||||||
|
metadataCache []HepcResult
|
||||||
|
cacheLastUpdated time.Time
|
||||||
|
cacheMutex sync.Mutex
|
||||||
|
cacheDuration time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type HepcResult struct {
|
type HepcResult struct {
|
||||||
@@ -26,28 +38,42 @@ type HepcResult struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewHepcStore(endpoint string) *HepcStore {
|
func NewHepcStore(endpoint string) *HepcStore {
|
||||||
return &HepcStore{Endpoint: endpoint}
|
cacheDuration := getMetaCacheDuration()
|
||||||
|
return &HepcStore{
|
||||||
|
Endpoint: endpoint,
|
||||||
|
cacheDuration: cacheDuration,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HepcStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
|
func getMetaCacheDuration() time.Duration {
|
||||||
notFoundRepos []common.NameWithOwner,
|
durationStr := os.Getenv("MRVA_HEPC_CACHE_DURATION")
|
||||||
foundRepos []common.NameWithOwner) {
|
if durationStr == "" {
|
||||||
|
return time.Minute * defaultCacheDurationMinutes
|
||||||
|
}
|
||||||
|
duration, err := strconv.Atoi(durationStr)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("Invalid MRVA_HEPC_CACHE_DURATION value. Using default",
|
||||||
|
durationStr, defaultCacheDurationMinutes,
|
||||||
|
)
|
||||||
|
return time.Minute * defaultCacheDurationMinutes
|
||||||
|
}
|
||||||
|
return time.Minute * time.Duration(duration)
|
||||||
|
}
|
||||||
|
|
||||||
// Fetch the metadata.json from the Hepc server
|
func (h *HepcStore) fetchMetadata() ([]HepcResult, error) {
|
||||||
url := fmt.Sprintf("%s/index", h.Endpoint)
|
url := fmt.Sprintf("%s/index", h.Endpoint)
|
||||||
resp, err := http.Get(url)
|
resp, err := http.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error fetching metadata: %v\n", err)
|
slog.Warn("Error fetching metadata.", err)
|
||||||
return analysisReposRequested, nil
|
return nil, fmt.Errorf("error fetching metadata: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
fmt.Printf("Non-OK HTTP status: %s\n", resp.Status)
|
slog.Warn("Non-OK HTTP status", resp.Status)
|
||||||
return analysisReposRequested, nil
|
return nil, fmt.Errorf("non-OK HTTP status: %s", resp.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode the response
|
|
||||||
var results []HepcResult
|
var results []HepcResult
|
||||||
decoder := json.NewDecoder(resp.Body)
|
decoder := json.NewDecoder(resp.Body)
|
||||||
for {
|
for {
|
||||||
@@ -55,15 +81,38 @@ func (h *HepcStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwn
|
|||||||
if err := decoder.Decode(&result); err == io.EOF {
|
if err := decoder.Decode(&result); err == io.EOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
fmt.Printf("Error decoding JSON: %v\n", err)
|
slog.Warn("Error decoding JSON", err)
|
||||||
return analysisReposRequested, nil
|
return nil, fmt.Errorf("error decoding JSON: %w", err)
|
||||||
}
|
}
|
||||||
results = append(results, result)
|
results = append(results, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HepcStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) (
|
||||||
|
notFoundRepos []common.NameWithOwner,
|
||||||
|
foundRepos []common.NameWithOwner) {
|
||||||
|
|
||||||
|
// Check cache
|
||||||
|
h.cacheMutex.Lock()
|
||||||
|
if time.Since(h.cacheLastUpdated) > h.cacheDuration {
|
||||||
|
// Cache is expired or not set; refresh
|
||||||
|
results, err := h.fetchMetadata()
|
||||||
|
if err != nil {
|
||||||
|
h.cacheMutex.Unlock()
|
||||||
|
slog.Warn("Error fetching metadata", err)
|
||||||
|
return analysisReposRequested, nil
|
||||||
|
}
|
||||||
|
h.metadataCache = results
|
||||||
|
h.cacheLastUpdated = time.Now()
|
||||||
|
}
|
||||||
|
cachedResults := h.metadataCache
|
||||||
|
h.cacheMutex.Unlock()
|
||||||
|
|
||||||
// Compare against requested repos
|
// Compare against requested repos
|
||||||
repoSet := make(map[string]struct{})
|
repoSet := make(map[string]struct{})
|
||||||
for _, result := range results {
|
for _, result := range cachedResults {
|
||||||
repoSet[result.Projname] = struct{}{}
|
repoSet[result.Projname] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,42 +129,67 @@ func (h *HepcStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwn
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *HepcStore) GetDatabase(location common.NameWithOwner) ([]byte, error) {
|
func (h *HepcStore) GetDatabase(location common.NameWithOwner) ([]byte, error) {
|
||||||
// Fetch the latest results for the specified repository
|
// Ensure metadata is up-to-date by using the cache
|
||||||
url := fmt.Sprintf("%s/api/v1/latest_results/codeql-all", h.Endpoint)
|
h.cacheMutex.Lock()
|
||||||
resp, err := http.Get(url)
|
if time.Since(h.cacheLastUpdated) > h.cacheDuration {
|
||||||
|
// Refresh the metadata cache if it is stale
|
||||||
|
results, err := h.fetchMetadata()
|
||||||
|
if err != nil {
|
||||||
|
h.cacheMutex.Unlock()
|
||||||
|
return nil, fmt.Errorf("error refreshing metadata cache: %w", err)
|
||||||
|
}
|
||||||
|
h.metadataCache = results
|
||||||
|
h.cacheLastUpdated = time.Now()
|
||||||
|
}
|
||||||
|
cachedResults := h.metadataCache
|
||||||
|
h.cacheMutex.Unlock()
|
||||||
|
|
||||||
|
// Construct the key for the requested database
|
||||||
|
key := fmt.Sprintf("%s/%s", location.Owner, location.Repo)
|
||||||
|
|
||||||
|
// Locate the result URL in the cached metadata
|
||||||
|
var resultURL string
|
||||||
|
for _, result := range cachedResults {
|
||||||
|
if result.Projname == key {
|
||||||
|
resultURL = result.ResultURL
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if resultURL == "" {
|
||||||
|
return nil, fmt.Errorf("database not found for repository: %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the database content
|
||||||
|
resp, err := http.Get(replaceHepcURL(resultURL))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error fetching database metadata: %w", err)
|
return nil, fmt.Errorf("error fetching database: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
return nil, fmt.Errorf("non-OK HTTP status: %s", resp.Status)
|
return nil, fmt.Errorf("non-OK HTTP status for database fetch: %s", resp.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
var latestResults []HepcResult
|
// Read and return the database data
|
||||||
decoder := json.NewDecoder(resp.Body)
|
data, err := io.ReadAll(resp.Body)
|
||||||
if err := decoder.Decode(&latestResults); err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error decoding JSON: %w", err)
|
return nil, fmt.Errorf("error reading database content: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find the correct result for the requested repo
|
return data, nil
|
||||||
repoKey := fmt.Sprintf("%s/%s", location.Owner, location.Repo)
|
}
|
||||||
for _, result := range latestResults {
|
|
||||||
if result.Projname == repoKey {
|
// replaceHepcURL replaces the fixed "http://hepc" with the value from
|
||||||
// Fetch the database as a byte slice
|
// MRVA_HEPC_ENDPOINT
|
||||||
dbResp, err := http.Get(result.ResultURL)
|
func replaceHepcURL(originalURL string) string {
|
||||||
if err != nil {
|
hepcEndpoint := os.Getenv("MRVA_HEPC_ENDPOINT")
|
||||||
return nil, fmt.Errorf("error fetching database: %w", err)
|
if hepcEndpoint == "" {
|
||||||
}
|
hepcEndpoint = "http://hepc:8070" // Default fallback
|
||||||
defer dbResp.Body.Close()
|
}
|
||||||
|
|
||||||
if dbResp.StatusCode != http.StatusOK {
|
// Replace "http://hepc" at the beginning of the URL
|
||||||
return nil, fmt.Errorf("non-OK HTTP status for database fetch: %s", dbResp.Status)
|
newURL := strings.Replace(originalURL, "http://hepc", hepcEndpoint, 1)
|
||||||
}
|
|
||||||
|
return newURL
|
||||||
return io.ReadAll(dbResp.Body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("database not found for repository: %s", repoKey)
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user