Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 190 additions & 2 deletions image/docker/docker_image_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@ import (
"fmt"
"io"
"math"
"math/rand/v2"
"mime"
"mime/multipart"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"strings"
"sync"
"time"

"github.com/docker/distribution/registry/api/errcode"
v2 "github.com/docker/distribution/registry/api/v2"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
"go.podman.io/image/v5/docker/reference"
Expand Down Expand Up @@ -48,6 +53,19 @@ type dockerImageSource struct {
// State
cachedManifest []byte // nil if not loaded yet
cachedManifestMIMEType string // Only valid if cachedManifest != nil

// Mirror fallback: when blob fetch fails with a retryable error, try
// remaining mirrors before giving up. Protected by mirrorMu.
mirrorMu sync.Mutex
mirrorOverride *mirrorSource // If non-nil, this is the mirror and physicalRef for the override client
prevOverrides []*dockerClient // old overrides not yet closed
remainingSources []sysregistriesv2.PullSource
fallbackSys *types.SystemContext
}

type mirrorSource struct {
client *dockerClient
ref dockerReference
}

// newImageSource creates a new ImageSource for the specified image reference.
Expand Down Expand Up @@ -91,14 +109,18 @@ func newImageSource(ctx context.Context, sys *types.SystemContext, ref dockerRef
err error
}
attempts := []attempt{}
for _, pullSource := range pullSources {
for i, pullSource := range pullSources {
if sys != nil && sys.DockerLogMirrorChoice {
logrus.Infof("Trying to access %q", pullSource.Reference)
} else {
logrus.Debugf("Trying to access %q", pullSource.Reference)
}
s, err := newImageSourceAttempt(ctx, sys, ref, pullSource, registryConfig)
if err == nil {
if i+1 < len(pullSources) {
s.remainingSources = pullSources[i+1:]
s.fallbackSys = sys
}
return s, nil
}
logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err)
Expand Down Expand Up @@ -205,6 +227,18 @@ func (s *dockerImageSource) Reference() types.ImageReference {

// Close removes resources associated with an initialized ImageSource, if any.
func (s *dockerImageSource) Close() error {
s.mirrorMu.Lock()
prev := s.prevOverrides
override := s.mirrorOverride
s.prevOverrides = nil
s.mirrorOverride = nil
s.mirrorMu.Unlock()
for _, m := range prev {
m.Close()
}
if override != nil {
override.client.Close()
}
return s.c.Close()
}

Expand Down Expand Up @@ -455,7 +489,161 @@ func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo,
// The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided.
// May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location.
func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) {
return s.c.getBlob(ctx, s.physicalRef, info, cache)
return s.getBlob(ctx, info, cache)
}

func (s *dockerImageSource) getBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other caller than GetBlob itself?

client, physRef, hasRemaining := s.getActiveSource()
reader, size, err := tryGetBlob(ctx, client, physRef, info, cache)
if err == nil || !hasRemaining {
return reader, size, err
}
if isMirrorTransientError(err) || isMirrorFallbackError(err) {
logrus.Debugf("Blob %s fetch from %q failed (%v), trying fallback sources", info.Digest, physRef.ref, err)
return s.getBlobWithMirrorFallback(ctx, info, cache, err, client)
}
return reader, size, err
}

func (s *dockerImageSource) getActiveSource() (*dockerClient, dockerReference, bool) {
s.mirrorMu.Lock()
defer s.mirrorMu.Unlock()
hasRemaining := len(s.remainingSources) > 0
if s.mirrorOverride != nil {
return s.mirrorOverride.client, s.mirrorOverride.ref, hasRemaining
}
return s.c, s.physicalRef, hasRemaining
}

func tryGetBlob(ctx context.Context, client *dockerClient, physRef dockerReference,
info types.BlobInfo, cache types.BlobInfoCache,
) (io.ReadCloser, int64, error) {
reader, size, err := client.getBlob(ctx, physRef, info, cache)
if err != nil && isMirrorTransientError(err) {
logrus.Debugf("Transient error fetching blob %s from %q, retrying: %v", info.Digest, physRef.ref, err)
delay := time.Second + rand.N(time.Second/10) // 1s + 10% jitter
select {
case <-ctx.Done():
return nil, 0, fmt.Errorf("%w (while retrying after: %v)", ctx.Err(), err)
case <-time.After(delay):
}
reader, size, err = client.getBlob(ctx, physRef, info, cache)
}
return reader, size, err
}

// isMirrorFallbackError returns true for errors where the blob is not present
// on this mirror but may be on another — warranting a fallback attempt.
// All tested registries (docker.io, registry.redhat.io, quay.io, registry.access.redhat.com)
// return BLOB_UNKNOWN for blob 404s, matching the OCI distribution spec.
func isMirrorFallbackError(err error) bool {
// Blob endpoint returns HTTP 404 with errcode JSON body
// {"errors":[{"code":"BLOB_UNKNOWN",...}]}.
// registryHTTPResponseToError → handleErrorResponse → parseHTTPErrorResponse
// returns errcode.Errors containing errcode.Error{Code: v2.ErrorCodeBlobUnknown}.
var ec errcode.ErrorCoder
if errors.As(err, &ec) && ec.ErrorCode() == v2.ErrorCodeBlobUnknown {
return true
}

// UnexpectedHTTPStatusError (capital U) is NOT matched here — for regular blobs,
// handleErrorResponse never produces it for 4xx (only for 5xx). For external blobs,
// getExternalBlob() produces it via newUnexpectedHTTPStatusError() directly for any
// non-200 including 404, but external URLs are fixed and mirror-independent —
// a different registry mirror cannot serve them.
return false
}

// isMirrorTransientError returns true for errors that are transient — the mirror
// probably has the blob but temporarily cannot serve it.
func isMirrorTransientError(err error) bool {
// HTTP 5xx: handleErrorResponse returns UnexpectedHTTPStatusError for status
// codes outside 400–499. Server-side error, another mirror may succeed.
var httpErr UnexpectedHTTPStatusError
if errors.As(err, &httpErr) && httpErr.StatusCode >= 500 {
return true
}

// Network timeout: makeRequest returns net.Error with Timeout() == true.
// The mirror is reachable but slow — worth retrying on another.
var netErr net.Error
return errors.As(err, &netErr) && netErr.Timeout()
}

func (s *dockerImageSource) getBlobWithMirrorFallback(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, originalErr error, failedClient *dockerClient) (io.ReadCloser, int64, error) {
// Held for the full fallback loop, including network I/O. This serializes
// concurrent blob fetchers during fallback — only one goroutine probes
// sources while the rest block on getActiveSource(). Acceptable trade-off:
// fallback is rare and serialization prevents thundering-herd probing.
s.mirrorMu.Lock()
defer s.mirrorMu.Unlock()

// If another goroutine already switched to a working mirror, try it first.
if s.mirrorOverride != nil && s.mirrorOverride.client != failedClient {
client, physRef := s.mirrorOverride.client, s.mirrorOverride.ref
reader, size, err := tryGetBlob(ctx, client, physRef, info, cache)
if err == nil {
return reader, size, nil
}
// Override failed — retire it so subsequent goroutines don't retry it.
s.prevOverrides = append(s.prevOverrides, s.mirrorOverride.client)
s.mirrorOverride = nil
}

registryConf, regErr := loadRegistryConfiguration(s.fallbackSys)
if regErr != nil {
logrus.Debugf("Mirror fallback: failed to load registry config: %v", regErr)
return nil, 0, originalErr
}

type attempt struct {
ref reference.Named
err error
}
attempts := []attempt{}
for len(s.remainingSources) > 0 {
pullSource := s.remainingSources[0]
s.remainingSources = s.remainingSources[1:]

logrus.Debugf("Trying to access %q", pullSource.Reference)

// newImageSourceAttempt calls ensureManifestIsLoaded(), which preserves
// the manifest-level filtering: only mirrors that can serve the manifest
Comment on lines +610 to +611
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not?! If we got go the getBlob stage, cachedManifest is already set and ensure… does nothing.

// are accepted.
fallback, fallbackErr := newImageSourceAttempt(ctx, s.fallbackSys, s.logicalRef, pullSource, registryConf)
if fallbackErr != nil {
logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, fallbackErr)
attempts = append(attempts, attempt{ref: pullSource.Reference, err: fallbackErr})
continue
}

reader, size, err := tryGetBlob(ctx, fallback.c, fallback.physicalRef, info, cache)
if err != nil {
fallback.c.Close()
logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err)
attempts = append(attempts, attempt{ref: pullSource.Reference, err: err})
if !isMirrorTransientError(err) && !isMirrorFallbackError(err) {
break
}
continue
}

if s.mirrorOverride != nil {
s.prevOverrides = append(s.prevOverrides, s.mirrorOverride.client)
}
s.mirrorOverride = &mirrorSource{client: fallback.c, ref: fallback.physicalRef}
logrus.Debugf("Blob fetch succeeded from fallback source %q, switching to it for future requests", pullSource.Reference)

return reader, size, nil
}
if len(attempts) > 0 {
extras := []string{}
for _, a := range attempts {
extras = append(extras, fmt.Sprintf("[%s: %v]", a.ref.String(), a.err))
}
logrus.Debugf("(Fallback sources also failed: %s): %v", strings.Join(extras, "\n"), originalErr)
}
return nil, 0, originalErr
}

// GetSignaturesWithFormat returns the image's signatures. It may use a remote (= slow) service.
Expand Down
Loading