-
Notifications
You must be signed in to change notification settings - Fork 26
feat: add dynamic timeout retry with file-size-based parameters #468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4e65858
92c9ca3
cf1bcf2
be391fd
2fcdba2
4cdd8ae
5047acc
fcdbcb0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,7 +19,10 @@ package backend | |||||||||||||||||||
| import ( | ||||||||||||||||||||
| "context" | ||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||
| "errors" | ||||||||||||||||||||
| "fmt" | ||||||||||||||||||||
| "sync" | ||||||||||||||||||||
| "time" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| "github.com/bmatcuk/doublestar/v4" | ||||||||||||||||||||
| legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1" | ||||||||||||||||||||
|
|
@@ -31,6 +34,7 @@ import ( | |||||||||||||||||||
| internalpb "github.com/modelpack/modctl/internal/pb" | ||||||||||||||||||||
| "github.com/modelpack/modctl/pkg/backend/remote" | ||||||||||||||||||||
| "github.com/modelpack/modctl/pkg/config" | ||||||||||||||||||||
| "github.com/modelpack/modctl/pkg/retrypolicy" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Fetch fetches partial files to the output. | ||||||||||||||||||||
|
|
@@ -101,9 +105,12 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e | |||||||||||||||||||
| pb.Start() | ||||||||||||||||||||
| defer pb.Stop() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| g, ctx := errgroup.WithContext(ctx) | ||||||||||||||||||||
| g := new(errgroup.Group) | ||||||||||||||||||||
| g.SetLimit(cfg.Concurrency) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| var mu sync.Mutex | ||||||||||||||||||||
| var errs []error | ||||||||||||||||||||
|
|
||||||||||||||||||||
| logrus.Infof("fetch: fetching %d matched layers", len(layers)) | ||||||||||||||||||||
| for _, layer := range layers { | ||||||||||||||||||||
| g.Go(func() error { | ||||||||||||||||||||
|
|
@@ -113,17 +120,45 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e | |||||||||||||||||||
| default: | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| logrus.Debugf("fetch: processing layer %s", layer.Digest) | ||||||||||||||||||||
| if err := pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer); err != nil { | ||||||||||||||||||||
| return err | ||||||||||||||||||||
| var annoFilepath string | ||||||||||||||||||||
| if layer.Annotations != nil { | ||||||||||||||||||||
| if layer.Annotations[modelspec.AnnotationFilepath] != "" { | ||||||||||||||||||||
| annoFilepath = layer.Annotations[modelspec.AnnotationFilepath] | ||||||||||||||||||||
| } else { | ||||||||||||||||||||
| annoFilepath = layer.Annotations[legacymodelspec.AnnotationFilepath] | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
Comment on lines
+123
to
130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic to extract the filepath from annotations is duplicated in For example, you could define this function: func getAnnotationFilepath(annotations map[string]string) string {
if annotations == nil {
return ""
}
if path := annotations[modelspec.AnnotationFilepath]; path != "" {
return path
}
return annotations[legacymodelspec.AnnotationFilepath]
}And then call it here.
Suggested change
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| logrus.Debugf("fetch: successfully processed layer %s", layer.Digest) | ||||||||||||||||||||
| logrus.Debugf("fetch: processing layer %s", layer.Digest) | ||||||||||||||||||||
| if err := retrypolicy.Do(ctx, func(rctx context.Context) error { | ||||||||||||||||||||
| return pullAndExtractFromRemote(rctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer) | ||||||||||||||||||||
| }, retrypolicy.DoOpts{ | ||||||||||||||||||||
| FileSize: layer.Size, | ||||||||||||||||||||
| FileName: annoFilepath, | ||||||||||||||||||||
| Config: &cfg.RetryConfig, | ||||||||||||||||||||
| OnRetry: func(attempt uint, reason string, backoff time.Duration) { | ||||||||||||||||||||
| if bar := pb.Get(layer.Digest.String()); bar != nil { | ||||||||||||||||||||
| bar.SetRefill(bar.Current()) | ||||||||||||||||||||
| bar.SetCurrent(0) | ||||||||||||||||||||
| bar.EwmaSetCurrent(0, time.Second) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| }); err != nil { | ||||||||||||||||||||
| mu.Lock() | ||||||||||||||||||||
| errs = append(errs, err) | ||||||||||||||||||||
| mu.Unlock() | ||||||||||||||||||||
| } else { | ||||||||||||||||||||
| logrus.Debugf("fetch: successfully processed layer %s", layer.Digest) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| return nil | ||||||||||||||||||||
| }) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if err := g.Wait(); err != nil { | ||||||||||||||||||||
| _ = g.Wait() | ||||||||||||||||||||
| if ctx.Err() != nil { | ||||||||||||||||||||
| return fmt.Errorf("fetch cancelled: %w", ctx.Err()) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if err := errors.Join(errs...); err != nil { | ||||||||||||||||||||
| return err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,15 +19,17 @@ package backend | |||||||||||||||||||
| import ( | ||||||||||||||||||||
| "context" | ||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||
| "errors" | ||||||||||||||||||||
| "fmt" | ||||||||||||||||||||
| "io" | ||||||||||||||||||||
| "os" | ||||||||||||||||||||
| "path/filepath" | ||||||||||||||||||||
| "strings" | ||||||||||||||||||||
| "sync" | ||||||||||||||||||||
| "time" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| common "d7y.io/api/v2/pkg/apis/common/v2" | ||||||||||||||||||||
| dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2" | ||||||||||||||||||||
| "github.com/avast/retry-go/v4" | ||||||||||||||||||||
| "github.com/bmatcuk/doublestar/v4" | ||||||||||||||||||||
| legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1" | ||||||||||||||||||||
| modelspec "github.com/modelpack/model-spec/specs-go/v1" | ||||||||||||||||||||
|
|
@@ -41,6 +43,7 @@ import ( | |||||||||||||||||||
| "github.com/modelpack/modctl/pkg/archiver" | ||||||||||||||||||||
| "github.com/modelpack/modctl/pkg/backend/remote" | ||||||||||||||||||||
| "github.com/modelpack/modctl/pkg/config" | ||||||||||||||||||||
| "github.com/modelpack/modctl/pkg/retrypolicy" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // fetchByDragonfly fetches partial files via Dragonfly gRPC service based on pattern matching. | ||||||||||||||||||||
|
|
@@ -124,9 +127,12 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf | |||||||||||||||||||
| defer pb.Stop() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Process layers concurrently. | ||||||||||||||||||||
| g, ctx := errgroup.WithContext(ctx) | ||||||||||||||||||||
| g := new(errgroup.Group) | ||||||||||||||||||||
| g.SetLimit(cfg.Concurrency) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| var mu sync.Mutex | ||||||||||||||||||||
| var errs []error | ||||||||||||||||||||
|
|
||||||||||||||||||||
| logrus.Infof("fetch: fetching %d matched layers via dragonfly", len(layers)) | ||||||||||||||||||||
| for _, layer := range layers { | ||||||||||||||||||||
| g.Go(func() error { | ||||||||||||||||||||
|
|
@@ -138,14 +144,21 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf | |||||||||||||||||||
|
|
||||||||||||||||||||
| logrus.Debugf("fetch: processing layer %s via dragonfly", layer.Digest) | ||||||||||||||||||||
| if err := fetchLayerByDragonfly(ctx, pb, dfdaemon.NewDfdaemonDownloadClient(conn), ref, manifest, layer, authToken, cfg); err != nil { | ||||||||||||||||||||
| return err | ||||||||||||||||||||
| mu.Lock() | ||||||||||||||||||||
| errs = append(errs, err) | ||||||||||||||||||||
| mu.Unlock() | ||||||||||||||||||||
| } else { | ||||||||||||||||||||
| logrus.Debugf("fetch: successfully processed layer %s via dragonfly", layer.Digest) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| logrus.Debugf("fetch: successfully processed layer %s via dragonfly", layer.Digest) | ||||||||||||||||||||
| return nil | ||||||||||||||||||||
| }) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if err := g.Wait(); err != nil { | ||||||||||||||||||||
| _ = g.Wait() | ||||||||||||||||||||
| if ctx.Err() != nil { | ||||||||||||||||||||
| return fmt.Errorf("fetch cancelled: %w", ctx.Err()) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| if err := errors.Join(errs...); err != nil { | ||||||||||||||||||||
| return err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -155,18 +168,38 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf | |||||||||||||||||||
|
|
||||||||||||||||||||
| // fetchLayerByDragonfly handles downloading and extracting a single layer via Dragonfly. | ||||||||||||||||||||
| func fetchLayerByDragonfly(ctx context.Context, pb *internalpb.ProgressBar, client dfdaemon.DfdaemonDownloadClient, ref Referencer, manifest ocispec.Manifest, desc ocispec.Descriptor, authToken string, cfg *config.Fetch) error { | ||||||||||||||||||||
| err := retry.Do(func() error { | ||||||||||||||||||||
| var annoFilepath string | ||||||||||||||||||||
| if desc.Annotations != nil { | ||||||||||||||||||||
| if desc.Annotations[modelspec.AnnotationFilepath] != "" { | ||||||||||||||||||||
| annoFilepath = desc.Annotations[modelspec.AnnotationFilepath] | ||||||||||||||||||||
| } else { | ||||||||||||||||||||
| annoFilepath = desc.Annotations[legacymodelspec.AnnotationFilepath] | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
Comment on lines
+171
to
+178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic to extract the filepath from annotations is duplicated in For example, you could define this function: func getAnnotationFilepath(annotations map[string]string) string {
if annotations == nil {
return ""
}
if path := annotations[modelspec.AnnotationFilepath]; path != "" {
return path
}
return annotations[legacymodelspec.AnnotationFilepath]
}And then call it here.
Suggested change
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| err := retrypolicy.Do(ctx, func(rctx context.Context) error { | ||||||||||||||||||||
| logrus.Debugf("fetch: processing layer %s", desc.Digest) | ||||||||||||||||||||
| cfg.Hooks.BeforePullLayer(desc, manifest) // Call before hook | ||||||||||||||||||||
| err := downloadAndExtractFetchLayer(ctx, pb, client, ref, desc, authToken, cfg) | ||||||||||||||||||||
| err := downloadAndExtractFetchLayer(rctx, pb, client, ref, desc, authToken, cfg) | ||||||||||||||||||||
| cfg.Hooks.AfterPullLayer(desc, err) // Call after hook | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| err = fmt.Errorf("pull: failed to download and extract layer %s: %w", desc.Digest, err) | ||||||||||||||||||||
| logrus.Error(err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return err | ||||||||||||||||||||
| }, append(defaultRetryOpts, retry.Context(ctx))...) | ||||||||||||||||||||
| }, retrypolicy.DoOpts{ | ||||||||||||||||||||
| FileSize: desc.Size, | ||||||||||||||||||||
| FileName: annoFilepath, | ||||||||||||||||||||
| Config: &cfg.RetryConfig, | ||||||||||||||||||||
| OnRetry: func(attempt uint, reason string, backoff time.Duration) { | ||||||||||||||||||||
| if bar := pb.Get(desc.Digest.String()); bar != nil { | ||||||||||||||||||||
| bar.SetRefill(bar.Current()) | ||||||||||||||||||||
| bar.SetCurrent(0) | ||||||||||||||||||||
| bar.EwmaSetCurrent(0, time.Second) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| }) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| err = fmt.Errorf("fetch: failed to download and extract layer %s: %w", desc.Digest, err) | ||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a data race condition here. The
existing.msgfield is being modified without a lock, while it can be concurrently read by the progress bar's rendering goroutine. This can lead to unpredictable behavior or crashes.To fix this, you should use a write lock to protect both the read from the
p.barsmap and the subsequent write to themsgfield. The lock should be released before callingp.Addto avoid deadlocks, asp.Addacquires its own locks.