Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ func main() {
workerChannel := rabbitmq.NewRabbitMQChannel(conn)

// Initialize the services
storage := storage.NewStorage(config.StorageBaseUrl)
storageService := storage.NewStorage(config.StorageBaseUrl)
fileCache := storage.NewFileCache(config.CacheDirPath)
if err := fileCache.InitCache(); err != nil {
logger.Fatalf("Failed to initialize file cache: %v", err)
}
compiler := compiler.NewCompiler()
packager := packager.NewPackager(storage)
packager := packager.NewPackager(storageService, fileCache)
executor := executor.NewExecutor(dCli)
verifier := verifier.NewVerifier(config.VerifierFlags)
responder := responder.NewResponder(workerChannel, config.PublishChanSize)
Expand Down
1 change: 1 addition & 0 deletions generate_mocks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ INTERFACES=(
"internal/storage Storage"
"internal/pipeline Worker"
"internal/docker DockerClient"
"internal/storage FileCache"
)


Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/rabbitmq/amqp091-go v1.10.0
github.com/stretchr/testify v1.10.0
go.uber.org/mock v0.6.0
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand All @@ -17,6 +18,7 @@ require (
require (
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand All @@ -31,6 +33,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
Expand All @@ -40,5 +43,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/time v0.11.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.2 // indirect
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:C
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw=
Expand All @@ -59,6 +63,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand Down Expand Up @@ -139,6 +145,9 @@ google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
15 changes: 15 additions & 0 deletions internal/config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
ConsumeQueueName string
MaxWorkers int
VerifierFlags []string
CacheDirPath string
}

func NewConfig() *Config {
Expand All @@ -43,6 +44,7 @@ func NewConfig() *Config {
storageBaseUrl := storageConfig()
workerQueueName, maxWorkers := workerConfig()
verifierFlagsStr := verifierConfig()
cacheDirPath := cacheConfig()

return &Config{
RabbitMQURL: rabbitmqURL,
Expand All @@ -51,6 +53,7 @@ func NewConfig() *Config {
ConsumeQueueName: workerQueueName,
MaxWorkers: maxWorkers,
VerifierFlags: verifierFlagsStr,
CacheDirPath: cacheDirPath,
}
}

Expand Down Expand Up @@ -157,3 +160,15 @@ func verifierConfig() []string {

return strings.Split(verifierFlagsStr, ",")
}

func cacheConfig() string {
logger := logger.NewNamedLogger("config")

cacheDirPath := os.Getenv("CACHE_DIR_PATH")
if cacheDirPath == "" {
cacheDirPath = constants.CacheDirPath
logger.Warnf("CACHE_DIR_PATH is not set, using default value %s", constants.CacheDirPath)
}

return cacheDirPath
}
17 changes: 17 additions & 0 deletions internal/config/worker_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ func TestVerifierConfig_DefaultsAndCustom(t *testing.T) {
}
}

func TestCacheConfig_DefaultsAndCustom(t *testing.T) {
config := NewConfig()
if config.CacheDirPath != constants.CacheDirPath {
t.Fatalf("expected default cache dir path %q, got %q", constants.CacheDirPath, config.CacheDirPath)
}

t.Setenv("CACHE_DIR_PATH", "/custom/cache/path")
config2 := NewConfig()
if config2.CacheDirPath != "/custom/cache/path" {
t.Fatalf("expected cache dir path %q, got %q", "/custom/cache/path", config2.CacheDirPath)
}
}

func TestNewConfig_PicksUpValues(t *testing.T) {
// set a variety of envs and ensure NewConfig reads them
t.Setenv("RABBITMQ_HOST", "xhost")
Expand All @@ -104,6 +117,7 @@ func TestNewConfig_PicksUpValues(t *testing.T) {
t.Setenv("MAX_WORKERS", "5")
t.Setenv("JOBS_DATA_VOLUME", "vol-1")
t.Setenv("VERIFIER_FLAGS", "-a,-b")
t.Setenv("CACHE_DIR_PATH", "/test/cache")

cfg := NewConfig()
if cfg.RabbitMQURL == "" {
Expand All @@ -122,6 +136,9 @@ func TestNewConfig_PicksUpValues(t *testing.T) {
if len(cfg.VerifierFlags) != 2 || cfg.VerifierFlags[0] != "-a" || cfg.VerifierFlags[1] != "-b" {
t.Fatalf("unexpected VerifierFlags: %v", cfg.VerifierFlags)
}
if cfg.CacheDirPath != "/test/cache" {
t.Fatalf("unexpected CacheDirPath: %s", cfg.CacheDirPath)
}
// ensure publish chan size parsed
if cfg.PublishChanSize != 4 {
t.Fatalf("unexpected PublishChanSize: %d", cfg.PublishChanSize)
Expand Down
70 changes: 63 additions & 7 deletions internal/stages/packager/packager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type Packager interface {
}

type packager struct {
logger *zap.SugaredLogger
storage storage.Storage
logger *zap.SugaredLogger
storage storage.Storage
fileCache storage.FileCache
}

type TaskDirConfig struct {
Expand All @@ -51,11 +52,12 @@ type TaskDirConfig struct {
CompileErrFilePath string
}

func NewPackager(storage storage.Storage) Packager {
func NewPackager(storageService storage.Storage, fileCache storage.FileCache) Packager {
logger := logger.NewNamedLogger("packager")
return &packager{
logger: logger,
storage: storage,
logger: logger,
storage: storageService,
fileCache: fileCache,
}
}

Expand Down Expand Up @@ -177,7 +179,7 @@ func (p *packager) prepareTestCaseFiles(basePath string, idx int, tc messages.Te
p.logger.Warnf("Test case %d input location is empty, skipping", idx)
} else {
inputDest := filepath.Join(basePath, constants.InputDirName, filepath.Base(tc.InputFile.Path))
if _, err := p.storage.DownloadFile(tc.InputFile, inputDest); err != nil {
if err := p.downloadOrCopyFromCache(tc.InputFile, inputDest); err != nil {
return err
}
}
Expand All @@ -187,7 +189,7 @@ func (p *packager) prepareTestCaseFiles(basePath string, idx int, tc messages.Te
p.logger.Warnf("Test case %d expected output location is empty, skipping", idx)
} else {
outputDest := filepath.Join(basePath, constants.OutputDirName, filepath.Base(tc.ExpectedOutput.Path))
if _, err := p.storage.DownloadFile(tc.ExpectedOutput, outputDest); err != nil {
if err := p.downloadOrCopyFromCache(tc.ExpectedOutput, outputDest); err != nil {
return err
}
}
Expand Down Expand Up @@ -286,11 +288,65 @@ func (p *packager) SendSolutionPackage(
return nil
}

func (p *packager) downloadOrCopyFromCache(
fileLocation messages.FileLocation,
destPath string,
) error {
// Try to get from cache
if p.fileCache == nil {
return p.downloadAndCache(fileLocation, destPath)
}

cachedPath, isCached, err := p.fileCache.GetCachedFile(fileLocation)
if err != nil {
p.logger.Warnf("Error checking cache for %s: %v", fileLocation.Path, err)
return p.downloadAndCache(fileLocation, destPath)
}

if !isCached {
return p.downloadAndCache(fileLocation, destPath)
}

// Copy from cache to destination
if err := utils.CopyFile(cachedPath, destPath); err != nil {
p.logger.Warnf("Failed to copy from cache, will download: %v", err)
return p.downloadAndCache(fileLocation, destPath)
}

p.logger.Debugf("Used cached file for %s", fileLocation.Path)
return nil
}

func (p *packager) downloadAndCache(
fileLocation messages.FileLocation,
destPath string,
) error {
// Download the file
if _, err := p.storage.DownloadFile(fileLocation, destPath); err != nil {
return err
}

// Cache the downloaded file
if p.fileCache != nil {
if err := p.fileCache.CacheFile(fileLocation, destPath); err != nil {
p.logger.Warnf("Failed to cache file %s: %v", fileLocation.Path, err)
}
}

return nil
}

func (p *packager) uploadNonEmptyFile(filePath string, outputFileLocation messages.FileLocation) error {
if fi, err := os.Stat(filePath); err == nil {
if fi.Size() == 0 {
return nil
}
} else {
if os.IsNotExist(err) {
// Missing file is treated as empty; nothing to upload.
return nil
}
return err
}

objPath := outputFileLocation.Path
Expand Down
Loading