Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ec2 v1.144.0
github.com/aws/aws-sdk-go-v2/service/ecr v1.24.4
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.0
github.com/beam-cloud/blobcache-v2 v0.0.0-20250501023620-447539c5d94e
github.com/beam-cloud/blobcache-v2 v0.0.0-20250501193849-3099897f000a
github.com/beam-cloud/clip v0.0.0-20250424185136-5f40b560b510
github.com/beam-cloud/go-runc v0.0.0-20250226192420-34dad0fdc737
github.com/beam-cloud/redislock v0.0.0-20250201162619-1b534b3be324
Expand Down Expand Up @@ -339,7 +339,7 @@ require (
tags.cncf.io/container-device-interface/specs-go v0.8.0 // indirect
)

replace github.com/yandex-cloud/geesefs => github.com/beam-cloud/geesefs v0.0.0-20250423015758-3404807b4804
replace github.com/yandex-cloud/geesefs => github.com/beam-cloud/geesefs v0.0.0-20250425203001-229587fa117e

replace github.com/aws/aws-sdk-go => github.com/beam-cloud/geesefs/s3ext v0.0.0-20250423015758-3404807b4804

Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,14 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.1/go.mod h1:jiNR3JqT15Dm+QWq2SRgh
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
github.com/beam-cloud/blobcache-v2 v0.0.0-20250501023620-447539c5d94e h1:rX+jkn/Ln5qL+erKOYDPeFzU9/7rZQKaUpZAJg0/jf0=
github.com/beam-cloud/blobcache-v2 v0.0.0-20250501023620-447539c5d94e/go.mod h1:RrA2ruMma4/dN9Sa6wwhyAO1uI6di+tlLB5wuM7TuvQ=
github.com/beam-cloud/blobcache-v2 v0.0.0-20250425202639-4806ac69ec31 h1:cx6r917ppQ0x4eA4cucWtgQxAORkD4kHU0YdaDk/HXg=
github.com/beam-cloud/blobcache-v2 v0.0.0-20250425202639-4806ac69ec31/go.mod h1:RrA2ruMma4/dN9Sa6wwhyAO1uI6di+tlLB5wuM7TuvQ=
github.com/beam-cloud/blobcache-v2 v0.0.0-20250501193849-3099897f000a h1:ppC20cEfiX4zdl9oZjLZbyHfSn2h910EYWdKlOou3KM=
github.com/beam-cloud/blobcache-v2 v0.0.0-20250501193849-3099897f000a/go.mod h1:RrA2ruMma4/dN9Sa6wwhyAO1uI6di+tlLB5wuM7TuvQ=
github.com/beam-cloud/clip v0.0.0-20250424185136-5f40b560b510 h1:CoYog6ZQ81/Kvs1n41RTCj2LhnJuIjWyPi2Jw+qkwKM=
github.com/beam-cloud/clip v0.0.0-20250424185136-5f40b560b510/go.mod h1:GtCHH6ik2SkIFo+GdN/l0+kAzxhLTUjSboIB+7Br6dk=
github.com/beam-cloud/geesefs v0.0.0-20250423015758-3404807b4804 h1:o9dbE3cgDGGx65jEdp6VV0TR3vMWnuLxZyrjTrDDh+w=
github.com/beam-cloud/geesefs v0.0.0-20250423015758-3404807b4804/go.mod h1:utihEuMyzBOeZ6oU2ozzZkJmyzbYBuYrxsLUo1DfZXs=
github.com/beam-cloud/geesefs v0.0.0-20250425203001-229587fa117e h1:a4yeRoJIInKGvFP2FpIjscrCg7NLB1ISCcgSrAgjuSY=
github.com/beam-cloud/geesefs v0.0.0-20250425203001-229587fa117e/go.mod h1:D+wQKZWm78Z4DhrUmOxpoHLTGBQrwNAy7jZtT25KB3Q=
github.com/beam-cloud/geesefs/s3ext v0.0.0-20250423015758-3404807b4804 h1:36wEQvhNswD//3ugmDcMUwslSvUtdFD3d3tRgNFJXl0=
github.com/beam-cloud/geesefs/s3ext v0.0.0-20250423015758-3404807b4804/go.mod h1:YT41ScwaZw9hYfM0WbYZ64sQLNhPxWZFOXJOPug7O5M=
github.com/beam-cloud/go-runc v0.0.0-20250226192420-34dad0fdc737 h1:9dvNZag3gegzIl5i8W861lsnhnwz4joHr40K1HPcpmw=
Expand Down
15 changes: 6 additions & 9 deletions pkg/worker/criu_cedana.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"buf.build/gen/go/cedana/criu/protocolbuffers/go/criu"
common "github.com/beam-cloud/beta9/pkg/common"
types "github.com/beam-cloud/beta9/pkg/types"
blobcache "github.com/beam-cloud/blobcache-v2/pkg"
"github.com/beam-cloud/go-runc"
cedana "github.com/cedana/cedana/pkg/client"
"github.com/cedana/cedana/pkg/config"
Expand Down Expand Up @@ -249,16 +250,12 @@ func (c *CedanaCRIUManager) CacheCheckpoint(containerId, checkpointPath string)

// Remove the leading "/" from the checkpoint path
sourcePath := checkpointPath[1:]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually don't think this // Remove the leading "/" from the checkpoint path is necessary anymore. But maybe leave it for now.

_, err := client.StoreContentFromFUSE(struct {
Path string
}{
_, err := client.StoreContentFromFUSE(blobcache.ContentSourceFUSE{
Path: sourcePath,
}, struct {
RoutingKey string
Lock bool
}{
RoutingKey: sourcePath,
Lock: true,
}, blobcache.StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: sourcePath,
Lock: true,
})
if err != nil {
return "", err
Expand Down
15 changes: 6 additions & 9 deletions pkg/worker/criu_nvidia.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"

types "github.com/beam-cloud/beta9/pkg/types"
blobcache "github.com/beam-cloud/blobcache-v2/pkg"
"github.com/beam-cloud/go-runc"
"github.com/hashicorp/go-multierror"
"github.com/panjf2000/ants/v2"
Expand Down Expand Up @@ -170,16 +171,12 @@ func (c *NvidiaCRIUManager) cacheDir(containerId, checkpointPath string) error {
sourcePath := path[1:]

defer wg.Done()
_, err := client.StoreContentFromFUSE(struct {
Path string
}{
_, err := client.StoreContentFromFUSE(blobcache.ContentSourceFUSE{
Path: sourcePath,
}, struct {
RoutingKey string
Lock bool
}{
RoutingKey: sourcePath,
Lock: true,
}, blobcache.StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: sourcePath,
Lock: true,
})
if err != nil {
storeContentErrMu.Lock()
Expand Down
27 changes: 10 additions & 17 deletions pkg/worker/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ func (cm *FileCacheManager) CacheFilesInPath(sourcePath string) {
}

if !info.IsDir() {
_, err := cm.client.StoreContentFromFUSE(struct {
Path string
}{
_, err := cm.client.StoreContentFromFUSE(blobcache.ContentSourceFUSE{
Path: path,
}, struct {
RoutingKey string
Lock bool
}{
RoutingKey: path,
}, blobcache.StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: path,
Lock: true,
})
if err != nil {
log.Error().Str("path", path).Err(err).Msg("failed to cache file")
Expand Down Expand Up @@ -120,16 +117,12 @@ func (cm *FileCacheManager) initWorkspace(workspaceName string) (string, error)
return workspaceVolumePath, nil
}

_, err = cm.client.StoreContentFromFUSE(struct {
Path string
}{
_, err = cm.client.StoreContentFromFUSE(blobcache.ContentSourceFUSE{
Path: fileName,
}, struct {
RoutingKey string
Lock bool
}{
RoutingKey: fileName,
Lock: true,
}, blobcache.StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: fileName,
Lock: true,
})
if err != nil {
return "", err
Expand Down
27 changes: 15 additions & 12 deletions pkg/worker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,15 @@ func (c *ImageClient) PullLazy(ctx context.Context, request *types.ContainerRequ
startTime := time.Now()

if c.cacheClient != nil && !isBuildContainer {
sourcePath := fmt.Sprintf("/images/%s.clip", imageId)
imageKey := fmt.Sprintf("%s.clip", imageId)
sourcePath := filepath.Join("/", c.config.ImageService.Registries.S3.Primary.Region, c.config.ImageService.Registries.S3.Primary.BucketName, imageKey)

// Create constant backoff
b := backoff.NewConstantBackOff(300 * time.Millisecond)
imageLocked := false

operation := func() error {
baseBlobFsContentPath := fmt.Sprintf("%s/%s", baseFileCachePath, sourcePath)
baseBlobFsContentPath := filepath.Join(baseFileCachePath, sourcePath)
if _, err := os.Stat(baseBlobFsContentPath); err == nil && c.cacheClient.IsPathCachedNearby(ctx, sourcePath) {
localCachePath = baseBlobFsContentPath
return nil
Expand All @@ -205,16 +206,18 @@ func (c *ImageClient) PullLazy(ctx context.Context, request *types.ContainerRequ
return errors.New("image locked")
}

_, err := c.cacheClient.StoreContentFromFUSE(struct {
Path string
}{
Path: sourcePath,
}, struct {
RoutingKey string
Lock bool
}{
RoutingKey: sourcePath,
Lock: true,
_, err := c.cacheClient.StoreContentFromS3(blobcache.ContentSourceS3{
Path: imageKey,
BucketName: c.config.ImageService.Registries.S3.Primary.BucketName,
Region: c.config.ImageService.Registries.S3.Primary.Region,
EndpointURL: c.config.ImageService.Registries.S3.Primary.Endpoint,
AccessKey: c.config.ImageService.Registries.S3.Primary.AccessKey,
SecretKey: c.config.ImageService.Registries.S3.Primary.SecretKey,
ForcePathStyle: c.config.ImageService.Registries.S3.Primary.ForcePathStyle,
}, blobcache.StoreContentOptions{
CreateCacheFSEntry: true,
RoutingKey: sourcePath,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sourcePath construction is changed but still used as the routing key, which might cause inconsistency with previously cached items

Lock: true,
})
if err != nil {
if err == blobcache.ErrUnableToAcquireLock {
Expand Down
Loading