Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 55 additions & 14 deletions pkg/config/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"log/slog"
Expand Down Expand Up @@ -105,32 +106,72 @@
return ""
}

// Read loads an agent configuration from an OCI artifact.
//
// This method is intentionally defensive:
// - The local content store is treated as a cache, never as a source of truth.
// - Any missing or inconsistent local state (ErrStoreCorrupted) triggers a re-fetch.
// - A forced pull is used as a last resort to self-heal corrupted stores.
func (a ociSource) Read(ctx context.Context) ([]byte, error) {
// Check if we have a local copy (without loading content)
store, err := content.NewStore()
if err != nil {
return nil, fmt.Errorf("failed to create content store: %w", err)
}

_, metaErr := store.GetArtifactMetadata(a.reference)
hasLocal := metaErr == nil
tryLoad := func() ([]byte, error) {
af, err := store.GetArtifact(a.reference)
if err != nil {
return nil, err
}
return []byte(af), nil
}

// 1. Try local first
data, err := tryLoad()
if err == nil {
return data, nil
}

isCorrupted := errors.Is(err, content.ErrStoreCorrupted)

// Try to pull from remote (only pulls if digest changed)
if _, pullErr := remote.Pull(ctx, a.reference, false); pullErr != nil {
if !hasLocal {
// No local copy and can't pull, error out
return nil, fmt.Errorf("failed to pull OCI image %s: %w", a.reference, pullErr)
// 2. Try normal pull (digest check)
if _, pullErr := remote.Pull(ctx, a.reference, false); pullErr == nil {
data, err = tryLoad()
if err == nil {
return data, nil
}
slog.Debug("Failed to check for OCI reference updates, using cached version", "ref", a.reference, "error", pullErr)
isCorrupted = isCorrupted || errors.Is(err, content.ErrStoreCorrupted)
} else {
slog.Debug(
"OCI pull failed, will evaluate fallback",
"ref", a.reference,
"error", pullErr,
)
}

// Load the agent contents from the store
af, err := store.GetArtifact(a.reference)
if err != nil {
return nil, fmt.Errorf("failed to load agent from store: %w", err)
// 3. Force re-pull if store is corrupted or inconsistent
if isCorrupted {
slog.Warn(
"Local OCI store corrupted or inconsistent, forcing re-pull",
"ref", a.reference,
)

if _, pullErr := remote.Pull(ctx, a.reference, true); pullErr != nil {
return nil, fmt.Errorf("failed to force re-pull OCI image %s: %w", a.reference, pullErr)
}

data, err = tryLoad()
if err == nil {
return data, nil
}
}

return []byte(af), nil
return nil, fmt.Errorf(
"failed to load agent from OCI source %s after retrying and re-fetching: %w",
a.reference,
err,
)

Check failure on line 174 in pkg/config/sources.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofumpt)
}

// urlSource is used to load an agent configuration from an HTTP/HTTPS URL.
Expand Down
100 changes: 76 additions & 24 deletions pkg/content/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -17,6 +18,11 @@ import (
"github.com/google/go-containerregistry/pkg/v1/tarball"
)

// ErrStoreCorrupted indicates that the local artifact store is in an
// inconsistent or partially missing state (e.g. a missing tarball, reference
// file or metadata file). It is a sentinel error: detect it with errors.Is.
// Callers may safely attempt to re-fetch the artifact from the remote source.
var ErrStoreCorrupted = errors.New("local artifact store corrupted")

// Store manages the local content store for artifacts
type Store struct {
baseDir string
Expand Down Expand Up @@ -111,20 +117,30 @@ func (s *Store) StoreArtifact(img v1.Image, reference string) (string, error) {

// GetArtifactImage loads an artifact by digest or reference and returns it as a v1.Image
func (s *Store) GetArtifactImage(identifier string) (v1.Image, error) {
// Resolve the identifier (reference or digest) to a content digest.
// Any failure here means the local store is incomplete or inconsistent.
digest, err := s.resolveIdentifier(identifier)
if err != nil {
return nil, err
}

// Artifacts are stored locally as tarballs named by their digest.
artifactPath := filepath.Join(s.baseDir, digest+".tar")

if _, err := os.Stat(artifactPath); os.IsNotExist(err) {
return nil, fmt.Errorf("artifact %s not found in store", identifier)
// If the tarball is missing, the local store is considered corrupted.
// Callers can safely attempt to re-fetch the artifact from the remote source.
if _, err := os.Stat(artifactPath); err != nil {
if os.IsNotExist(err) {
return nil, ErrStoreCorrupted
}
return nil, fmt.Errorf("checking artifact file: %w", err)
}

// Load the OCI image from the local tarball.
// Any failure at this stage indicates a partially written or corrupted artifact.
img, err := tarball.ImageFromPath(artifactPath, nil)
if err != nil {
return nil, fmt.Errorf("loading image from tar file %s: %w", artifactPath, err)
return nil, ErrStoreCorrupted
}

return img, nil
Expand All @@ -139,8 +155,11 @@ func (s *Store) GetArtifactPath(identifier string) (string, error) {

artifactPath := filepath.Join(s.baseDir, digest+".tar")

if _, err := os.Stat(artifactPath); os.IsNotExist(err) {
return "", fmt.Errorf("artifact %s not found in store", identifier)
if _, err := os.Stat(artifactPath); err != nil {
if os.IsNotExist(err) {
return "", ErrStoreCorrupted
}
return "", err
}

return artifactPath, nil
Expand All @@ -157,28 +176,41 @@ func (s *Store) GetArtifactMetadata(identifier string) (*ArtifactMetadata, error
}

func (s *Store) GetArtifact(identifier string) (string, error) {
// Load the artifact image from the local store.
// Any error here is propagated so callers can decide whether to re-fetch.
img, err := s.GetArtifactImage(identifier)
if err != nil {
return "", err
}

// Extract layers from the OCI image.
// A failure indicates an invalid or partially written image.
layers, err := img.Layers()
if err != nil {
return "", err
return "", ErrStoreCorrupted
}

var buf bytes.Buffer
// An artifact without layers is considered invalid.
// This should never happen for a correctly stored agent.
if len(layers) == 0 {
return "", ErrStoreCorrupted
}

// Agents are expected to be stored in the first layer.
// If decompression fails, the local store is considered corrupted.
layer := layers[0]
b, err := layer.Uncompressed()
rc, err := layer.Uncompressed()
if err != nil {
return "", err
return "", ErrStoreCorrupted
}
defer rc.Close()

_, err = io.Copy(&buf, b)
if err != nil {
return "", err
// Read the full layer content into memory.
// Any I/O error here means the artifact cannot be trusted.
var buf bytes.Buffer
if _, err := io.Copy(&buf, rc); err != nil {
return "", ErrStoreCorrupted
}
b.Close()

return buf.String(), nil
}
Expand Down Expand Up @@ -232,32 +264,49 @@ func (s *Store) DeleteArtifact(identifier string) error {
return nil
}

// resolveIdentifier resolves a digest or reference to a digest
// resolveIdentifier resolves a user-provided identifier (digest or reference)
// into a concrete content digest stored in the local artifact store.
func (s *Store) resolveIdentifier(identifier string) (string, error) {
// If identifier doesn't contain ":" and looks like a reference (not a hex digest),
// add ":latest" tag
if !strings.Contains(identifier, ":") {
identifier += ":latest"
// If the identifier is already a digest, we can return it directly.
// This bypasses the refs lookup entirely.
if strings.HasPrefix(identifier, "sha256:") {
return identifier, nil
}

digest, err := s.resolveReference(identifier)
if err != nil {
return "", err
// If no tag is provided, default to ":latest".
// This mirrors standard OCI reference semantics.
if !strings.Contains(identifier, ":") {
identifier += ":latest"
}

return digest, nil
// Resolve the reference to a digest via the refs store.
// Any failure here indicates the local store is missing or inconsistent.
return s.resolveReference(identifier)
}

// resolveReference resolves an OCI reference (e.g. repo:tag) to a digest using
// the local refs index.
//
// Each reference is mapped to a file under "<baseDir>/refs" whose name is the
// hex-encoded SHA-256 of the reference string; the file content is the digest.
//
// The returned error matches ErrStoreCorrupted (via errors.Is) when the ref
// file is missing or contains no digest. Any other read failure is returned
// wrapped.
func (s *Store) resolveReference(reference string) (string, error) {
	// Hashing the reference gives a file name free of slashes and colons.
	refHash := sha256.Sum256([]byte(reference))
	refFile := filepath.Join(s.baseDir, "refs", hex.EncodeToString(refHash[:]))

	data, err := os.ReadFile(refFile)
	if err != nil {
		if os.IsNotExist(err) {
			// The tarball or metadata may exist while the reference index
			// entry is missing; report it as corruption and name the
			// reference so the failure can be diagnosed.
			return "", fmt.Errorf("%w: reference %s not found", ErrStoreCorrupted, reference)
		}
		return "", fmt.Errorf("reading reference file: %w", err)
	}

	digest := strings.TrimSpace(string(data))
	if digest == "" {
		// An empty ref file would otherwise yield an empty digest and a
		// lookup of "<baseDir>/.tar" further down the line.
		return "", fmt.Errorf("%w: reference %s has an empty digest entry", ErrStoreCorrupted, reference)
	}

	return digest, nil
}

Expand Down Expand Up @@ -317,6 +366,9 @@ func (s *Store) loadMetadata(digest string) (*ArtifactMetadata, error) {
metadataPath := filepath.Join(s.baseDir, digest+".json")
data, err := os.ReadFile(metadataPath)
if err != nil {
if os.IsNotExist(err) {
return nil, ErrStoreCorrupted
}
return nil, fmt.Errorf("reading metadata file: %w", err)
}

Expand Down
Loading