Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "net/http/pprof" //nolint:gosec
"os"
"strings"
"sync/atomic"
"time"

"github.com/alecthomas/chroma/v2/quick"
Expand Down Expand Up @@ -81,7 +82,19 @@ func main() {

schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig)

cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider)
// metaStore is populated by config.Load once the metadata backend is
// constructed. The git strategy resolves its namespace through this
// pointer at strategy-construction time, by which point Load has set it.
var metaStore atomic.Pointer[metadatadb.Store]
gitMetadataProvider := func() (*metadatadb.Namespace, error) {
s := metaStore.Load()
if s == nil {
return nil, errors.New("metadata store not yet initialised")
}
return s.Namespace("git"), nil
}

cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider, gitMetadataProvider)

// Commands
switch { //nolint:gocritic
Expand All @@ -90,7 +103,7 @@ func main() {
return
}

mux, err := newMux(ctx, cr, mr, sr, providersConfigHCL, envars)
mux, err := newMux(ctx, cr, mr, sr, providersConfigHCL, envars, metaStore.Store)
fatalIfError(ctx, logger, err, "Failed to load config")

metricsClient, err := metrics.New(ctx, globalConfig.MetricsConfig)
Expand Down Expand Up @@ -126,6 +139,7 @@ func newRegistries(
cloneManagerProvider gitclone.ManagerProvider,
tokenManagerProvider githubapp.TokenManagerProvider,
s3ClientProvider s3client.ClientProvider,
gitMetadataProvider metadatadb.NamespaceProvider,
) (
*cache.Registry,
*metadatadb.Registry,
Expand All @@ -147,7 +161,7 @@ func newRegistries(
strategy.RegisterHermit(sr)
strategy.RegisterHost(sr)
strategy.RegisterHTTPProxy(sr)
git.Register(sr, scheduler, cloneManagerProvider, tokenManagerProvider)
git.Register(sr, scheduler, cloneManagerProvider, tokenManagerProvider, gitMetadataProvider)
gomod.Register(sr, cloneManagerProvider)

return cr, mr, sr
Expand All @@ -166,7 +180,7 @@ func printSchema(kctx *kong.Context, cr *cache.Registry, mr *metadatadb.Registry
}
}

func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, error) {
func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string, setMetadataStore func(*metadatadb.Store)) (http.Handler, error) {
mux := http.NewServeMux()

mux.HandleFunc("GET /_liveness", func(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -209,7 +223,7 @@ func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr
http.DefaultServeMux.ServeHTTP(w, r)
}))

handler, _, loaded, err := config.Load(ctx, cr, mr, sr, providersConfigHCL, mux, vars)
handler, _, loaded, err := config.Load(ctx, cr, mr, sr, providersConfigHCL, mux, vars, setMetadataStore)
if err != nil {
return nil, errors.Errorf("load config: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func classifyBlocks(ast *hcl.AST) (*classifiedBlocks, error) {
// It returns an http.Handler that wraps mux — any loaded strategies that implement
// strategy.Interceptor are applied as middleware before ServeMux route matching, so
// that they can inspect r.RequestURI rather than the path-only r.URL.Path.
//
// setMetadataStore, if non-nil, is invoked once with the constructed
// metadatadb.Store before any strategies are created. This lets callers wire
// up provider closures (e.g. a metadatadb.NamespaceProvider) that resolve
// against the store at strategy-construction time.
func Load(
ctx context.Context,
cr *cache.Registry,
Expand All @@ -154,6 +159,7 @@ func Load(
ast *hcl.AST,
mux *http.ServeMux,
vars map[string]string,
setMetadataStore func(*metadatadb.Store),
) (http.Handler, metadatadb.Backend, []strategy.Readier, error) {
logger := logging.FromContext(ctx)
expandVars(ast, vars)
Expand Down Expand Up @@ -190,6 +196,9 @@ func Load(
if err != nil {
return nil, nil, nil, errors.Errorf("%s: %w", classified.metadata.Pos, err)
}
if setMetadataStore != nil {
setMetadataStore(metadatadb.New(ctx, metadata))
}

cache := cache.MaybeNewTiered(ctx, caches)

Expand Down
2 changes: 1 addition & 1 deletion internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestLoadRequiresMetadataBackend(t *testing.T) {
assert.NoError(t, err)

ctx := logging.ContextWithLogger(context.Background(), slog.Default())
_, _, _, err = Load(ctx, cr, mr, sr, ast, http.NewServeMux(), nil)
_, _, _, err = Load(ctx, cr, mr, sr, ast, http.NewServeMux(), nil, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "expected a metadata backend")
}
7 changes: 7 additions & 0 deletions internal/metadatadb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ type Namespace struct {
name string
}

// NamespaceProvider lazily resolves a *Namespace. It is intended for wiring
// strategies and other components that need persistent state but are
// constructed before the metadata Store is available.
//
// A nil provider, or one that returns a nil Namespace with a nil error,
// indicates that no metadata store is wired up — callers must tolerate this
// for testability. A non-nil error means the namespace could not be resolved
// (for example, because the Store has not been initialised yet).
type NamespaceProvider func() (*Namespace, error)

// Flush forces an immediate sync with the backend.
func (n *Namespace) Flush(ctx context.Context) error {
return errors.Wrap(n.backend.Flush(ctx, n.name), "flush namespace")
Expand Down
43 changes: 41 additions & 2 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/snapshot"
"github.com/block/cachew/internal/strategy"
)

func Register(r *strategy.Registry, scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider) {
func Register(r *strategy.Registry, scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider, metadataProvider metadatadb.NamespaceProvider) {
strategy.Register(r, "git", "Caches Git repositories, including tarball snapshots.", func(ctx context.Context, config Config, cache cache.Cache, mux strategy.Mux) (*Strategy, error) {
return New(ctx, config, scheduler, cache, mux, cloneManagerProvider, tokenManagerProvider)
return New(ctx, config, scheduler, cache, mux, cloneManagerProvider, tokenManagerProvider, metadataProvider)
})
}

Expand Down Expand Up @@ -59,6 +60,7 @@ type Strategy struct {
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
metrics *gitMetrics
repoCounts *RepoCounts
ready atomic.Bool
}

Expand All @@ -70,6 +72,7 @@ func New(
mux strategy.Mux,
cloneManagerProvider gitclone.ManagerProvider,
tokenManagerProvider githubapp.TokenManagerProvider,
metadataProvider metadatadb.NamespaceProvider,
) (*Strategy, error) {
if _, err := exec.LookPath("git"); err != nil {
return nil, errors.New("git is required but not found in PATH")
Expand Down Expand Up @@ -117,6 +120,19 @@ func New(
return nil, errors.Wrap(err, "failed to create scheduler")
}

var repoCounts *RepoCounts
if metadataProvider != nil {
ns, err := metadataProvider()
if err != nil {
return nil, errors.Wrap(err, "resolve metadata namespace")
}
repoCounts = NewRepoCounts(ns)
if repoCounts != nil {
logger.InfoContext(ctx, "Per-repo clone histogram enabled",
"retention_days", repoCounts.retentionDays)
}
}

m := newGitMetrics()

s := &Strategy{
Expand All @@ -129,6 +145,16 @@ func New(
spools: make(map[string]*RepoSpools),
tokenManager: tokenManager,
metrics: m,
repoCounts: repoCounts,
}

if s.repoCounts != nil {
s.scheduler.SubmitPeriodicJob("repo-counts-reaper", "reap-repo-counts", defaultRepoCountsReapInterval, func(ctx context.Context) error {
if deleted := s.repoCounts.Reap(); deleted > 0 {
logging.FromContext(ctx).InfoContext(ctx, "Reaped stale repo clone counts", "deleted", deleted)
}
return nil
})
}
// Run startup fetches in the background so the HTTP listener (and
// /_liveness) come up immediately. /_readiness gates on Ready() so the
Expand Down Expand Up @@ -301,6 +327,19 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host
return
}

// Count this request in the per-repo clone/fetch histogram. Only real
// pack negotiations are counted: GET /info/refs (every fetch's
// discovery probe, every ls-remote, and the proxy's own staleness
// checks) and protocol v2 POST command=ls-refs (the v2 equivalent of
// info/refs) are both excluded so the histogram reflects user clones
// rather than discovery traffic. The increment happens after
// GetOrCreate so unvalidated upstream URLs cannot bloat the keyspace.
if isFetch, ferr := RequestCountsAsFetch(pathValue, r); ferr != nil {
logger.WarnContext(ctx, "Failed to inspect upload-pack body for clone counting", "error", ferr)
} else if isFetch {
s.repoCounts.IncrementClone(upstreamURL)
}

state := repo.State()
isInfoRefs := strings.HasSuffix(pathValue, "/info/refs")

Expand Down
41 changes: 34 additions & 7 deletions internal/strategy/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/strategy/git"
)

Expand Down Expand Up @@ -98,7 +99,7 @@ func TestNew(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mux := newTestMux()
cm := gitclone.NewManagerProvider(ctx, tt.config, nil)
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
if tt.wantError != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.wantError)
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)
}

Expand All @@ -220,7 +221,7 @@ func TestNewIsReadyAfterWarm(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

deadline := time.Now().Add(5 * time.Second)
Expand Down Expand Up @@ -249,7 +250,7 @@ func TestIntegrationWithMockUpstream(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

// Verify handlers exist
Expand Down Expand Up @@ -278,7 +279,7 @@ func TestNewMissingGitBinary(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, "clones"),
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.Error(t, err)
assert.Contains(t, err.Error(), "git")
}
Expand All @@ -302,7 +303,7 @@ func TestNewMissingSnapshotBinaries(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, "clones-missing-tar"),
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.Error(t, err)
assert.Contains(t, err.Error(), "tar")
})
Expand All @@ -318,12 +319,38 @@ func TestNewMissingSnapshotBinaries(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, "clones-missing-zstd"),
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.Error(t, err)
assert.Contains(t, err.Error(), "zstd")
})
}

// TestNewWithMetadataProvider checks that git.New calls the supplied metadata
// namespace provider exactly once and succeeds when that provider hands back
// the "git" namespace of an in-memory store. What the resulting RepoCounts
// does with the namespace is exercised in repocounts_test.go.
func TestNewWithMetadataProvider(t *testing.T) {
	_, ctx := logging.Configure(context.Background(), logging.Config{})

	memStore := metadatadb.New(ctx, metadatadb.NewMemoryBackend())
	testMux := newTestMux()
	mirrorRoot := filepath.Join(t.TempDir(), "clones")
	cloneManagers := gitclone.NewManagerProvider(ctx, gitclone.Config{
		MirrorRoot:    mirrorRoot,
		FetchInterval: 15,
	}, nil)

	// No GitHub App is configured for this test, so the token manager
	// provider yields nothing.
	noTokenManager := func() (*githubapp.TokenManager, error) {
		return nil, nil //nolint:nilnil
	}

	// Record every invocation so the call count can be asserted afterwards.
	invocations := 0
	countingProvider := func() (*metadatadb.Namespace, error) {
		invocations++
		return memStore.Namespace("git"), nil
	}

	_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, testMux, cloneManagers, noTokenManager, countingProvider)
	assert.NoError(t, err)
	assert.Equal(t, 1, invocations, "metadata namespace provider should be invoked exactly once")
}

func TestParseGitRefs(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{})
_ = ctx
Expand Down
10 changes: 5 additions & 5 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {
mux := http.NewServeMux()
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

// Start a test server with logging middleware
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) {
mux := http.NewServeMux()
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) {
}, nil)
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) {
}, nil)
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

strategy.SetHTTPTransport(&countingTransport{
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestIntegrationNotOurRefFallsBackToUpstream(t *testing.T) {
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc,
func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

// Redirect all upstream proxy requests to the mock server.
Expand Down
4 changes: 2 additions & 2 deletions internal/strategy/git/repack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestRepackInterval(t *testing.T) {
}, nil)
s, err := git.New(ctx, git.Config{
RepackInterval: tt.repackInterval,
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)
assert.True(t, s != nil)
})
Expand All @@ -65,7 +65,7 @@ func TestRepackScheduledForExistingRepos(t *testing.T) {
}, nil)
s, err := git.New(ctx, git.Config{
RepackInterval: 24 * time.Hour,
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)
assert.True(t, s != nil)
}
Loading