Skip to content

Commit 1ab992c

Browse files
committed
Fix concurrency, resource, and reliability issues in mirror
- Wire job contexts to server shutdown context so jobs are canceled on server stop instead of running indefinitely - Defer context cancel in runJob so completed jobs don't leak contexts - Cap error accumulation in progressTracker to 1000 entries to prevent OOM on large mirror operations with many failures - Add panic recovery in errgroup workers to prevent process crashes - Use defer for db.Close() in runMirror CLI to ensure cleanup on all error paths
1 parent 7bb944b commit 1ab992c

6 files changed

Lines changed: 52 additions & 38 deletions

File tree

cmd/proxy/main.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -432,11 +432,12 @@ func runMirror() {
432432
fmt.Fprintf(os.Stderr, "error opening database: %v\n", err)
433433
os.Exit(1)
434434
}
435+
defer func() { _ = db.Close() }()
435436

436437
if err := db.MigrateSchema(); err != nil {
437438
_ = db.Close()
438439
fmt.Fprintf(os.Stderr, "error migrating schema: %v\n", err)
439-
os.Exit(1)
440+
os.Exit(1) //nolint:gocritic // db closed above
440441
}
441442

442443
// Open storage
@@ -448,7 +449,7 @@ func runMirror() {
448449
if err != nil {
449450
_ = db.Close()
450451
fmt.Fprintf(os.Stderr, "error opening storage: %v\n", err)
451-
os.Exit(1)
452+
os.Exit(1) //nolint:gocritic // db closed above
452453
}
453454

454455
// Build proxy (reuses same pipeline as serve)
@@ -470,27 +471,22 @@ func runMirror() {
470471
if *dryRun {
471472
items, err := m.RunDryRun(ctx, source)
472473
if err != nil {
473-
_ = db.Close()
474474
fmt.Fprintf(os.Stderr, "error: %v\n", err)
475475
os.Exit(1)
476476
}
477477
fmt.Printf("Would mirror %d package versions:\n", len(items))
478478
for _, item := range items {
479479
fmt.Printf(" %s\n", item)
480480
}
481-
_ = db.Close()
482481
return
483482
}
484483

485484
progress, err := m.Run(ctx, source)
486485
if err != nil {
487-
_ = db.Close()
488486
fmt.Fprintf(os.Stderr, "error: %v\n", err)
489487
os.Exit(1)
490488
}
491489

492-
_ = db.Close()
493-
494490
fmt.Printf("Mirror complete: %d downloaded, %d skipped (cached), %d failed, %s total\n",
495491
progress.Completed, progress.Skipped, progress.Failed, formatSize(progress.Bytes))
496492

internal/mirror/job.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,19 @@ type JobRequest struct {
4141

4242
// JobStore manages in-memory mirror jobs.
4343
type JobStore struct {
44-
mu sync.RWMutex
45-
jobs map[string]*Job
46-
mirror *Mirror
44+
mu sync.RWMutex
45+
jobs map[string]*Job
46+
mirror *Mirror
47+
parentCtx context.Context
4748
}
4849

49-
// NewJobStore creates a new job store.
50-
func NewJobStore(m *Mirror) *JobStore {
50+
// NewJobStore creates a new job store. The parent context is used as the base
51+
// for all job contexts so that jobs are canceled when the server shuts down.
52+
func NewJobStore(ctx context.Context, m *Mirror) *JobStore {
5153
return &JobStore{
52-
jobs: make(map[string]*Job),
53-
mirror: m,
54+
jobs: make(map[string]*Job),
55+
mirror: m,
56+
parentCtx: ctx,
5457
}
5558
}
5659

@@ -62,7 +65,7 @@ func (js *JobStore) Create(req JobRequest) (string, error) {
6265
}
6366

6467
id := newJobID()
65-
ctx, cancel := context.WithCancel(context.Background())
68+
ctx, cancel := context.WithCancel(js.parentCtx)
6669

6770
job := &Job{
6871
ID: id,
@@ -75,7 +78,7 @@ func (js *JobStore) Create(req JobRequest) (string, error) {
7578
js.jobs[id] = job
7679
js.mu.Unlock()
7780

78-
go js.runJob(ctx, job, source)
81+
go js.runJob(ctx, cancel, job, source)
7982

8083
return id, nil
8184
}
@@ -144,7 +147,9 @@ func (js *JobStore) StartCleanup(ctx context.Context) {
144147
}
145148
}
146149

147-
func (js *JobStore) runJob(ctx context.Context, job *Job, source Source) {
150+
func (js *JobStore) runJob(ctx context.Context, cancel context.CancelFunc, job *Job, source Source) {
151+
defer cancel()
152+
148153
js.mu.Lock()
149154
job.State = JobStateRunning
150155
js.mu.Unlock()

internal/mirror/job_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package mirror
22

33
import (
4+
"context"
45
"testing"
56
"time"
67
)
78

89
func TestJobStoreCreateAndGet(t *testing.T) {
910
m := setupTestMirror(t, 1)
10-
js := NewJobStore(m)
11+
js := NewJobStore(context.Background(), m)
1112

1213
id, err := js.Create(JobRequest{
1314
PURLs: []string{"pkg:npm/lodash@4.17.21"},
@@ -34,7 +35,7 @@ func TestJobStoreCreateAndGet(t *testing.T) {
3435

3536
func TestJobStoreGetNotFound(t *testing.T) {
3637
m := setupTestMirror(t, 1)
37-
js := NewJobStore(m)
38+
js := NewJobStore(context.Background(), m)
3839

3940
job := js.Get("nonexistent")
4041
if job != nil {
@@ -44,7 +45,7 @@ func TestJobStoreGetNotFound(t *testing.T) {
4445

4546
func TestJobStoreCancelNotFound(t *testing.T) {
4647
m := setupTestMirror(t, 1)
47-
js := NewJobStore(m)
48+
js := NewJobStore(context.Background(), m)
4849

4950
if js.Cancel("nonexistent") {
5051
t.Error("expected Cancel to return false for nonexistent job")
@@ -53,7 +54,7 @@ func TestJobStoreCancelNotFound(t *testing.T) {
5354

5455
func TestJobStoreCreateInvalidRequest(t *testing.T) {
5556
m := setupTestMirror(t, 1)
56-
js := NewJobStore(m)
57+
js := NewJobStore(context.Background(), m)
5758

5859
_, err := js.Create(JobRequest{})
5960
if err == nil {
@@ -63,7 +64,7 @@ func TestJobStoreCreateInvalidRequest(t *testing.T) {
6364

6465
func TestJobStoreMultipleJobs(t *testing.T) {
6566
m := setupTestMirror(t, 1)
66-
js := NewJobStore(m)
67+
js := NewJobStore(context.Background(), m)
6768

6869
id1, err := js.Create(JobRequest{PURLs: []string{"pkg:npm/lodash@4.17.21"}})
6970
if err != nil {
@@ -88,7 +89,7 @@ func TestJobStoreMultipleJobs(t *testing.T) {
8889

8990
func TestSourceFromRequestPURLs(t *testing.T) {
9091
m := setupTestMirror(t, 1)
91-
js := NewJobStore(m)
92+
js := NewJobStore(context.Background(), m)
9293

9394
source, err := js.sourceFromRequest(JobRequest{PURLs: []string{"pkg:npm/lodash@1.0.0"}})
9495
if err != nil {
@@ -101,7 +102,7 @@ func TestSourceFromRequestPURLs(t *testing.T) {
101102

102103
func TestSourceFromRequestRegistry(t *testing.T) {
103104
m := setupTestMirror(t, 1)
104-
js := NewJobStore(m)
105+
js := NewJobStore(context.Background(), m)
105106

106107
source, err := js.sourceFromRequest(JobRequest{Registry: "npm"})
107108
if err != nil {
@@ -114,7 +115,7 @@ func TestSourceFromRequestRegistry(t *testing.T) {
114115

115116
func TestJobStoreCleanup(t *testing.T) {
116117
m := setupTestMirror(t, 1)
117-
js := NewJobStore(m)
118+
js := NewJobStore(context.Background(), m)
118119

119120
// Add a completed job with old CreatedAt
120121
js.mu.Lock()

internal/mirror/mirror.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,18 @@ func newProgressTracker() *progressTracker {
7878
return pt
7979
}
8080

81+
const maxTrackedErrors = 1000
82+
8183
func (pt *progressTracker) addError(eco, name, version, err string) {
8284
pt.mu.Lock()
83-
pt.errors = append(pt.errors, MirrorError{
84-
Ecosystem: eco,
85-
Name: name,
86-
Version: version,
87-
Error: err,
88-
})
85+
if len(pt.errors) < maxTrackedErrors {
86+
pt.errors = append(pt.errors, MirrorError{
87+
Ecosystem: eco,
88+
Name: name,
89+
Version: version,
90+
Error: err,
91+
})
92+
}
8993
pt.mu.Unlock()
9094
}
9195

@@ -132,7 +136,15 @@ func (m *Mirror) Run(ctx context.Context, source Source) (*Progress, error) {
132136
g.SetLimit(m.workers)
133137

134138
for _, item := range items {
135-
g.Go(func() error {
139+
g.Go(func() (err error) {
140+
defer func() {
141+
if r := recover(); r != nil {
142+
m.logger.Error("panic in mirror worker", "recover", r,
143+
"ecosystem", item.Ecosystem, "name", item.Name, "version", item.Version)
144+
tracker.failed.Add(1)
145+
tracker.addError(item.Ecosystem, item.Name, item.Version, fmt.Sprintf("panic: %v", r))
146+
}
147+
}()
136148
m.mirrorOne(gctx, item, tracker)
137149
return nil // never fail the group; errors are tracked
138150
})

internal/server/mirror_api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func setupMirrorAPI(t *testing.T) *MirrorAPIHandler {
4343
proxy := handler.NewProxy(db, store, fetcher, resolver, logger)
4444

4545
m := mirror.New(proxy, db, store, logger, 1)
46-
js := mirror.NewJobStore(m)
46+
js := mirror.NewJobStore(context.Background(), m)
4747
return NewMirrorAPIHandler(js)
4848
}
4949

internal/server/server.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,13 @@ func (s *Server) Start() error {
236236
r.Get("/api/compare/{ecosystem}/{name}/{fromVersion}/{toVersion}", s.handleCompareDiff)
237237
r.Get("/package/{ecosystem}/{name}/compare/{versions}", s.handleComparePage)
238238

239+
// Start background context (used by mirror jobs and cleanup)
240+
bgCtx, bgCancel := context.WithCancel(context.Background())
241+
s.cancel = bgCancel
242+
239243
// Mirror API endpoints
240244
mirrorSvc := mirror.New(proxy, s.db, s.storage, s.logger, 4) //nolint:mnd // default concurrency
241-
jobStore := mirror.NewJobStore(mirrorSvc)
245+
jobStore := mirror.NewJobStore(bgCtx, mirrorSvc)
242246
mirrorAPI := NewMirrorAPIHandler(jobStore)
243247
r.Post("/api/mirror", mirrorAPI.HandleCreate)
244248
r.Get("/api/mirror/{id}", mirrorAPI.HandleGet)
@@ -257,10 +261,6 @@ func (s *Server) Start() error {
257261
"base_url", s.cfg.BaseURL,
258262
"storage", s.cfg.Storage.Path, //nolint:staticcheck // backwards compat
259263
"database", s.cfg.Database.Path)
260-
261-
// Start background goroutines
262-
bgCtx, bgCancel := context.WithCancel(context.Background())
263-
s.cancel = bgCancel
264264
go s.updateCacheStatsMetrics()
265265
go jobStore.StartCleanup(bgCtx)
266266

0 commit comments

Comments
 (0)