Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,13 @@ type WarmCapacityMissAggregate struct {
Count int64 `json:"count"`
}

// WarmCapacityWorkerStats is the grouped warm-worker state used for
// warm-capacity observability.
type WarmCapacityWorkerStats struct {
Scope string `json:"scope"`
ReadyWorkers int64 `json:"ready_workers"`
SpawningWorkers int64 `json:"spawning_workers"`
// WorkerLifecycleStats is the grouped worker lifecycle state used for
// cluster-wide worker observability.
type WorkerLifecycleStats struct {
Image string `json:"image"`
State WorkerState `json:"state"`
Binding string `json:"binding"`
Count int64 `json:"count"`
}

// WorkerRecord is the durable runtime coordination record for one worker pod.
Expand Down
34 changes: 18 additions & 16 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,26 +855,28 @@ func (cs *ConfigStore) PruneWarmCapacityMissBuckets(before time.Time) (int64, er
return result.RowsAffected, nil
}

// ListWarmCapacityWorkerStats returns grouped cluster-wide warm-worker state
// for image-scoped warm-capacity observability.
func (cs *ConfigStore) ListWarmCapacityWorkerStats() ([]WarmCapacityWorkerStats, error) {
var out []WarmCapacityWorkerStats
// ListWorkerLifecycleStats returns grouped cluster-wide active worker lifecycle
// state by image and tenant binding for Prometheus observability.
func (cs *ConfigStore) ListWorkerLifecycleStats() ([]WorkerLifecycleStats, error) {
const bindingExpr = "CASE WHEN NULLIF(org_id, '') IS NULL THEN 'neutral' ELSE 'org_bound' END"
var out []WorkerLifecycleStats
err := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
Select(
"('image:' || image) AS scope, "+
"COALESCE(SUM(CASE WHEN state = ? THEN 1 ELSE 0 END), 0)::bigint AS ready_workers, "+
"COALESCE(SUM(CASE WHEN state = ? THEN 1 ELSE 0 END), 0)::bigint AS spawning_workers",
WorkerStateIdle,
WorkerStateSpawning,
).
Where("org_id = ''").
Select("image, state, "+bindingExpr+" AS binding, COUNT(*)::bigint AS count").
Where("image <> ''").
Where("state IN ?", []WorkerState{WorkerStateIdle, WorkerStateSpawning}).
Group("image").
Order("scope ASC").
Where("state IN ?", []WorkerState{
WorkerStateSpawning,
WorkerStateIdle,
WorkerStateReserved,
WorkerStateActivating,
WorkerStateHot,
WorkerStateHotIdle,
WorkerStateDraining,
}).
Group("image, state, " + bindingExpr).
Order("image ASC, state ASC, binding ASC").
Scan(&out).Error
if err != nil {
return nil, fmt.Errorf("list warm capacity worker stats: %w", err)
return nil, fmt.Errorf("list worker lifecycle stats: %w", err)
}
return out, nil
}
Expand Down
18 changes: 0 additions & 18 deletions controlplane/flight_ingress_metrics_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@ import (

// --- Per-org metrics (multi-tenant mode) ---

// Per-org worker gauges. Both vectors carry a single "org" label.
var (
	// orgWorkersActiveGauge backs the duckgres_org_workers_active metric.
	orgWorkersActiveGauge = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "duckgres_org_workers_active",
			Help: "Number of active workers per org",
		},
		[]string{"org"},
	)

	// orgWorkersIdleGauge backs the duckgres_org_workers_idle metric.
	orgWorkersIdleGauge = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "duckgres_org_workers_idle",
			Help: "Number of idle workers per org",
		},
		[]string{"org"},
	)
)

var orgSessionsActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "duckgres_org_sessions_active",
Help: "Number of active sessions per org",
Expand Down Expand Up @@ -53,14 +43,6 @@ var sniRoutingResolutionsCounter = promauto.NewCounterVec(prometheus.CounterOpts
Help: "SNI hostname prefix resolutions, partitioned by whether a hostname_alias was used",
}, []string{"protocol", "alias_used"})

// observeOrgWorkersActive sets the orgWorkersActiveGauge series labeled with
// org to count.
func observeOrgWorkersActive(org string, count int) {
	series := orgWorkersActiveGauge.WithLabelValues(org)
	series.Set(float64(count))
}

// observeOrgWorkersIdle sets the orgWorkersIdleGauge series labeled with org
// to count.
func observeOrgWorkersIdle(org string, count int) {
	series := orgWorkersIdleGauge.WithLabelValues(org)
	series.Set(float64(count))
}

// observeOrgSessionsActive sets the orgSessionsActiveGauge series labeled with
// org to count.
func observeOrgSessionsActive(org string, count int) {
	series := orgSessionsActiveGauge.WithLabelValues(org)
	series.Set(float64(count))
}
Expand Down
6 changes: 6 additions & 0 deletions controlplane/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ControlPlaneJanitor struct {
retireOrphanWorker func(record configstore.WorkerRecord, reason string) // orphan-cleanup variant: handles any active state, skips local-pool bookkeeping
retireLocalWorker func(workerID int, reason string) bool // retires from in-memory pool + pod, returns false if not local
reconcileWarmCapacity func()
onStop func()
retireMismatchedVersionWorker func() // reaps one warm idle worker whose Deployment version differs from this CP's (leader-only)
cleanupOrphanedWorkerPods func() // deletes K8s worker pods whose DB row is terminal (retired/lost) or missing (leader-only)
}
Expand Down Expand Up @@ -71,6 +72,11 @@ func (j *ControlPlaneJanitor) Run(ctx context.Context) {
if j == nil || j.store == nil {
return
}
defer func() {
if j.onStop != nil {
j.onStop()
}
}()

j.runOnce()

Expand Down
30 changes: 30 additions & 0 deletions controlplane/janitor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controlplane

import (
"context"
"errors"
"sync"
"testing"
Expand Down Expand Up @@ -125,6 +126,35 @@ func TestControlPlaneJanitorRunExpiresStaleInstances(t *testing.T) {
}
}

func TestControlPlaneJanitorRunInvokesOnStop(t *testing.T) {
store := &captureControlPlaneExpiryStore{}
janitor := NewControlPlaneJanitor(store, time.Hour, 20*time.Second)
stopped := make(chan struct{})
janitor.onStop = func() {
close(stopped)
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
janitor.Run(ctx)
close(done)
}()

cancel()

select {
case <-stopped:
case <-time.After(time.Second):
t.Fatal("expected janitor onStop callback")
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("expected janitor run loop to exit")
}
}

func TestControlPlaneJanitorRunPrunesWarmCapacityMissBuckets(t *testing.T) {
store := &captureControlPlaneExpiryStore{}
now := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC)
Expand Down
21 changes: 6 additions & 15 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,6 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p
p.workers[id] = w
p.stampNodeFirstSeenLocked(ready.nodeName)
workerCount := len(p.workers)
observeWarmPoolLifecycleGauges(p.workers)
p.mu.Unlock()
if publishIdle {
p.persistWorkerRecord(p.workerRecordFor(id, w, w.OwnerEpoch(), configstore.WorkerStateIdle, "", nil))
Expand Down Expand Up @@ -1234,7 +1233,6 @@ func (p *K8sWorkerPool) TransitionToHotIdleIfNoSessions(id int) bool {
}
w.lastUsed = time.Now()
hotIdleRecord := p.workerRecordFor(id, w, w.OwnerEpoch(), configstore.WorkerStateHotIdle, "", nil)
observeWarmPoolLifecycleGauges(p.workers)
p.mu.Unlock()
p.persistWorkerRecord(hotIdleRecord)
return true
Expand Down Expand Up @@ -1327,7 +1325,6 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana
return setErr
}
hotRecord := p.workerRecordFor(worker.ID, worker, worker.OwnerEpoch(), configstore.WorkerStateHot, "", nil)
observeWarmPoolLifecycleGauges(p.workers)
p.mu.Unlock()
p.persistWorkerRecord(hotRecord)
return nil
Expand Down Expand Up @@ -1436,7 +1433,6 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor
idle.IncrementOwnerEpoch()
idle.reservedAt = time.Now()
reservedRecord := p.workerRecordFor(idle.ID, idle, idle.OwnerEpoch(), configstore.WorkerStateReserved, "", nil)
observeWarmPoolLifecycleGauges(p.workers)
shouldReplenish := p.shouldReplenishWarmCapacityLocked()
var replenishID int
if shouldReplenish {
Expand Down Expand Up @@ -1484,16 +1480,16 @@ func (p *K8sWorkerPool) recordWarmCapacityMiss(assignment *WorkerAssignment, rea
return
}

scope := "image:" + image
observeWarmCapacityMiss(scope, policy.reason)
observeWarmCapacityMiss(image, policy.reason)
if !policy.recordDynamicDemand {
return
}
if p.runtimeStore == nil {
return
}
scope := warmCapacityScopeForImage(image)
if err := p.runtimeStore.RecordWarmCapacityMiss(scope, policy.reason, time.Now()); err != nil {
slog.Warn("Failed to record warm capacity miss.", "scope", scope, "reason", policy.reason, "error", err)
slog.Warn("Failed to record warm capacity miss.", "image", image, "reason", policy.reason, "error", err)
}
}

Expand Down Expand Up @@ -1556,7 +1552,6 @@ func (p *K8sWorkerPool) reserveClaimedWorker(ctx context.Context, claimed *confi
return nil, err
}
worker.reservedAt = time.Now()
observeWarmPoolLifecycleGauges(p.workers)
if claimed.State != configstore.WorkerStateReserved {
reservedRecord = p.workerRecordFor(worker.ID, worker, worker.OwnerEpoch(), configstore.WorkerStateReserved, "", nil)
}
Expand Down Expand Up @@ -1836,7 +1831,6 @@ func (p *K8sWorkerPool) SpawnMinWorkers(count int) error {
p.mu.Lock()
p.spawning--
if err == nil {
observeWarmPoolLifecycleGauges(p.workers)
}
p.mu.Unlock()

Expand Down Expand Up @@ -1883,7 +1877,6 @@ func (p *K8sWorkerPool) SpawnMinWorkers(count int) error {
p.mu.Lock()
p.spawning--
if err == nil {
observeWarmPoolLifecycleGauges(p.workers)
}
p.mu.Unlock()

Expand Down Expand Up @@ -1969,7 +1962,7 @@ func (p *K8sWorkerPool) SpawnMinWorkersForImage(ctx context.Context, image strin
p.maxWorkers,
)
if err != nil {
observeWarmCapacityReconcileSpawns(warmCapacityImageScope(image), "error", len(slots))
observeWarmCapacityReconcileSpawns(image, "error", len(slots))
for _, s := range slots {
p.retireClaimedWorker(s, RetireReasonCrash)
}
Expand Down Expand Up @@ -2000,7 +1993,6 @@ func (p *K8sWorkerPool) SpawnMinWorkersForImage(ctx context.Context, image strin
p.mu.Lock()
p.spawning--
if err == nil {
observeWarmPoolLifecycleGauges(p.workers)
}
p.mu.Unlock()

Expand All @@ -2021,8 +2013,8 @@ func (p *K8sWorkerPool) SpawnMinWorkersForImage(ctx context.Context, image strin
successes++
}
}
observeWarmCapacityReconcileSpawns(warmCapacityImageScope(image), "success", successes)
observeWarmCapacityReconcileSpawns(warmCapacityImageScope(image), "error", failures)
observeWarmCapacityReconcileSpawns(image, "success", successes)
observeWarmCapacityReconcileSpawns(image, "error", failures)
return err
}

Expand Down Expand Up @@ -2774,7 +2766,6 @@ func (p *K8sWorkerPool) markWorkerRetiredInMemoryLocked(w *ManagedWorker, reason
}
_ = w.SetSharedState(nextState)
observeWorkerRetirement(reason)
observeWarmPoolLifecycleGauges(p.workers)
}

func (p *K8sWorkerPool) persistWorkerRecord(record *configstore.WorkerRecord) {
Expand Down
11 changes: 5 additions & 6 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,9 +998,8 @@ func TestK8sPoolSpawnMinWorkersForImageSpawnsOnlyTheDeficit(t *testing.T) {
func TestK8sPoolSpawnMinWorkersForImageCountsMixedSpawnResults(t *testing.T) {
pool, _ := newTestK8sPool(t, 5)
image := "duckgres:metrics-mixed"
scope := warmCapacityImageScope(image)
warmCapacityReconcileSpawnsCounter.DeleteLabelValues(scope, "success")
warmCapacityReconcileSpawnsCounter.DeleteLabelValues(scope, "error")
warmCapacityReconcileSpawnsCounter.DeleteLabelValues(image, "success")
warmCapacityReconcileSpawnsCounter.DeleteLabelValues(image, "error")

nextID := 100
store := &captureRuntimeWorkerStore{
Expand All @@ -1026,10 +1025,10 @@ func TestK8sPoolSpawnMinWorkersForImageCountsMixedSpawnResults(t *testing.T) {
if err := pool.SpawnMinWorkersForImage(context.Background(), image, 2); err == nil {
t.Fatal("expected mixed spawn batch to return an error")
}
if got := counterLabelValues(warmCapacityReconcileSpawnsCounter, scope, "success"); got != 1 {
if got := counterLabelValues(warmCapacityReconcileSpawnsCounter, image, "success"); got != 1 {
t.Fatalf("expected one successful reconcile spawn, got %v", got)
}
if got := counterLabelValues(warmCapacityReconcileSpawnsCounter, scope, "error"); got != 1 {
if got := counterLabelValues(warmCapacityReconcileSpawnsCounter, image, "error"); got != 1 {
t.Fatalf("expected one failed reconcile spawn, got %v", got)
}
}
Expand Down Expand Up @@ -2950,7 +2949,7 @@ func TestRetireOneMismatchedVersionWorker_NoopWhenCPIDHasNoHashSuffix(t *testing
// These tests pin the expected behavior of the K8s-label-based reconciler.

// strandedReconcilerPool wires a K8sWorkerPool with a fake clientset and store
// for reconciler tests. Ownership labels aren't checked by the reconciler, so
// for reconciler tests. Owner labels aren't checked by the reconciler, so
// we keep the setup minimal.
func strandedReconcilerPool(t *testing.T, store RuntimeWorkerStore) (*K8sWorkerPool, *fake.Clientset) {
t.Helper()
Expand Down
Loading
Loading