-
Notifications
You must be signed in to change notification settings - Fork 109
docker: add blob-level mirror fallback in dockerImageSource #845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
QiWang19
wants to merge
1
commit into
containers:main
Choose a base branch
from
QiWang19:fallback-mirror
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+190
−2
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,15 +8,20 @@ import ( | |
| "fmt" | ||
| "io" | ||
| "math" | ||
| "math/rand/v2" | ||
| "mime" | ||
| "mime/multipart" | ||
| "net" | ||
| "net/http" | ||
| "net/url" | ||
| "os" | ||
| "os/exec" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/docker/distribution/registry/api/errcode" | ||
| v2 "github.com/docker/distribution/registry/api/v2" | ||
| digest "github.com/opencontainers/go-digest" | ||
| "github.com/sirupsen/logrus" | ||
| "go.podman.io/image/v5/docker/reference" | ||
|
|
@@ -48,6 +53,19 @@ type dockerImageSource struct { | |
| // State | ||
| cachedManifest []byte // nil if not loaded yet | ||
| cachedManifestMIMEType string // Only valid if cachedManifest != nil | ||
|
|
||
| // Mirror fallback: when blob fetch fails with a retryable error, try | ||
| // remaining mirrors before giving up. Protected by mirrorMu. | ||
| mirrorMu sync.Mutex | ||
| mirrorOverride *mirrorSource // If non-nil, this is the mirror and physicalRef for the override client | ||
| prevOverrides []*dockerClient // old overrides not yet closed | ||
| remainingSources []sysregistriesv2.PullSource | ||
| fallbackSys *types.SystemContext | ||
| } | ||
|
|
||
| type mirrorSource struct { | ||
| client *dockerClient | ||
| ref dockerReference | ||
| } | ||
|
|
||
| // newImageSource creates a new ImageSource for the specified image reference. | ||
|
|
@@ -91,14 +109,18 @@ func newImageSource(ctx context.Context, sys *types.SystemContext, ref dockerRef | |
| err error | ||
| } | ||
| attempts := []attempt{} | ||
| for _, pullSource := range pullSources { | ||
| for i, pullSource := range pullSources { | ||
| if sys != nil && sys.DockerLogMirrorChoice { | ||
| logrus.Infof("Trying to access %q", pullSource.Reference) | ||
| } else { | ||
| logrus.Debugf("Trying to access %q", pullSource.Reference) | ||
| } | ||
| s, err := newImageSourceAttempt(ctx, sys, ref, pullSource, registryConfig) | ||
| if err == nil { | ||
| if i+1 < len(pullSources) { | ||
| s.remainingSources = pullSources[i+1:] | ||
| s.fallbackSys = sys | ||
| } | ||
| return s, nil | ||
| } | ||
| logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err) | ||
|
|
@@ -205,6 +227,18 @@ func (s *dockerImageSource) Reference() types.ImageReference { | |
|
|
||
| // Close removes resources associated with an initialized ImageSource, if any. | ||
| func (s *dockerImageSource) Close() error { | ||
| s.mirrorMu.Lock() | ||
| prev := s.prevOverrides | ||
| override := s.mirrorOverride | ||
| s.prevOverrides = nil | ||
| s.mirrorOverride = nil | ||
| s.mirrorMu.Unlock() | ||
| for _, m := range prev { | ||
| m.Close() | ||
| } | ||
| if override != nil { | ||
| override.client.Close() | ||
| } | ||
| return s.c.Close() | ||
| } | ||
|
|
||
|
|
@@ -455,7 +489,161 @@ func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, | |
| // The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided. | ||
| // May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location. | ||
| func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { | ||
| return s.c.getBlob(ctx, s.physicalRef, info, cache) | ||
| return s.getBlob(ctx, info, cache) | ||
| } | ||
|
|
||
| func (s *dockerImageSource) getBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { | ||
| client, physRef, hasRemaining := s.getActiveSource() | ||
| reader, size, err := tryGetBlob(ctx, client, physRef, info, cache) | ||
| if err == nil || !hasRemaining { | ||
| return reader, size, err | ||
| } | ||
| if isMirrorTransientError(err) || isMirrorFallbackError(err) { | ||
| logrus.Debugf("Blob %s fetch from %q failed (%v), trying fallback sources", info.Digest, physRef.ref, err) | ||
| return s.getBlobWithMirrorFallback(ctx, info, cache, err, client) | ||
| } | ||
| return reader, size, err | ||
| } | ||
|
|
||
| func (s *dockerImageSource) getActiveSource() (*dockerClient, dockerReference, bool) { | ||
| s.mirrorMu.Lock() | ||
| defer s.mirrorMu.Unlock() | ||
| hasRemaining := len(s.remainingSources) > 0 | ||
| if s.mirrorOverride != nil { | ||
| return s.mirrorOverride.client, s.mirrorOverride.ref, hasRemaining | ||
| } | ||
| return s.c, s.physicalRef, hasRemaining | ||
| } | ||
|
|
||
| func tryGetBlob(ctx context.Context, client *dockerClient, physRef dockerReference, | ||
| info types.BlobInfo, cache types.BlobInfoCache, | ||
| ) (io.ReadCloser, int64, error) { | ||
| reader, size, err := client.getBlob(ctx, physRef, info, cache) | ||
| if err != nil && isMirrorTransientError(err) { | ||
| logrus.Debugf("Transient error fetching blob %s from %q, retrying: %v", info.Digest, physRef.ref, err) | ||
| delay := time.Second + rand.N(time.Second/10) // 1s + 10% jitter | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil, 0, fmt.Errorf("%w (while retrying after: %v)", ctx.Err(), err) | ||
| case <-time.After(delay): | ||
| } | ||
| reader, size, err = client.getBlob(ctx, physRef, info, cache) | ||
| } | ||
| return reader, size, err | ||
| } | ||
|
|
||
| // isMirrorFallbackError returns true for errors where the blob is not present | ||
| // on this mirror but may be on another — warranting a fallback attempt. | ||
| // All tested registries (docker.io, registry.redhat.io, quay.io, registry.access.redhat.com) | ||
| // return BLOB_UNKNOWN for blob 404s, matching the OCI distribution spec. | ||
| func isMirrorFallbackError(err error) bool { | ||
| // Blob endpoint returns HTTP 404 with errcode JSON body | ||
| // {"errors":[{"code":"BLOB_UNKNOWN",...}]}. | ||
| // registryHTTPResponseToError → handleErrorResponse → parseHTTPErrorResponse | ||
| // returns errcode.Errors containing errcode.Error{Code: v2.ErrorCodeBlobUnknown}. | ||
| var ec errcode.ErrorCoder | ||
| if errors.As(err, &ec) && ec.ErrorCode() == v2.ErrorCodeBlobUnknown { | ||
| return true | ||
| } | ||
|
|
||
| // UnexpectedHTTPStatusError (capital U) is NOT matched here — for regular blobs, | ||
| // handleErrorResponse never produces it for 4xx (only for 5xx). For external blobs, | ||
| // getExternalBlob() produces it via newUnexpectedHTTPStatusError() directly for any | ||
| // non-200 including 404, but external URLs are fixed and mirror-independent — | ||
| // a different registry mirror cannot serve them. | ||
| return false | ||
| } | ||
|
|
||
| // isMirrorTransientError returns true for errors that are transient — the mirror | ||
| // probably has the blob but temporarily cannot serve it. | ||
| func isMirrorTransientError(err error) bool { | ||
| // HTTP 5xx: handleErrorResponse returns UnexpectedHTTPStatusError for status | ||
| // codes outside 400–499. Server-side error, another mirror may succeed. | ||
| var httpErr UnexpectedHTTPStatusError | ||
| if errors.As(err, &httpErr) && httpErr.StatusCode >= 500 { | ||
| return true | ||
| } | ||
|
|
||
| // Network timeout: makeRequest returns net.Error with Timeout() == true. | ||
| // The mirror is reachable but slow — worth retrying on another. | ||
| var netErr net.Error | ||
| return errors.As(err, &netErr) && netErr.Timeout() | ||
| } | ||
|
|
||
| func (s *dockerImageSource) getBlobWithMirrorFallback(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, originalErr error, failedClient *dockerClient) (io.ReadCloser, int64, error) { | ||
| // Held for the full fallback loop, including network I/O. This serializes | ||
| // concurrent blob fetchers during fallback — only one goroutine probes | ||
| // sources while the rest block on getActiveSource(). Acceptable trade-off: | ||
| // fallback is rare and serialization prevents thundering-herd probing. | ||
| s.mirrorMu.Lock() | ||
| defer s.mirrorMu.Unlock() | ||
|
|
||
| // If another goroutine already switched to a working mirror, try it first. | ||
| if s.mirrorOverride != nil && s.mirrorOverride.client != failedClient { | ||
| client, physRef := s.mirrorOverride.client, s.mirrorOverride.ref | ||
| reader, size, err := tryGetBlob(ctx, client, physRef, info, cache) | ||
| if err == nil { | ||
| return reader, size, nil | ||
| } | ||
| // Override failed — retire it so subsequent goroutines don't retry it. | ||
| s.prevOverrides = append(s.prevOverrides, s.mirrorOverride.client) | ||
| s.mirrorOverride = nil | ||
| } | ||
|
|
||
| registryConf, regErr := loadRegistryConfiguration(s.fallbackSys) | ||
| if regErr != nil { | ||
| logrus.Debugf("Mirror fallback: failed to load registry config: %v", regErr) | ||
| return nil, 0, originalErr | ||
| } | ||
|
|
||
| type attempt struct { | ||
| ref reference.Named | ||
| err error | ||
| } | ||
| attempts := []attempt{} | ||
| for len(s.remainingSources) > 0 { | ||
| pullSource := s.remainingSources[0] | ||
| s.remainingSources = s.remainingSources[1:] | ||
|
|
||
| logrus.Debugf("Trying to access %q", pullSource.Reference) | ||
|
|
||
| // newImageSourceAttempt calls ensureManifestIsLoaded(), which preserves | ||
| // the manifest-level filtering: only mirrors that can serve the manifest | ||
|
Comment on lines
+610
to
+611
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably not?! If we got go the |
||
| // are accepted. | ||
| fallback, fallbackErr := newImageSourceAttempt(ctx, s.fallbackSys, s.logicalRef, pullSource, registryConf) | ||
| if fallbackErr != nil { | ||
| logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, fallbackErr) | ||
| attempts = append(attempts, attempt{ref: pullSource.Reference, err: fallbackErr}) | ||
| continue | ||
| } | ||
|
|
||
| reader, size, err := tryGetBlob(ctx, fallback.c, fallback.physicalRef, info, cache) | ||
| if err != nil { | ||
| fallback.c.Close() | ||
| logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err) | ||
| attempts = append(attempts, attempt{ref: pullSource.Reference, err: err}) | ||
| if !isMirrorTransientError(err) && !isMirrorFallbackError(err) { | ||
| break | ||
| } | ||
| continue | ||
| } | ||
|
|
||
| if s.mirrorOverride != nil { | ||
| s.prevOverrides = append(s.prevOverrides, s.mirrorOverride.client) | ||
| } | ||
| s.mirrorOverride = &mirrorSource{client: fallback.c, ref: fallback.physicalRef} | ||
| logrus.Debugf("Blob fetch succeeded from fallback source %q, switching to it for future requests", pullSource.Reference) | ||
|
|
||
| return reader, size, nil | ||
| } | ||
| if len(attempts) > 0 { | ||
| extras := []string{} | ||
| for _, a := range attempts { | ||
| extras = append(extras, fmt.Sprintf("[%s: %v]", a.ref.String(), a.err)) | ||
| } | ||
| logrus.Debugf("(Fallback sources also failed: %s): %v", strings.Join(extras, "\n"), originalErr) | ||
| } | ||
| return nil, 0, originalErr | ||
| } | ||
|
|
||
| // GetSignaturesWithFormat returns the image's signatures. It may use a remote (= slow) service. | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any other caller than `GetBlob` itself?