Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions tests/k8s/control_plane_idle_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//go:build k8s_integration

package k8s_test

import (
"context"
"fmt"
"sort"
"strings"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// waitForControlPlaneIdle blocks until the control plane's worker pool
// looks quiescent: every worker pod is Running+Ready with no
// DeletionTimestamp set, and the set of worker pod names has been
// stable for `stableTicks` consecutive observations.
//
// This is the test-isolation primitive: tests that delete or otherwise
// disturb a worker pod should call this at the end (typically via
// t.Cleanup) so the next test doesn't start while the control plane is
// still mid-housekeeping. Without it, trailing pod create/delete
// activity from warm-pool replenishment and post-retire cleanup keeps
// the apiserver busy enough that the test's port-forward can drop at
// arbitrary moments, causing the next test's retry layer to re-issue
// in-flight queries — which manifests as "duplicate row" or other
// at-least-once artefacts in tests that don't assume idempotent ops.
//
// The signals here are K8s-API-only: pod phase, the Ready condition,
// DeletionTimestamp, and the name set across ticks. The runtime-store
// transitions (spawning, activating, draining) are not directly
// observed, but every CP-side state change that has visible cluster
// side effects (and thus drives apiserver load) does show up here,
// because the CP cannot move a worker row to a stable state (idle,
// hot, hot_idle) without first making K8s API calls.
//
// `stableTicks` defaults to 3 and `tickInterval` to 1s — three
// consecutive identical observations a second apart is the smallest
// window that reliably filters out the "create pod, immediately mark
// retired, delete pod" churn pattern we see during warm-pool
// reconciliation tick storms.
func waitForControlPlaneIdle(t *testing.T, timeout time.Duration) {
t.Helper()
const (
stableTicks = 3
tickInterval = 1 * time.Second
)
deadline := time.Now().Add(timeout)
lastSnapshot := ""
lastDetail := ""
stable := 0
for time.Now().Before(deadline) {
pods, err := clientset.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: "app=duckgres-worker",
})
if err != nil {
// Transient API errors during quiescence checks shouldn't
// fail the test outright — log and retry. A persistent
// failure will hit the outer deadline.
t.Logf("waitForControlPlaneIdle: list worker pods: %v", err)
time.Sleep(tickInterval)
continue
}
snapshot, detail, allHealthy := summarizeWorkerPodsForIdleness(pods.Items)
if allHealthy && snapshot == lastSnapshot {
stable++
if stable >= stableTicks {
return
}
} else {
stable = 0
lastSnapshot = snapshot
}
lastDetail = detail
time.Sleep(tickInterval)
}
t.Fatalf("waitForControlPlaneIdle: control plane did not quiesce within %s; last observation: %s", timeout, lastDetail)
}

// summarizeWorkerPodsForIdleness reduces a worker-pod list to a stable
// comparison key plus a human-readable detail string. allHealthy is
// false the moment any pod is in a transitional state (deleting, not
// running, or not ready).
func summarizeWorkerPodsForIdleness(pods []corev1.Pod) (snapshot, detail string, allHealthy bool) {
type podState struct {
name string
phase corev1.PodPhase
ready bool
deleting bool
}
states := make([]podState, 0, len(pods))
allHealthy = true
for i := range pods {
pod := &pods[i]
s := podState{name: pod.Name, phase: pod.Status.Phase}
if pod.DeletionTimestamp != nil {
s.deleting = true
allHealthy = false
}
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
s.ready = true
}
}
if pod.Status.Phase != corev1.PodRunning || !s.ready {
allHealthy = false
}
states = append(states, s)
}
sort.Slice(states, func(i, j int) bool { return states[i].name < states[j].name })
names := make([]string, len(states))
details := make([]string, len(states))
for i, s := range states {
names[i] = s.name
flag := "ok"
switch {
case s.deleting:
flag = "deleting"
case !s.ready:
flag = fmt.Sprintf("phase=%s ready=false", s.phase)
case s.phase != corev1.PodRunning:
flag = fmt.Sprintf("phase=%s", s.phase)
}
details[i] = fmt.Sprintf("%s(%s)", s.name, flag)
}
return strings.Join(names, ","), strings.Join(details, " "), allHealthy
}
35 changes: 17 additions & 18 deletions tests/k8s/ducklake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,29 +116,28 @@ func TestK8sDuckLakeDurabilityAcrossWorkerRestart(t *testing.T) {
if count != rows {
t.Fatalf("post-restart row count = %d, want %d (data did not survive worker restart)", count, rows)
}

// Wait for the CP to finish housekeeping the killed worker (cleanup
// of the orphan pod/secret, warm-pool replenishment activation)
// before returning. Without this, the trailing apiserver activity
// drops the next test's port-forward at an arbitrary point and the
// retry layer can re-issue in-flight queries.
waitForControlPlaneIdle(t, 60*time.Second)
}

// TestK8sDuckLakeConcurrentWriters exercises the PostHog DuckLake fork's
// conflict-retry path in the real k8s setup: N goroutines, each on its
// own connection, INSERT into the same table. With the fork's retry
// semantics every commit should eventually land; without retries (or with
// a regression that suppresses retries on certain SQLSTATEs) the test
// would either fail with a conflict error or end up with fewer (writer,
// id) tuples than expected.
// would either fail with a conflict error or end up with fewer rows than
// expected.
//
// We deliberately don't assert no-conflicts-occurred: that's the wrong
// invariant. The invariant is no-rows-lost — conflicts are fine as long
// as the retry layer makes every writer eventually succeed.
//
// We assert on COUNT(DISTINCT (writer, id)) rather than COUNT(*) so the
// test is resilient to the at-least-once retry semantics in
// retryDBOperationWithReconnect: that helper retries on any error,
// including post-commit connection drops where the server already
// applied the INSERT. A re-INSERT under those conditions produces an
// exact-duplicate (writer, id) tuple, so the distinct count is still
// 100 even when the raw row count is higher. The load-bearing
// property is "every (writer, id) is present at least once," not
// "exactly one copy of every row."
// as the retry layer makes every writer eventually succeed, and we
// assert on COUNT(*) so a regression that produces duplicate rows is
// caught here rather than silently masked.
func TestK8sDuckLakeConcurrentWriters(t *testing.T) {
tableName := fmt.Sprintf("ducklake.dl_concurrent_%d", time.Now().UnixNano())
const writers = 4
Expand Down Expand Up @@ -190,12 +189,12 @@ func TestK8sDuckLakeConcurrentWriters(t *testing.T) {
return
}

var distinct int
if err := retryScanIntWithReconnect("SELECT COUNT(*) FROM (SELECT DISTINCT writer, id FROM "+tableName+")", 30*time.Second, &distinct); err != nil {
t.Fatalf("count distinct concurrent tuples: %v", err)
var count int
if err := retryScanIntWithReconnect("SELECT COUNT(*) FROM "+tableName, 30*time.Second, &count); err != nil {
t.Fatalf("count concurrent rows: %v", err)
}
want := writers * rowsPerWriter
if distinct != want {
t.Fatalf("concurrent distinct (writer, id) tuples = %d, want %d — DuckLake fork's conflict retry did not preserve all writes", distinct, want)
if count != want {
t.Fatalf("concurrent row count = %d, want %d — DuckLake fork's conflict retry did not preserve all writes", count, want)
}
}
5 changes: 5 additions & 0 deletions tests/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ func TestK8sWorkerCrashRecovery(t *testing.T) {
if err := retryQueryWithReconnect("SELECT 1", 60*time.Second); err != nil {
t.Fatalf("query failed after worker crash recovery: %v", err)
}

// Wait for the CP to finish housekeeping the deleted worker so the
// next test doesn't run while the apiserver is still busy with
// replenishment/cleanup churn.
waitForControlPlaneIdle(t, 60*time.Second)
}

func TestK8sMultipleConcurrentConnections(t *testing.T) {
Expand Down
Loading