Skip to content
Draft
10 changes: 8 additions & 2 deletions controlplane/configstore/lifecycle_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func workerSnapshotsFromRecords(records []WorkerRecord) []WorkerSnapshot {
// in-memory ManagedWorker caches owner_cp_instance_id and owner_epoch
// from the most recent claim/takeover/refresh. The lifecycle CAS itself
// is what enforces freshness: a stale lease will simply miss.
func NewWorkerLease(workerID int, ownerCPInstanceID string, ownerEpoch int64) WorkerLease {
return newWorkerLease(workerID, ownerCPInstanceID, ownerEpoch)
//
// image is the worker image the lease was minted against. It is not
// part of the CAS fence (the lease's owner_cp_instance_id + owner_epoch
// already establish identity) — it is carried so the lifecycle service
// can label transition metrics by image without a separate
// snapshot/read.
func NewWorkerLease(workerID int, ownerCPInstanceID string, ownerEpoch int64, image string) WorkerLease {
return newWorkerLease(workerID, ownerCPInstanceID, ownerEpoch, image)
}
18 changes: 17 additions & 1 deletion controlplane/configstore/lifecycle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ type WorkerLease struct {
workerID int
ownerCPInstanceID string
ownerEpoch int64
image string
}

// newWorkerLease constructs a WorkerLease. Package-private so leases can
// only be minted by store methods that have actually established
// ownership.
func newWorkerLease(workerID int, ownerCPInstanceID string, ownerEpoch int64) WorkerLease {
func newWorkerLease(workerID int, ownerCPInstanceID string, ownerEpoch int64, image string) WorkerLease {
return WorkerLease{
workerID: workerID,
ownerCPInstanceID: ownerCPInstanceID,
ownerEpoch: ownerEpoch,
image: image,
}
}

Expand All @@ -112,6 +114,11 @@ func (l WorkerLease) OwnerCPInstanceID() string { return l.ownerCPInstanceID }
// previous one becomes stale and any CAS attempted with it will miss.
func (l WorkerLease) OwnerEpoch() int64 { return l.ownerEpoch }

// Image returns the worker image the lease was minted against. Surfaced
// so lifecycle-transition metrics can label the operation by image
// without an extra round-trip to read the durable row.
func (l WorkerLease) Image() string { return l.image }

// TransitionOutcomeReason classifies why a lifecycle transition did or
// did not happen. The values are stable and meant for telemetry — PR 6
// will hang per-image metrics off these labels.
Expand Down Expand Up @@ -143,6 +150,15 @@ const (
// because the supposed-orphan's owner CP was no longer expired.
TransitionOutcomeFenceMissCPRevived TransitionOutcomeReason = "fence_miss_cp_revived"

// TransitionOutcomeFenceMissLease indicates a lease-fenced CAS
// (Drain / RetireDrained / MarkLostFromLease) missed without an
// extra read to distinguish state vs. owner_epoch vs. state-
// restriction causes. The label is intentionally generic: lease
// CAS WHERE clauses combine state + owner + epoch, and a single
// boolean rowsAffected can't tell them apart. Tighter labels
// require a follow-up GetWorkerRecord round-trip.
TransitionOutcomeFenceMissLease TransitionOutcomeReason = "fence_miss_lease"

// TransitionOutcomeRowMissing indicates the row could not be found in
// the runtime store (hard-deleted or never created).
TransitionOutcomeRowMissing TransitionOutcomeReason = "row_missing"
Expand Down
13 changes: 7 additions & 6 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,13 @@ type WarmCapacityMissAggregate struct {
Count int64 `json:"count"`
}

// WarmCapacityWorkerStats is the grouped warm-worker state used for
// warm-capacity observability.
type WarmCapacityWorkerStats struct {
Scope string `json:"scope"`
ReadyWorkers int64 `json:"ready_workers"`
SpawningWorkers int64 `json:"spawning_workers"`
// WorkerLifecycleStats is the grouped worker lifecycle state used for
// cluster-wide worker observability.
type WorkerLifecycleStats struct {
Image string `json:"image"`
State WorkerState `json:"state"`
Binding string `json:"binding"`
Count int64 `json:"count"`
}

// WorkerRecord is the durable runtime coordination record for one worker pod.
Expand Down
34 changes: 18 additions & 16 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,26 +859,28 @@ func (cs *ConfigStore) PruneWarmCapacityMissBuckets(before time.Time) (int64, er
return result.RowsAffected, nil
}

// ListWarmCapacityWorkerStats returns grouped cluster-wide warm-worker state
// for image-scoped warm-capacity observability.
func (cs *ConfigStore) ListWarmCapacityWorkerStats() ([]WarmCapacityWorkerStats, error) {
var out []WarmCapacityWorkerStats
// ListWorkerLifecycleStats returns grouped cluster-wide active worker lifecycle
// state by image and tenant binding for Prometheus observability.
func (cs *ConfigStore) ListWorkerLifecycleStats() ([]WorkerLifecycleStats, error) {
const bindingExpr = "CASE WHEN NULLIF(org_id, '') IS NULL THEN 'neutral' ELSE 'org_bound' END"
var out []WorkerLifecycleStats
err := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
Select(
"('image:' || image) AS scope, "+
"COALESCE(SUM(CASE WHEN state = ? THEN 1 ELSE 0 END), 0)::bigint AS ready_workers, "+
"COALESCE(SUM(CASE WHEN state = ? THEN 1 ELSE 0 END), 0)::bigint AS spawning_workers",
WorkerStateIdle,
WorkerStateSpawning,
).
Where("org_id = ''").
Select("image, state, "+bindingExpr+" AS binding, COUNT(*)::bigint AS count").
Where("image <> ''").
Where("state IN ?", []WorkerState{WorkerStateIdle, WorkerStateSpawning}).
Group("image").
Order("scope ASC").
Where("state IN ?", []WorkerState{
WorkerStateSpawning,
WorkerStateIdle,
WorkerStateReserved,
WorkerStateActivating,
WorkerStateHot,
WorkerStateHotIdle,
WorkerStateDraining,
}).
Group("image, state, " + bindingExpr).
Order("image ASC, state ASC, binding ASC").
Scan(&out).Error
if err != nil {
return nil, fmt.Errorf("list warm capacity worker stats: %w", err)
return nil, fmt.Errorf("list worker lifecycle stats: %w", err)
}
return out, nil
}
Expand Down
18 changes: 0 additions & 18 deletions controlplane/flight_ingress_metrics_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@ import (

// --- Per-org metrics (multi-tenant mode) ---

var orgWorkersActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "duckgres_org_workers_active",
Help: "Number of active workers per org",
}, []string{"org"})

var orgWorkersIdleGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "duckgres_org_workers_idle",
Help: "Number of idle workers per org",
}, []string{"org"})

var orgSessionsActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "duckgres_org_sessions_active",
Help: "Number of active sessions per org",
Expand Down Expand Up @@ -53,14 +43,6 @@ var sniRoutingResolutionsCounter = promauto.NewCounterVec(prometheus.CounterOpts
Help: "SNI hostname prefix resolutions, partitioned by whether a hostname_alias was used",
}, []string{"protocol", "alias_used"})

func observeOrgWorkersActive(org string, count int) {
orgWorkersActiveGauge.WithLabelValues(org).Set(float64(count))
}

func observeOrgWorkersIdle(org string, count int) {
orgWorkersIdleGauge.WithLabelValues(org).Set(float64(count))
}

func observeOrgSessionsActive(org string, count int) {
orgSessionsActiveGauge.WithLabelValues(org).Set(float64(count))
}
Expand Down
12 changes: 9 additions & 3 deletions controlplane/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ControlPlaneJanitor struct {
lifecycle *WorkerLifecycle // every per-worker transition flows through this; nil disables per-worker reaping for that tick.
lifecycleNilWarned sync.Once // one-shot guard so the misconfiguration error doesn't flood at the janitor tick rate.
reconcileWarmCapacity func()
onStop func()
retireMismatchedVersionWorker func() // reaps one warm idle worker whose Deployment version differs from this CP's (leader-only)
cleanupOrphanedWorkerPods func() // deletes K8s worker pods whose DB row is terminal (retired/lost) or missing (leader-only)
}
Expand Down Expand Up @@ -70,6 +71,11 @@ func (j *ControlPlaneJanitor) Run(ctx context.Context) {
if j == nil || j.store == nil {
return
}
defer func() {
if j.onStop != nil {
j.onStop()
}
}()

j.runOnce()

Expand Down Expand Up @@ -127,7 +133,7 @@ func (j *ControlPlaneJanitor) runOnce() {
slog.Info("Janitor retiring orphaned workers.", "count", len(orphaned))
}
for _, snap := range orphaned {
if _, err := j.lifecycle.RetireOrphanFromSnapshot(snap, janitorRetireReasonOrphaned); err != nil {
if _, err := j.lifecycle.RetireOrphanFromSnapshot(snap, janitorRetireReasonOrphaned, LifecycleOriginJanitorOrphan); err != nil {
slog.Warn("Janitor failed to retire orphan worker.", "worker_id", snap.WorkerID(), "error", err)
}
}
Expand All @@ -140,7 +146,7 @@ func (j *ControlPlaneJanitor) runOnce() {
slog.Warn("Janitor failed to list stuck workers.", "error", err)
} else {
for _, snap := range stuckWorkers {
if _, err := j.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, janitorRetireReasonStuckActivating); err != nil {
if _, err := j.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, janitorRetireReasonStuckActivating, LifecycleOriginJanitorStuckActivating); err != nil {
slog.Warn("Janitor failed to retire stuck worker.", "worker_id", snap.WorkerID(), "error", err)
}
}
Expand All @@ -153,7 +159,7 @@ func (j *ControlPlaneJanitor) runOnce() {
slog.Warn("Janitor failed to list expired hot-idle workers.", "error", err)
}
for _, snap := range expired {
if _, err := j.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, "hot_idle_ttl_expired"); err != nil {
if _, err := j.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, "hot_idle_ttl_expired", LifecycleOriginJanitorHotIdleTTL); err != nil {
slog.Warn("Janitor failed to retire hot-idle worker.", "worker_id", snap.WorkerID(), "error", err)
}
}
Expand Down
30 changes: 30 additions & 0 deletions controlplane/janitor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controlplane

import (
"context"
"errors"
"sync"
"testing"
Expand Down Expand Up @@ -144,6 +145,35 @@ func TestControlPlaneJanitorRunExpiresStaleInstances(t *testing.T) {
}
}

func TestControlPlaneJanitorRunInvokesOnStop(t *testing.T) {
store := &captureControlPlaneExpiryStore{}
janitor := NewControlPlaneJanitor(store, time.Hour, 20*time.Second)
stopped := make(chan struct{})
janitor.onStop = func() {
close(stopped)
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
janitor.Run(ctx)
close(done)
}()

cancel()

select {
case <-stopped:
case <-time.After(time.Second):
t.Fatal("expected janitor onStop callback")
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("expected janitor run loop to exit")
}
}

func TestControlPlaneJanitorRunPrunesWarmCapacityMissBuckets(t *testing.T) {
store := &captureControlPlaneExpiryStore{}
now := time.Date(2026, time.March, 26, 15, 0, 0, 0, time.UTC)
Expand Down
Loading
Loading