Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ block-test.hcl
.env

.claude/
.idea/

# Binaries
/cachew
Expand Down
4 changes: 3 additions & 1 deletion cmd/cachew/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ type RestoreCmd struct {
func (c *RestoreCmd) Run(ctx context.Context, cache cache.Cache) error {
fmt.Fprintf(os.Stderr, "Restoring to %s...\n", c.Directory) //nolint:forbidigo
namespacedCache := cache.Namespace(c.Namespace)
if err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory, c.ZstdThreads); err != nil {
result, err := snapshot.Restore(ctx, namespacedCache, c.Key.Key(), c.Directory, c.ZstdThreads)
if err != nil {
return errors.Wrap(err, "failed to restore snapshot")
}
fmt.Fprintf(os.Stderr, "Restored %d bytes in %dms\n", result.BytesRead, result.Duration.Milliseconds()) //nolint:forbidigo

fmt.Fprintf(os.Stderr, "Snapshot restored: %s\n", c.Key.String()) //nolint:forbidigo
return nil
Expand Down
38 changes: 35 additions & 3 deletions internal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,51 @@ func StreamTo(ctx context.Context, w io.Writer, directory string, excludePattern
return errors.Join(errs...)
}

// RestoreResult describes the outcome of a successful restore.
type RestoreResult struct {
	// BytesRead counts the compressed bytes that were pulled from the cache.
	BytesRead int64
	// Duration is how long the restore took, measured in wall-clock time.
	Duration time.Duration
}

// countingReader decorates an io.Reader, keeping a running total of the
// bytes that have been handed back to callers.
type countingReader struct {
	r io.Reader // wrapped source that Read delegates to
	n int64     // total bytes returned by Read so far
}

// Read delegates to the wrapped reader and adds whatever it returned to the
// running total. The byte count and error are passed through untouched, so
// bytes delivered alongside an error are still counted.
func (c *countingReader) Read(p []byte) (int, error) {
	got, readErr := c.r.Read(p)
	c.n += int64(got)
	return got, readErr //nolint:wrapcheck
}

// Restore downloads an archive from the cache and extracts it to a directory.
//
// The archive is decompressed with zstd and extracted with tar, preserving
// all file permissions, ownership, and symlinks.
// The operation is fully streaming - no temporary files are created.
// threads controls zstd parallelism; 0 uses all available CPU cores.
//
// On success the returned RestoreResult reports how many bytes were read from
// the opened cache object and the wall-clock time elapsed since the call began.
// If opening the object or extracting it fails, the result is nil and the
// error is returned.
func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) error {
func Restore(ctx context.Context, remote cache.Cache, key cache.Key, directory string, threads int) (*RestoreResult, error) {
// Timestamp taken before the cache is opened so Duration covers the whole call.
start := time.Now()

rc, _, err := remote.Open(ctx, key)
if err != nil {
return errors.Wrap(err, "failed to open object")
return nil, errors.Wrap(err, "failed to open object")
}
defer rc.Close()

return Extract(ctx, rc, directory, threads)
// Wrap the cache stream so the bytes Extract consumes can be reported.
cr := &countingReader{r: rc}

if err := Extract(ctx, cr, directory, threads); err != nil {
return nil, err
}

return &RestoreResult{
BytesRead: cr.n,
Duration: time.Since(start),
}, nil
}

// Extract decompresses a zstd+tar stream into directory, preserving all file
Expand All @@ -183,6 +214,7 @@ func Extract(ctx context.Context, r io.Reader, directory string, threads int) er
threads = runtime.NumCPU()
}

// Create target directory if it doesn't exist
if err := os.MkdirAll(directory, 0o750); err != nil {
return errors.Wrap(err, "failed to create target directory")
}
Expand Down
18 changes: 9 additions & 9 deletions internal/snapshot/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestCreateAndRestoreRoundTrip(t *testing.T) {
assert.Equal(t, "application/zstd", headers.Get("Content-Type"))

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

content1, err := os.ReadFile(filepath.Join(dstDir, "file1.txt"))
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestCreateWithExcludePatterns(t *testing.T) {
assert.NoError(t, err)

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

_, err = os.Stat(filepath.Join(dstDir, "include.txt"))
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestCreateExcludesOnlyGitLockFiles(t *testing.T) {
assert.NoError(t, err)

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

// Tracked lock files must be present.
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestCreatePreservesSymlinks(t *testing.T) {
assert.NoError(t, err)

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

info, err := os.Lstat(filepath.Join(dstDir, "link.txt"))
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestRestoreNonexistentKey(t *testing.T) {
key := cache.Key{1, 2, 3}

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.Error(t, err)
}

Expand All @@ -237,7 +237,7 @@ func TestRestoreCreatesTargetDirectory(t *testing.T) {
assert.NoError(t, err)

dstDir := filepath.Join(t.TempDir(), "nested", "target")
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

content, err := os.ReadFile(filepath.Join(dstDir, "file.txt"))
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestRestoreContextCancellation(t *testing.T) {
cancel()

dstDir := t.TempDir()
err = snapshot.Restore(cancelCtx, mem, key, dstDir, 0)
_, err = snapshot.Restore(cancelCtx, mem, key, dstDir, 0)
assert.Error(t, err)
}

Expand All @@ -283,7 +283,7 @@ func TestCreateEmptyDirectory(t *testing.T) {
assert.NoError(t, err)

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

entries, err := os.ReadDir(dstDir)
Expand All @@ -307,7 +307,7 @@ func TestCreateWithNestedDirectories(t *testing.T) {
assert.NoError(t, err)

dstDir := t.TempDir()
err = snapshot.Restore(ctx, mem, key, dstDir, 0)
_, err = snapshot.Restore(ctx, mem, key, dstDir, 0)
assert.NoError(t, err)

content, err := os.ReadFile(filepath.Join(dstDir, "a", "b", "c", "d", "e", "deep.txt"))
Expand Down
31 changes: 22 additions & 9 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func New(
logger.InfoContext(ctx, "Startup fetch completed for existing repo", "upstream", repo.UpstreamURL(),
"duration", time.Since(start))

recordCloneMetrics(ctx, "local", time.Since(start), 0)

postRefs, err := repo.GetLocalRefs(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
Expand Down Expand Up @@ -469,8 +471,10 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {

logger.InfoContext(ctx, "Attempting mirror snapshot restore", "upstream", upstream)

if err := s.tryRestoreSnapshot(ctx, repo); err != nil {
logger.InfoContext(ctx, "Mirror snapshot restore failed, falling back to clone", "upstream", upstream, "error", err)
cloneStart := time.Now()
restoreResult, restoreErr := s.tryRestoreSnapshot(ctx, repo)
if restoreErr != nil {
logger.InfoContext(ctx, "Mirror snapshot restore failed, falling back to clone", "upstream", upstream, "error", restoreErr)
} else {
logger.InfoContext(ctx, "Mirror snapshot restored, fetching to freshen", "upstream", upstream)

Expand Down Expand Up @@ -500,6 +504,9 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {

logger.InfoContext(ctx, "Post-restore fetch completed, serving", "upstream", upstream)

recordCloneSuccess(ctx, "mirror")
recordCloneMetrics(ctx, "mirror", time.Since(cloneStart), restoreResult.BytesRead)

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
Expand All @@ -522,11 +529,15 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {

if err != nil {
logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err)
recordCloneFailure(ctx, "upstream", err)
return
}

logger.InfoContext(ctx, "Clone completed", "upstream", upstream, "path", repo.Path())

recordCloneSuccess(ctx, "upstream")
recordCloneMetrics(ctx, "upstream", time.Since(cloneStart), 0)

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
Expand All @@ -539,28 +550,30 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
// Mirror snapshots are bare repositories that can be extracted and used directly
// without any conversion. On failure the repo path is cleaned up so the caller
// can fall back to clone.
//
// On success the RestoreResult produced by snapshot.Restore is returned
// unchanged; on any failure the result is nil.
func (s *Strategy) tryRestoreSnapshot(ctx context.Context, repo *gitclone.Repository) error {
func (s *Strategy) tryRestoreSnapshot(ctx context.Context, repo *gitclone.Repository) (*snapshot.RestoreResult, error) {
cacheKey := mirrorSnapshotCacheKey(repo.UpstreamURL())

// Only the parent directory is created here; repo.Path() itself is passed to snapshot.Restore.
if err := os.MkdirAll(filepath.Dir(repo.Path()), 0o750); err != nil {
return errors.Wrap(err, "create parent directory for restore")
return nil, errors.Wrap(err, "create parent directory for restore")
}

logger := logging.FromContext(ctx)

if err := snapshot.Restore(ctx, s.cache, cacheKey, repo.Path(), s.config.ZstdThreads); err != nil {
result, err := snapshot.Restore(ctx, s.cache, cacheKey, repo.Path(), s.config.ZstdThreads)
if err != nil {
// Best-effort removal of anything left at the repo path; the RemoveAll error is deliberately ignored.
_ = os.RemoveAll(repo.Path())
return errors.Wrap(err, "restore mirror snapshot")
return nil, errors.Wrap(err, "restore mirror snapshot")
}
logger.InfoContext(ctx, "Mirror snapshot extracted", "upstream", repo.UpstreamURL(), "path", repo.Path())
logger.InfoContext(ctx, "Mirror snapshot extracted", "upstream", repo.UpstreamURL(), "path", repo.Path(),
"bytes_read", result.BytesRead, "duration_ms", result.Duration.Milliseconds())

if err := repo.MarkRestored(ctx); err != nil {
// The extracted tree is removed when marking fails, so the caller can fall back to clone.
_ = os.RemoveAll(repo.Path())
return errors.Wrap(err, "mark restored")
return nil, errors.Wrap(err, "mark restored")
}
logger.InfoContext(ctx, "Repository marked as restored", "upstream", repo.UpstreamURL(), "state", repo.State())

return nil
return result, nil
}

func (s *Strategy) maybeBackgroundFetch(repo *gitclone.Repository) {
Expand Down
Loading
Loading