Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,67 @@ func (p *K8sWorkerPool) cleanupOrphanedWorkerPods(ctx context.Context, minAge ti
return deleted
}

// cleanupOrphanedWorkerSecrets deletes worker RPC secrets whose pod
// no longer exists. Closes the recovery gap where a CP crashes
// between secret creation and pod creation — the secret would
// otherwise leak indefinitely because cleanupOrphanedWorkerPods
// only iterates pods. Runs from the janitor leader on the same
// tick as cleanupOrphanedWorkerPods.
//
// minAge protects newly-created secrets the spawner is still using
// (the spawn flow creates the secret first, then the pod).
//
// Returns the number of secrets deleted this call.
func (p *K8sWorkerPool) cleanupOrphanedWorkerSecrets(ctx context.Context, minAge time.Duration) int {
if p.clientset == nil {
return 0
}
// Worker RPC secrets are labeled at creation (worker_rpc_security.go
// ensureWorkerRPCSecret) with app=duckgres +
// duckgres/control-plane=<p.cpID> + duckgres/worker-pod=<podName>.
// The selector narrows by control-plane so each CP only reaps
// its own secrets — critical during rolling restarts and blue/green
// deployments where multiple CP replicas share the namespace and
// a peer's freshly-created secret (whose pod hasn't been spawned
// yet) must not be reaped. p.cpID is used unsanitized to match
// the value the creation path stamps.
secrets, err := p.clientset.CoreV1().Secrets(p.namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=duckgres,duckgres/control-plane=%s,duckgres/worker-pod", p.cpID),
})
if err != nil {
slog.Warn("Stranded-secret reconciler failed to list worker RPC secrets.", "error", err)
return 0
}
cutoff := time.Now().Add(-minAge)
deleted := 0
for i := range secrets.Items {
secret := &secrets.Items[i]
if secret.CreationTimestamp.Time.After(cutoff) {
continue
}
podName := secret.Labels["duckgres/worker-pod"]
if podName == "" {
continue
}
if _, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, podName, metav1.GetOptions{}); err == nil {
// Pod exists — leave the secret alone, the regular
// cleanupOrphanedWorkerPods path will reap both
// together when the pod's DB row is terminal.
continue
} else if !errors.IsNotFound(err) {
slog.Warn("Stranded-secret reconciler failed to load worker pod.", "worker_pod", podName, "error", err)
continue
}
if err := p.clientset.CoreV1().Secrets(p.namespace).Delete(ctx, secret.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
slog.Warn("Stranded-secret reconciler failed to delete secret.", "secret", secret.Name, "worker_pod", podName, "error", err)
continue
}
slog.Info("Stranded worker RPC secret reconciled.", "secret", secret.Name, "worker_pod", podName)
deleted++
}
return deleted
}

func (p *K8sWorkerPool) resolveCPUID(ctx context.Context) error {
pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.cpID, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -2914,13 +2975,26 @@ func (p *K8sWorkerPool) markWorkerLostForHealthLease(lease workerLeaseSnapshot)
}

func (p *K8sWorkerPool) markWorkerLostIfCurrentLease(lease workerLeaseSnapshot) (bool, error) {
if p.runtimeStore == nil {
lc := p.ensureLifecycle()
if lc == nil {
// No durable store wired (process backend / minimal test pool).
// The health-check caller treats true here as "CAS landed";
// for the no-store case we let it proceed to its in-memory
// cleanup path, which is the same behavior the old direct-
// store call had when runtimeStore was nil.
return true, nil
}
if lease.ownerCPInstanceID != p.cpInstanceID {
return false, nil
}
return p.runtimeStore.MarkWorkerLostIfCurrentLease(lease.workerID, p.cpInstanceID, lease.ownerEpoch, RetireReasonCrash)
outcome, err := lc.MarkLostFromLease(
configstore.NewWorkerLease(lease.workerID, p.cpInstanceID, lease.ownerEpoch),
RetireReasonCrash,
)
if err != nil {
return false, err
}
return outcome.Transitioned, nil
}

func (p *K8sWorkerPool) removeWorkerAfterLostLeaseLocked(lease workerLeaseSnapshot) (*ManagedWorker, int, int, bool) {
Expand Down
168 changes: 168 additions & 0 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4308,6 +4308,174 @@ func TestCleanupOrphanedWorkerPods_IgnoresNonWorkerPods(t *testing.T) {
}
}

func TestCleanupOrphanedWorkerSecrets_DeletesSecretsWithoutPods(t *testing.T) {
// A CP that crashed between secret creation and pod creation
// leaves a leaked secret. The pod-only reaper doesn't see it
// (it iterates pods); the sibling secret reaper picks it up.
store := &captureRuntimeWorkerStore{}
pool, cs := strandedReconcilerPool(t, store)
createdAt := metav1.NewTime(time.Now().Add(-time.Hour))
for _, podName := range []string{"duckgres-worker-leak-1", "duckgres-worker-leak-2"} {
_, err := cs.CoreV1().Secrets("default").Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pool.workerRPCSecretName(podName),
Namespace: "default",
CreationTimestamp: createdAt,
Labels: map[string]string{
"app": "duckgres",
"duckgres/control-plane": pool.cpID,
"duckgres/worker-pod": podName,
},
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("create orphan secret for %s: %v", podName, err)
}
}

deleted := pool.cleanupOrphanedWorkerSecrets(context.Background(), 2*time.Minute)
if deleted != 2 {
t.Fatalf("expected both orphan secrets deleted, got %d", deleted)
}
for _, podName := range []string{"duckgres-worker-leak-1", "duckgres-worker-leak-2"} {
_, err := cs.CoreV1().Secrets("default").Get(context.Background(), pool.workerRPCSecretName(podName), metav1.GetOptions{})
if err == nil {
t.Fatalf("expected secret for %s to be deleted", podName)
}
}
}

func TestCleanupOrphanedWorkerSecrets_KeepsSecretsForLivePods(t *testing.T) {
// A worker pod that's alive must keep its secret. The pod-cleanup
// path is responsible for reaping both together when the pod's DB
// row goes terminal; the secret-only reaper must not race that.
store := &captureRuntimeWorkerStore{}
pool, cs := strandedReconcilerPool(t, store)
podName := "duckgres-worker-alive"
createdAt := metav1.NewTime(time.Now().Add(-time.Hour))
if _, err := cs.CoreV1().Pods("default").Create(context.Background(), &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: "default",
CreationTimestamp: createdAt,
Labels: map[string]string{"app": "duckgres-worker", "duckgres/worker-id": "100"},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("create pod: %v", err)
}
if _, err := cs.CoreV1().Secrets("default").Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pool.workerRPCSecretName(podName),
Namespace: "default",
CreationTimestamp: createdAt,
Labels: map[string]string{
"app": "duckgres",
"duckgres/control-plane": pool.cpID,
"duckgres/worker-pod": podName,
},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("create secret: %v", err)
}

if deleted := pool.cleanupOrphanedWorkerSecrets(context.Background(), 2*time.Minute); deleted != 0 {
t.Fatalf("expected zero deletions for live-pod secret, got %d", deleted)
}
if _, err := cs.CoreV1().Secrets("default").Get(context.Background(), pool.workerRPCSecretName(podName), metav1.GetOptions{}); err != nil {
t.Fatalf("expected secret to survive: %v", err)
}
}

func TestCleanupOrphanedWorkerSecrets_SkipsEmptyPodLabel(t *testing.T) {
// The selector "duckgres/worker-pod" matches any secret with that
// label key (including empty value). Guard against deleting
// secrets whose worker-pod label is malformed — we can't look up
// a pod with no name. The reaper continues past such secrets
// rather than attempting an empty-name Get.
store := &captureRuntimeWorkerStore{}
pool, cs := strandedReconcilerPool(t, store)
createdAt := metav1.NewTime(time.Now().Add(-time.Hour))
if _, err := cs.CoreV1().Secrets("default").Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "weird-secret-with-empty-worker-pod",
Namespace: "default",
CreationTimestamp: createdAt,
Labels: map[string]string{
"app": "duckgres",
"duckgres/control-plane": pool.cpID,
"duckgres/worker-pod": "",
},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("create secret: %v", err)
}

if deleted := pool.cleanupOrphanedWorkerSecrets(context.Background(), 2*time.Minute); deleted != 0 {
t.Fatalf("expected reaper to skip secret with empty worker-pod label, got %d deletions", deleted)
}
if _, err := cs.CoreV1().Secrets("default").Get(context.Background(), "weird-secret-with-empty-worker-pod", metav1.GetOptions{}); err != nil {
t.Fatalf("expected malformed secret to survive: %v", err)
}
}

func TestCleanupOrphanedWorkerSecrets_OnlyReapsOwnControlPlane(t *testing.T) {
// Multi-CP namespace: each CP must only reap secrets it created.
// A peer CP's freshly-orphaned-looking secret (no matching pod
// from this CP's view, but actually still managed by the peer)
// must NOT be deleted.
store := &captureRuntimeWorkerStore{}
pool, cs := strandedReconcilerPool(t, store)
createdAt := metav1.NewTime(time.Now().Add(-time.Hour))
if _, err := cs.CoreV1().Secrets("default").Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "peer-cp-secret",
Namespace: "default",
CreationTimestamp: createdAt,
Labels: map[string]string{
"app": "duckgres",
"duckgres/control-plane": "some-other-cp",
"duckgres/worker-pod": "peer-worker",
},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("create peer-cp secret: %v", err)
}

if deleted := pool.cleanupOrphanedWorkerSecrets(context.Background(), 2*time.Minute); deleted != 0 {
t.Fatalf("expected zero deletions of peer-CP secrets, got %d", deleted)
}
if _, err := cs.CoreV1().Secrets("default").Get(context.Background(), "peer-cp-secret", metav1.GetOptions{}); err != nil {
t.Fatalf("expected peer-CP secret to survive: %v", err)
}
}

func TestCleanupOrphanedWorkerSecrets_RespectsMinAge(t *testing.T) {
// minAge protects newly-created secrets that the spawn flow is
// still using (createSecret completed but createPod hasn't yet).
store := &captureRuntimeWorkerStore{}
pool, cs := strandedReconcilerPool(t, store)
podName := "duckgres-worker-fresh"
createdAt := metav1.NewTime(time.Now()) // Now — younger than 2m minAge.
if _, err := cs.CoreV1().Secrets("default").Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pool.workerRPCSecretName(podName),
Namespace: "default",
CreationTimestamp: createdAt,
Labels: map[string]string{
"app": "duckgres",
"duckgres/control-plane": pool.cpID,
"duckgres/worker-pod": podName,
},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("create secret: %v", err)
}

if deleted := pool.cleanupOrphanedWorkerSecrets(context.Background(), 2*time.Minute); deleted != 0 {
t.Fatalf("expected fresh secret to survive minAge gate, got %d deletions", deleted)
}
}

// --- ShutdownAll draining-chain tests ---
//
// ShutdownAll is called when the CP pod receives SIGTERM from Kubernetes. The
Expand Down
19 changes: 16 additions & 3 deletions controlplane/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,24 @@ func SetupMultiTenant(
router.sharedPool.RetireOneMismatchedVersionWorker(ctx)
}
janitor.cleanupOrphanedWorkerPods = func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if n := router.sharedPool.cleanupOrphanedWorkerPods(ctx, 2*time.Minute); n > 0 {
// Pods and secrets each get their own 30s deadline so a slow
// pod-list (large namespace) can't starve the secret reaper
// behind it, and vice versa.
podCtx, podCancel := context.WithTimeout(context.Background(), 30*time.Second)
if n := router.sharedPool.cleanupOrphanedWorkerPods(podCtx, 2*time.Minute); n > 0 {
slog.Info("Stranded worker pods reconciled.", "count", n)
}
podCancel()

// Sibling reconciler that catches a secret created without a
// pod (spawn crashed between createSecret and createPod). The
// pod-cleanup loop above only iterates pods, so this is the
// only place that reclaims those orphans.
secretCtx, secretCancel := context.WithTimeout(context.Background(), 30*time.Second)
if n := router.sharedPool.cleanupOrphanedWorkerSecrets(secretCtx, 2*time.Minute); n > 0 {
slog.Info("Stranded worker RPC secrets reconciled.", "count", n)
}
secretCancel()
}

// Scheduler-side activator: a single SharedWorkerActivator instance
Expand Down
28 changes: 20 additions & 8 deletions controlplane/shared_worker_activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,30 @@ func (a *SharedWorkerActivator) RefreshCredentials(ctx context.Context, worker *
if a.lifecycle == nil {
return fmt.Errorf("refresh worker credentials requires a lifecycle service (worker %d)", worker.ID)
}
currentEpoch := worker.OwnerEpoch()
cpInstanceID := worker.OwnerCPInstanceID()
newLease, err := a.lifecycle.RefreshLease(configstore.NewWorkerLease(worker.ID, cpInstanceID, currentEpoch))
if err != nil {
// RefreshOwnerEpochAtomic holds the worker's epoch lock across the
// durable RefreshLease CAS. Without this, a concurrent
// ShutdownAll's OwnerEpoch() read could land between the durable
// bump and the in-memory SetOwnerEpoch, building a stale lease
// that CAS-misses (leaving the row in draining for the orphan
// sweep to reconcile later). The lock is held during the DB
// round-trip; that's the right trade-off — the round-trip is
// O(10ms) and ShutdownAll iterates workers serially anyway.
var newEpoch int64
if err := worker.RefreshOwnerEpochAtomic(func(currentEpoch int64) (int64, error) {
newLease, err := a.lifecycle.RefreshLease(configstore.NewWorkerLease(worker.ID, cpInstanceID, currentEpoch))
if err != nil {
return 0, err
}
newEpoch = newLease.OwnerEpoch()
return newEpoch, nil
}); err != nil {
// Keep the legacy "bump owner epoch for refresh" wrapper wording
// even though the lifecycle method is named RefreshLease, so
// log-grep / dashboard filters that pattern-match on the
// previous string still work.
// so log-grep / dashboard filters that pattern-match on the
// previous string still work. ErrWorkerOwnerEpochMismatch is
// preserved via %w.
return fmt.Errorf("bump owner epoch for refresh: %w", err)
}
newEpoch := newLease.OwnerEpoch()
worker.SetOwnerEpoch(newEpoch)

rpcPayload := server.WorkerActivationPayload{
WorkerControlMetadata: server.WorkerControlMetadata{
Expand Down
28 changes: 14 additions & 14 deletions controlplane/worker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,17 @@ func (l *WorkerLifecycle) RetireIdleVariantFromSnapshot(snap configstore.WorkerS
}, nil
}

// MarkLostFromLease marks a lease-owned worker as lost. Intended for
// the health-checker path once it migrates off the direct
// MarkWorkerLostIfCurrentLease + retireWorkerPod choreography in
// markWorkerLostForHealthLease — that migration is deliberately not in
// PR 3 because the existing flow's separate "remove from local pool +
// pick a replacement" step needs threading through the lifecycle's
// PhysicalCleanup before this method can replace it cleanly.
// Triggers pod cleanup on success.
func (l *WorkerLifecycle) MarkLostFromLease(lease configstore.WorkerLease, podName, reason string) (configstore.TransitionOutcome, error) {
// MarkLostFromLease performs the lease-fenced CAS that transitions a
// worker row to lost. Used by the health-checker after it has
// confirmed the worker is unresponsive. Does NOT schedule pod
// cleanup — consistent with the other lease-based transitions
// (Drain, RetireDrained, RefreshLease), the caller orchestrates
// physical cleanup so it can interleave replenishment decisions,
// in-memory pool removal, and pod delete in the right order. (The
// snapshot-based variants RetireFromSnapshot/RetireOrphanFromSnapshot/
// RetireIdleVariantFromSnapshot do bundle cleanup because their
// callers don't have post-CAS choreography.)
func (l *WorkerLifecycle) MarkLostFromLease(lease configstore.WorkerLease, reason string) (configstore.TransitionOutcome, error) {
if l == nil {
return configstore.TransitionOutcome{Reason: configstore.TransitionOutcomeStoreError}, errors.New("worker lifecycle service not configured")
}
Expand All @@ -196,15 +198,13 @@ func (l *WorkerLifecycle) MarkLostFromLease(lease configstore.WorkerLease, podNa
return configstore.TransitionOutcome{Reason: configstore.TransitionOutcomeStoreError}, err
}
if !transitioned {
slog.Debug("Lifecycle mark-lost CAS missed; pod cleanup skipped.",
slog.Debug("Lifecycle mark-lost CAS missed.",
"worker_id", lease.WorkerID(), "reason", reason)
return configstore.TransitionOutcome{Reason: configstore.TransitionOutcomeFenceMissOwner}, nil
}
l.scheduleCleanup(lease.WorkerID(), podName, reason)
return configstore.TransitionOutcome{
Transitioned: true,
PhysicalCleanupScheduled: l.cleanup != nil,
Reason: configstore.TransitionOutcomeTransitioned,
Transitioned: true,
Reason: configstore.TransitionOutcomeTransitioned,
}, nil
}

Expand Down
Loading
Loading