Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ type WarmCapacityMissAggregate struct {
Count int64 `json:"count"`
}

// WarmCapacityWorkerStats is the grouped warm-worker state used for
// warm-capacity observability.
type WarmCapacityWorkerStats struct {
Scope string `json:"scope"`
ReadyWorkers int64 `json:"ready_workers"`
SpawningWorkers int64 `json:"spawning_workers"`
}

// WorkerRecord is the durable runtime coordination record for one worker pod.
type WorkerRecord struct {
WorkerID int `gorm:"primaryKey" json:"worker_id"`
Expand Down
24 changes: 24 additions & 0 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,30 @@ func (cs *ConfigStore) PruneWarmCapacityMissBuckets(before time.Time) (int64, er
return result.RowsAffected, nil
}

// ListWarmCapacityWorkerStats returns grouped cluster-wide warm-worker state
// for image-scoped warm-capacity observability.
func (cs *ConfigStore) ListWarmCapacityWorkerStats() ([]WarmCapacityWorkerStats, error) {
var out []WarmCapacityWorkerStats
err := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
Select(
"('image:' || image) AS scope, "+
"COALESCE(SUM(CASE WHEN state = ? THEN 1 ELSE 0 END), 0)::bigint AS ready_workers, "+
"COALESCE(SUM(CASE WHEN state = ? THEN 1 ELSE 0 END), 0)::bigint AS spawning_workers",
WorkerStateIdle,
WorkerStateSpawning,
).
Where("org_id = ''").
Where("image <> ''").
Where("state IN ?", []WorkerState{WorkerStateIdle, WorkerStateSpawning}).
Group("image").
Order("scope ASC").
Scan(&out).Error
if err != nil {
return nil, fmt.Errorf("list warm capacity worker stats: %w", err)
}
return out, nil
}

// UpsertControlPlaneInstance inserts or updates a runtime control-plane instance row.
func (cs *ConfigStore) UpsertControlPlaneInstance(instance *ControlPlaneInstance) error {
if err := cs.db.Table(cs.runtimeTable(instance.TableName())).Clauses(clause.OnConflict{
Expand Down
29 changes: 21 additions & 8 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,13 +1473,6 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor

func (p *K8sWorkerPool) recordWarmCapacityMiss(assignment *WorkerAssignment, reason configstore.WorkerClaimMissReason) {
policy := warmCapacityMissPolicyForReason(reason)
if !policy.recordDynamicDemand {
return
}
if p.runtimeStore == nil {
return
}

image := ""
if assignment != nil {
image = strings.TrimSpace(assignment.Image)
Expand All @@ -1492,6 +1485,13 @@ func (p *K8sWorkerPool) recordWarmCapacityMiss(assignment *WorkerAssignment, rea
}

scope := "image:" + image
observeWarmCapacityMiss(scope, policy.reason)
if !policy.recordDynamicDemand {
return
}
if p.runtimeStore == nil {
return
}
if err := p.runtimeStore.RecordWarmCapacityMiss(scope, policy.reason, time.Now()); err != nil {
slog.Warn("Failed to record warm capacity miss.", "scope", scope, "reason", policy.reason, "error", err)
}
Expand Down Expand Up @@ -1969,6 +1969,7 @@ func (p *K8sWorkerPool) SpawnMinWorkersForImage(ctx context.Context, image strin
p.maxWorkers,
)
if err != nil {
observeWarmCapacityReconcileSpawns(warmCapacityImageScope(image), "error", len(slots))
for _, s := range slots {
p.retireClaimedWorker(s, RetireReasonCrash)
}
Expand Down Expand Up @@ -2010,7 +2011,19 @@ func (p *K8sWorkerPool) SpawnMinWorkersForImage(ctx context.Context, image strin
}(i, slot)
}
wg.Wait()
return stderrors.Join(errs...)
err := stderrors.Join(errs...)
successes := 0
failures := 0
for _, spawnErr := range errs {
if spawnErr != nil {
failures++
} else {
successes++
}
}
observeWarmCapacityReconcileSpawns(warmCapacityImageScope(image), "success", successes)
observeWarmCapacityReconcileSpawns(warmCapacityImageScope(image), "error", failures)
return err
}

// HealthCheckLoop periodically checks worker health.
Expand Down
39 changes: 39 additions & 0 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,45 @@ func TestK8sPoolSpawnMinWorkersForImageSpawnsOnlyTheDeficit(t *testing.T) {
}
}

func TestK8sPoolSpawnMinWorkersForImageCountsMixedSpawnResults(t *testing.T) {
pool, _ := newTestK8sPool(t, 5)
image := "duckgres:metrics-mixed"
scope := warmCapacityImageScope(image)
warmCapacityReconcileSpawnsCounter.DeleteLabelValues(scope, "success")
warmCapacityReconcileSpawnsCounter.DeleteLabelValues(scope, "error")

nextID := 100
store := &captureRuntimeWorkerStore{
perImageSpawnedFunc: func(image string) *configstore.WorkerRecord {
nextID++
return &configstore.WorkerRecord{
WorkerID: nextID,
PodName: fmt.Sprintf("duckgres-worker-test-cp-%d", nextID),
State: configstore.WorkerStateSpawning,
OwnerCPInstanceID: pool.cpInstanceID,
Image: image,
}
},
}
pool.runtimeStore = store
pool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error {
if id == 102 {
return errors.New("spawn failed")
}
return nil
}

if err := pool.SpawnMinWorkersForImage(context.Background(), image, 2); err == nil {
t.Fatal("expected mixed spawn batch to return an error")
}
if got := counterLabelValues(warmCapacityReconcileSpawnsCounter, scope, "success"); got != 1 {
t.Fatalf("expected one successful reconcile spawn, got %v", got)
}
if got := counterLabelValues(warmCapacityReconcileSpawnsCounter, scope, "error"); got != 1 {
t.Fatalf("expected one failed reconcile spawn, got %v", got)
}
}

func TestK8sPoolSpawnMinWorkersForImageNoOpWhenIdleCountAtTarget(t *testing.T) {
pool, _ := newTestK8sPool(t, 5)
store := &captureRuntimeWorkerStore{}
Expand Down
128 changes: 123 additions & 5 deletions controlplane/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"log/slog"
"net/http"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -249,13 +250,17 @@ func SetupMultiTenant(
janitor.retireLocalWorker = func(workerID int, reason string) bool {
return router.sharedPool.retireWorkerWithReason(workerID, reason)
}
lastWarmCapacityTargets := map[string]int{}
var lastWarmCapacityRecentMisses []configstore.WarmCapacityMissAggregate
var lastWarmCapacityWorkerStats []configstore.WarmCapacityWorkerStats
lastWarmCapacityGlobalCapBlocked := false
janitor.reconcileWarmCapacity = func() {
snap := store.Snapshot()
if snap == nil {
return
}
baseTargets := router.computeBaseWarmCapacityTargets(snap)
targets, err := computeEffectiveWarmCapacityTargets(
targetSnapshot, err := computeEffectiveWarmCapacityTargetSnapshot(
baseTargets,
store,
cfg.K8s,
Expand All @@ -264,6 +269,25 @@ func SetupMultiTenant(
if err != nil {
slog.Warn("Janitor failed to read dynamic warm-capacity demand; reconciling base warm targets only.", "error", err)
}
observeWarmCapacityRecentMisses(targetSnapshot.RecentMisses, lastWarmCapacityRecentMisses)
lastWarmCapacityRecentMisses = cloneWarmCapacityMissAggregates(targetSnapshot.RecentMisses)
observeWarmCapacityTargets(targetSnapshot.BaseTargets, targetSnapshot.EffectiveTargets, cfg.K8s.MaxWorkers, lastWarmCapacityTargets)
if stats, statsErr := listWarmCapacityWorkerStats(store); statsErr != nil {
slog.Warn("Janitor failed to read warm-capacity worker stats.", "error", statsErr)
} else {
observeWarmCapacityWorkerStats(stats, lastWarmCapacityWorkerStats)
lastWarmCapacityWorkerStats = cloneWarmCapacityWorkerStats(stats)
}
logWarmCapacityTargetChanges(lastWarmCapacityTargets, targetSnapshot.BaseTargets, targetSnapshot.EffectiveTargets)
lastWarmCapacityTargets = cloneWarmCapacityTargets(targetSnapshot.EffectiveTargets)

globalCapBlocked := warmCapacityGlobalCapBlocksDemand(targetSnapshot.BaseTargets, targetSnapshot.EffectiveTargets, targetSnapshot.RecentMisses, cfg.K8s)
if globalCapBlocked && !lastWarmCapacityGlobalCapBlocked {
slog.Info("Global worker cap prevents dynamic warm capacity.", "max_workers", cfg.K8s.MaxWorkers, "base_target_total", sumIntMap(targetSnapshot.BaseTargets), "effective_target_total", sumIntMap(targetSnapshot.EffectiveTargets))
}
lastWarmCapacityGlobalCapBlocked = globalCapBlocked

targets := targetSnapshot.EffectiveTargets
router.sharedPool.SetPerImageWarmTargets(targets)
reconcileWarmCapacityImageTargets(router.sharedPool, targets)
}
Expand Down Expand Up @@ -405,13 +429,32 @@ type warmCapacityMissAggregateLister interface {
ListWarmCapacityMissesSince(since time.Time, reasons ...configstore.WorkerClaimMissReason) ([]configstore.WarmCapacityMissAggregate, error)
}

type warmCapacityWorkerStatsLister interface {
ListWarmCapacityWorkerStats() ([]configstore.WarmCapacityWorkerStats, error)
}

type warmCapacityTargetSnapshot struct {
BaseTargets map[string]int
EffectiveTargets map[string]int
RecentMisses []configstore.WarmCapacityMissAggregate
}

func computeEffectiveWarmCapacityTargets(baseTargets map[string]int, lister warmCapacityMissAggregateLister, cfg K8sConfig, now time.Time) (map[string]int, error) {
snap, err := computeEffectiveWarmCapacityTargetSnapshot(baseTargets, lister, cfg, now)
return snap.EffectiveTargets, err
}

func computeEffectiveWarmCapacityTargetSnapshot(baseTargets map[string]int, lister warmCapacityMissAggregateLister, cfg K8sConfig, now time.Time) (warmCapacityTargetSnapshot, error) {
snap := warmCapacityTargetSnapshot{
BaseTargets: sanitizeWarmCapacityTargets(baseTargets),
EffectiveTargets: sanitizeWarmCapacityTargets(baseTargets),
}
dynamicCfg := dynamicWarmCapacityConfigFromK8s(cfg)
if !dynamicCfg.Enabled {
return sanitizeWarmCapacityTargets(baseTargets), nil
return snap, nil
}
if lister == nil {
return sanitizeWarmCapacityTargets(baseTargets), nil
return snap, nil
}
window := cfg.WarmCapacityMissWindow
if window <= 0 {
Expand All @@ -422,9 +465,84 @@ func computeEffectiveWarmCapacityTargets(baseTargets map[string]int, lister warm
}
aggregates, err := lister.ListWarmCapacityMissesSince(now.Add(-window), configstore.WorkerClaimMissReasonNoIdle)
if err != nil {
return sanitizeWarmCapacityTargets(baseTargets), err
return snap, err
}
snap.RecentMisses = aggregates
snap.EffectiveTargets = computeDynamicWarmCapacityTargets(snap.BaseTargets, aggregates, dynamicCfg)
return snap, nil
}

func listWarmCapacityWorkerStats(lister warmCapacityWorkerStatsLister) ([]configstore.WarmCapacityWorkerStats, error) {
if lister == nil {
return nil, nil
}
return lister.ListWarmCapacityWorkerStats()
}

func logWarmCapacityTargetChanges(previous, baseTargets, effectiveTargets map[string]int) {
for _, image := range warmCapacityTargetScopes(previous, baseTargets, effectiveTargets) {
effective := positiveMapValue(effectiveTargets, image)
if positiveMapValue(previous, image) == effective {
continue
}
base := positiveMapValue(baseTargets, image)
demand := effective - base
if demand < 0 {
demand = 0
}
slog.Info("Warm capacity target changed.",
"scope", warmCapacityImageScope(image),
"base_target", base,
"demand_target", demand,
"effective_target", effective,
)
}
}

func cloneWarmCapacityTargets(targets map[string]int) map[string]int {
out := make(map[string]int, len(targets))
for image, target := range targets {
if strings.TrimSpace(image) == "" || target <= 0 {
continue
}
out[image] = target
}
return out
}

func cloneWarmCapacityMissAggregates(aggregates []configstore.WarmCapacityMissAggregate) []configstore.WarmCapacityMissAggregate {
if len(aggregates) == 0 {
return nil
}
out := make([]configstore.WarmCapacityMissAggregate, len(aggregates))
copy(out, aggregates)
return out
}

func cloneWarmCapacityWorkerStats(stats []configstore.WarmCapacityWorkerStats) []configstore.WarmCapacityWorkerStats {
if len(stats) == 0 {
return nil
}
out := make([]configstore.WarmCapacityWorkerStats, len(stats))
copy(out, stats)
return out
}

func warmCapacityGlobalCapBlocksDemand(baseTargets, effectiveTargets map[string]int, aggregates []configstore.WarmCapacityMissAggregate, cfg K8sConfig) bool {
if cfg.MaxWorkers <= 0 || len(aggregates) == 0 {
return false
}
effectiveTotal := sumIntMap(effectiveTargets)
if effectiveTotal < cfg.MaxWorkers {
return false
}
dynamicCfg := dynamicWarmCapacityConfigFromK8s(cfg)
if !dynamicCfg.Enabled {
return false
}
return computeDynamicWarmCapacityTargets(baseTargets, aggregates, dynamicCfg), nil
dynamicCfg.MaxWorkers = 0
uncappedTargets := computeDynamicWarmCapacityTargets(baseTargets, aggregates, dynamicCfg)
return sumIntMap(uncappedTargets) > effectiveTotal
}

func reconcileWarmCapacityImageTargets(pool *K8sWorkerPool, targets map[string]int) {
Expand Down
35 changes: 35 additions & 0 deletions controlplane/multitenant_warm_capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,41 @@ func TestComputeEffectiveWarmCapacityTargetsFallsBackToBaseOnReadError(t *testin
}
}

func TestWarmCapacityGlobalCapBlocksDemandDetectsPartialHeadroom(t *testing.T) {
base := map[string]int{"posthog/duckgres:default": 2}
aggregates := []configstore.WarmCapacityMissAggregate{
warmCapacityMissAggregate("image:posthog/duckgres:default", configstore.WorkerClaimMissReasonNoIdle, 32),
}
cfg := K8sConfig{
DynamicWarmCapacityEnabled: true,
WarmCapacityMissesPerWorker: 8,
MaxWorkers: 3,
}
effective := computeDynamicWarmCapacityTargets(base, aggregates, dynamicWarmCapacityConfigFromK8s(cfg))

if !warmCapacityGlobalCapBlocksDemand(base, effective, aggregates, cfg) {
t.Fatal("expected global cap block when one free slot cannot satisfy all dynamic demand")
}
}

func TestWarmCapacityGlobalCapBlocksDemandIgnoresDynamicCeiling(t *testing.T) {
base := map[string]int{"posthog/duckgres:default": 2}
aggregates := []configstore.WarmCapacityMissAggregate{
warmCapacityMissAggregate("image:posthog/duckgres:default", configstore.WorkerClaimMissReasonNoIdle, 32),
}
cfg := K8sConfig{
DynamicWarmCapacityEnabled: true,
WarmCapacityMissesPerWorker: 8,
WarmCapacityDynamicTotalCeiling: 1,
MaxWorkers: 10,
}
effective := computeDynamicWarmCapacityTargets(base, aggregates, dynamicWarmCapacityConfigFromK8s(cfg))

if warmCapacityGlobalCapBlocksDemand(base, effective, aggregates, cfg) {
t.Fatal("did not expect global cap block when only the dynamic total ceiling limits demand")
}
}

func TestReconcileWarmCapacityImageTargetsSpawnsPerImageTargets(t *testing.T) {
pool, _ := newTestK8sPool(t, 0)
store := &captureRuntimeWorkerStore{}
Expand Down
Loading
Loading