Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ Run with config file:
| `DUCKGRES_MANAGED_HOSTNAME_SUFFIXES` | Comma-separated managed hostname suffixes such as `.dw.us.postwh.com` | - |
| `DUCKGRES_K8S_MAX_WORKERS` | Global cap for shared K8s workers (`0` means Duckgres does not impose a cap) | `0` |
| `DUCKGRES_K8S_SHARED_WARM_TARGET` | Default-image neutral shared warm-worker target for K8s multi-tenant mode (`0` disables prewarm; subject to `DUCKGRES_K8S_MAX_WORKERS`) | `0` |
| `DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED` | Enable configstore-driven dynamic warm-capacity target computation from recent no-idle misses | `true` |
| `DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW` | Recent no-idle miss window used for dynamic warm-capacity demand | `2m` |
| `DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER` | Recent misses required for one extra dynamic warm worker | `8` |
| `DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL` | Retention TTL for warm-capacity miss buckets; clamped to at least the miss window | `15m` |
| `DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING` | Max dynamic extra warm workers per image (`0` means unlimited) | `0` |
| `DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING` | Max dynamic extra warm workers across images (`0` means unlimited) | `0` |
| `DUCKGRES_DUCKLAKE_METADATA_STORE` | DuckLake metadata connection string | - |
| `DUCKGRES_DUCKLAKE_DELTA_CATALOG_ENABLED` | Attach a Delta Lake catalog/table during worker boot/activation | `false` |
| `DUCKGRES_DUCKLAKE_DELTA_CATALOG_PATH` | Delta Lake catalog/table path; defaults to sibling `delta/` prefix at the DuckLake object-store root when enabled | Derived |
Expand Down Expand Up @@ -283,6 +289,21 @@ Options:
-sni-routing-mode string Hostname routing: off, passthrough, or enforce
-managed-hostname-suffixes string
Comma-separated managed tenant hostname suffixes
-k8s-max-workers int Max K8s workers in the shared pool, 0=unbounded
-k8s-shared-warm-target int
Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled
-k8s-dynamic-warm-capacity-enabled
Enable configstore-driven dynamic warm-capacity target computation (default true)
-k8s-warm-capacity-miss-window string
Recent no-idle miss window for dynamic warm-capacity demand (default 2m)
-k8s-warm-capacity-misses-per-worker int
Recent misses required for one extra dynamic warm worker (default 8)
-k8s-warm-capacity-demand-ttl string
Retention TTL for warm-capacity miss buckets (default 15m)
-k8s-warm-capacity-dynamic-image-ceiling int
Max dynamic extra warm workers per image, 0=unlimited
-k8s-warm-capacity-dynamic-total-ceiling int
Max dynamic extra warm workers across images, 0=unlimited
```

## DuckDB Extensions
Expand Down
12 changes: 12 additions & 0 deletions configresolve/cliflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
k8sWorkerServiceAccount := fs.String("k8s-worker-service-account", "", "Neutral ServiceAccount name for K8s worker pods (default: duckgres-worker) (env: DUCKGRES_K8S_WORKER_SERVICE_ACCOUNT)")
k8sMaxWorkers := fs.Int("k8s-max-workers", 0, "Max K8s workers in the shared pool, 0=unbounded (env: DUCKGRES_K8S_MAX_WORKERS)")
k8sSharedWarmTarget := fs.Int("k8s-shared-warm-target", 0, "Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled (env: DUCKGRES_K8S_SHARED_WARM_TARGET)")
k8sDynamicWarmCapacityEnabled := fs.Bool("k8s-dynamic-warm-capacity-enabled", true, "Enable configstore-driven dynamic warm-capacity target computation (default true; use --k8s-dynamic-warm-capacity-enabled=false to disable; env: DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED)")
k8sWarmCapacityMissWindow := fs.String("k8s-warm-capacity-miss-window", "", "Recent no-idle miss window for dynamic warm-capacity demand (default: 2m) (env: DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW)")
k8sWarmCapacityMissesPerWorker := fs.Int("k8s-warm-capacity-misses-per-worker", 0, "Recent misses required for one extra dynamic warm worker (default: 8) (env: DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER)")
k8sWarmCapacityDemandTTL := fs.String("k8s-warm-capacity-demand-ttl", "", "Retention TTL for warm-capacity miss buckets (default: 15m) (env: DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL)")
k8sWarmCapacityDynamicImageCeiling := fs.Int("k8s-warm-capacity-dynamic-image-ceiling", 0, "Max dynamic extra warm workers per image, 0=unlimited (env: DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING)")
k8sWarmCapacityDynamicTotalCeiling := fs.Int("k8s-warm-capacity-dynamic-total-ceiling", 0, "Max dynamic extra warm workers across images, 0=unlimited (env: DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING)")
awsRegion := fs.String("aws-region", "", "AWS region for STS client (env: DUCKGRES_AWS_REGION)")
queryLog := fs.Bool("query-log", true, "Enable/disable DuckLake query log (use --query-log=false to disable; env: DUCKGRES_QUERY_LOG_ENABLED)")

Expand Down Expand Up @@ -136,6 +142,12 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
cli.K8sWorkerServiceAccount = *k8sWorkerServiceAccount
cli.K8sMaxWorkers = *k8sMaxWorkers
cli.K8sSharedWarmTarget = *k8sSharedWarmTarget
cli.K8sDynamicWarmCapacityEnabled = *k8sDynamicWarmCapacityEnabled
cli.K8sWarmCapacityMissWindow = *k8sWarmCapacityMissWindow
cli.K8sWarmCapacityMissesPerWorker = *k8sWarmCapacityMissesPerWorker
cli.K8sWarmCapacityDemandTTL = *k8sWarmCapacityDemandTTL
cli.K8sWarmCapacityDynamicImageCeiling = *k8sWarmCapacityDynamicImageCeiling
cli.K8sWarmCapacityDynamicTotalCeiling = *k8sWarmCapacityDynamicTotalCeiling
cli.AWSRegion = *awsRegion
cli.QueryLog = *queryLog
return cli
Expand Down
6 changes: 6 additions & 0 deletions configresolve/cliflags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func fieldNameToFlagName(name string) string {
{"K8sWorkerNamespace", "k8s-worker-namespace"},
{"K8sWorkerConfigMap", "k8s-worker-configmap"},
{"K8sSharedWarmTarget", "k8s-shared-warm-target"},
{"K8sDynamicWarmCapacityEnabled", "k8s-dynamic-warm-capacity-enabled"},
{"K8sWarmCapacityMissWindow", "k8s-warm-capacity-miss-window"},
{"K8sWarmCapacityMissesPerWorker", "k8s-warm-capacity-misses-per-worker"},
{"K8sWarmCapacityDemandTTL", "k8s-warm-capacity-demand-ttl"},
{"K8sWarmCapacityDynamicImageCeiling", "k8s-warm-capacity-dynamic-image-ceiling"},
{"K8sWarmCapacityDynamicTotalCeiling", "k8s-warm-capacity-dynamic-total-ceiling"},
{"K8sWorkerImage", "k8s-worker-image"},
{"K8sWorkerSecret", "k8s-worker-secret"},
{"K8sWorkerPort", "k8s-worker-port"},
Expand Down
156 changes: 94 additions & 62 deletions configresolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,67 +27,73 @@ import (
type CLIInputs struct {
Set map[string]bool

Host string
Port int
FlightPort int
FlightSessionIdleTTL string
FlightSessionReapInterval string
FlightHandleIdleTTL string
FlightSessionTokenTTL string
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
SessionInitTimeout string
MemoryLimit string
Threads int
MemoryBudget string
MemoryRebalance bool
DuckLakeDeltaCatalogEnabled bool
DuckLakeDeltaCatalogPath string
DuckLakeDefaultSpecVersion string
IcebergEnabled bool
IcebergTableBucket string
IcebergRegion string
IcebergNamespace string
ProcessMinWorkers int
ProcessMaxWorkers int
ProcessRetireOnSessionEnd bool
WorkerQueueTimeout string
WorkerIdleTimeout string
HandoverDrainTimeout string
ACMEDomain string
ACMEEmail string
ACMECacheDir string
ACMEDNSProvider string
ACMEDNSZoneID string
MaxConnections int
ConfigStoreConn string
ConfigPollInterval string
InternalSecret string
SNIRoutingMode string
ManagedHostnameSuffixes string
WorkerBackend string
K8sWorkerImage string
K8sWorkerNamespace string
K8sControlPlaneID string
K8sWorkerPort int
K8sWorkerSecret string
K8sWorkerConfigMap string
K8sWorkerImagePullPolicy string
K8sWorkerServiceAccount string
K8sMaxWorkers int
K8sSharedWarmTarget int
K8sWorkerCPURequest string
K8sWorkerMemoryRequest string
K8sWorkerNodeSelector string
K8sWorkerTolerationKey string
K8sWorkerTolerationValue string
K8sWorkerExclusiveNode bool
AWSRegion string
QueryLog bool
Host string
Port int
FlightPort int
FlightSessionIdleTTL string
FlightSessionReapInterval string
FlightHandleIdleTTL string
FlightSessionTokenTTL string
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
SessionInitTimeout string
MemoryLimit string
Threads int
MemoryBudget string
MemoryRebalance bool
DuckLakeDeltaCatalogEnabled bool
DuckLakeDeltaCatalogPath string
DuckLakeDefaultSpecVersion string
IcebergEnabled bool
IcebergTableBucket string
IcebergRegion string
IcebergNamespace string
ProcessMinWorkers int
ProcessMaxWorkers int
ProcessRetireOnSessionEnd bool
WorkerQueueTimeout string
WorkerIdleTimeout string
HandoverDrainTimeout string
ACMEDomain string
ACMEEmail string
ACMECacheDir string
ACMEDNSProvider string
ACMEDNSZoneID string
MaxConnections int
ConfigStoreConn string
ConfigPollInterval string
InternalSecret string
SNIRoutingMode string
ManagedHostnameSuffixes string
WorkerBackend string
K8sWorkerImage string
K8sWorkerNamespace string
K8sControlPlaneID string
K8sWorkerPort int
K8sWorkerSecret string
K8sWorkerConfigMap string
K8sWorkerImagePullPolicy string
K8sWorkerServiceAccount string
K8sMaxWorkers int
K8sSharedWarmTarget int
K8sDynamicWarmCapacityEnabled bool
K8sWarmCapacityMissWindow string
K8sWarmCapacityMissesPerWorker int
K8sWarmCapacityDemandTTL string
K8sWarmCapacityDynamicImageCeiling int
K8sWarmCapacityDynamicTotalCeiling int
K8sWorkerCPURequest string
K8sWorkerMemoryRequest string
K8sWorkerNodeSelector string
K8sWorkerTolerationKey string
K8sWorkerTolerationValue string
K8sWorkerExclusiveNode bool
AWSRegion string
QueryLog bool
}

type Resolved struct {
Expand Down Expand Up @@ -195,7 +201,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
var k8sWorkerSecret, k8sWorkerConfigMap, k8sWorkerImagePullPolicy string
k8sWorkerServiceAccount := controlplane.DefaultK8sWorkerServiceAccount
var k8sMaxWorkers, k8sSharedWarmTarget int
var k8sDynamicWarmCapacityEnabled bool
k8sDynamicWarmCapacityEnabled := true
k8sWarmCapacityMissWindow := controlplane.DefaultWarmCapacityMissWindow
k8sWarmCapacityMissesPerWorker := controlplane.DefaultWarmCapacityMissesPerWorker
k8sWarmCapacityDemandTTL := controlplane.DefaultWarmCapacityDemandTTL
Expand Down Expand Up @@ -1155,6 +1161,32 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
if cli.Set["k8s-shared-warm-target"] {
k8sSharedWarmTarget = cli.K8sSharedWarmTarget
}
if cli.Set["k8s-dynamic-warm-capacity-enabled"] {
k8sDynamicWarmCapacityEnabled = cli.K8sDynamicWarmCapacityEnabled
}
if cli.Set["k8s-warm-capacity-miss-window"] {
if d, err := time.ParseDuration(cli.K8sWarmCapacityMissWindow); err == nil {
k8sWarmCapacityMissWindow = d
} else {
warn("Invalid --k8s-warm-capacity-miss-window duration: " + err.Error())
}
}
if cli.Set["k8s-warm-capacity-misses-per-worker"] {
k8sWarmCapacityMissesPerWorker = cli.K8sWarmCapacityMissesPerWorker
}
if cli.Set["k8s-warm-capacity-demand-ttl"] {
if d, err := time.ParseDuration(cli.K8sWarmCapacityDemandTTL); err == nil {
k8sWarmCapacityDemandTTL = d
} else {
warn("Invalid --k8s-warm-capacity-demand-ttl duration: " + err.Error())
}
}
if cli.Set["k8s-warm-capacity-dynamic-image-ceiling"] {
k8sWarmCapacityDynamicImageCeiling = cli.K8sWarmCapacityDynamicImageCeiling
}
if cli.Set["k8s-warm-capacity-dynamic-total-ceiling"] {
k8sWarmCapacityDynamicTotalCeiling = cli.K8sWarmCapacityDynamicTotalCeiling
}
if cli.Set["aws-region"] {
awsRegion = cli.AWSRegion
}
Expand Down
50 changes: 33 additions & 17 deletions configresolve/resolve_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestResolveEffectiveExposesDuckLakeDefaultSpecVersionForControlPlane(t *tes
func TestResolveEffectiveDefaultsK8sDynamicWarmCapacityConfig(t *testing.T) {
resolved := ResolveEffective(nil, CLIInputs{}, nil, nil)

if resolved.K8sDynamicWarmCapacityEnabled {
t.Fatal("expected dynamic warm capacity to default disabled")
if !resolved.K8sDynamicWarmCapacityEnabled {
t.Fatal("expected dynamic warm capacity to default enabled")
}
if resolved.K8sWarmCapacityMissWindow != controlplane.DefaultWarmCapacityMissWindow {
t.Fatalf("expected miss window %s, got %s", controlplane.DefaultWarmCapacityMissWindow, resolved.K8sWarmCapacityMissWindow)
Expand All @@ -53,8 +53,8 @@ func TestResolveEffectiveDefaultsK8sDynamicWarmCapacityConfig(t *testing.T) {
}
}

func TestResolveEffectiveK8sDynamicWarmCapacityEnvOverridesFile(t *testing.T) {
fileEnabled := false
func TestResolveEffectiveK8sDynamicWarmCapacityPrecedence(t *testing.T) {
fileEnabled := true
fileCfg := &configloader.FileConfig{
K8s: configloader.K8sFileConfig{
DynamicWarmCapacityEnabled: &fileEnabled,
Expand All @@ -73,28 +73,44 @@ func TestResolveEffectiveK8sDynamicWarmCapacityEnvOverridesFile(t *testing.T) {
"DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING": "3",
"DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_TOTAL_CEILING": "5",
}
cli := CLIInputs{
Set: map[string]bool{
"k8s-dynamic-warm-capacity-enabled": true,
"k8s-warm-capacity-miss-window": true,
"k8s-warm-capacity-misses-per-worker": true,
"k8s-warm-capacity-demand-ttl": true,
"k8s-warm-capacity-dynamic-image-ceiling": true,
"k8s-warm-capacity-dynamic-total-ceiling": true,
},
K8sDynamicWarmCapacityEnabled: false,
K8sWarmCapacityMissWindow: "5m",
K8sWarmCapacityMissesPerWorker: 7,
K8sWarmCapacityDemandTTL: "14m",
K8sWarmCapacityDynamicImageCeiling: 8,
K8sWarmCapacityDynamicTotalCeiling: 9,
}

resolved := ResolveEffective(fileCfg, CLIInputs{}, func(key string) string {
resolved := ResolveEffective(fileCfg, cli, func(key string) string {
return env[key]
}, nil)

if !resolved.K8sDynamicWarmCapacityEnabled {
t.Fatal("expected env true to override file false")
if resolved.K8sDynamicWarmCapacityEnabled {
t.Fatal("expected explicit CLI false to override file/env true")
}
if resolved.K8sWarmCapacityMissWindow != 4*time.Minute {
t.Fatalf("expected miss window 4m, got %s", resolved.K8sWarmCapacityMissWindow)
if resolved.K8sWarmCapacityMissWindow != 5*time.Minute {
t.Fatalf("expected miss window 5m, got %s", resolved.K8sWarmCapacityMissWindow)
}
if resolved.K8sWarmCapacityMissesPerWorker != 6 {
t.Fatalf("expected misses per worker 6, got %d", resolved.K8sWarmCapacityMissesPerWorker)
if resolved.K8sWarmCapacityMissesPerWorker != 7 {
t.Fatalf("expected misses per worker 7, got %d", resolved.K8sWarmCapacityMissesPerWorker)
}
if resolved.K8sWarmCapacityDemandTTL != 13*time.Minute {
t.Fatalf("expected demand TTL 13m, got %s", resolved.K8sWarmCapacityDemandTTL)
if resolved.K8sWarmCapacityDemandTTL != 14*time.Minute {
t.Fatalf("expected demand TTL 14m, got %s", resolved.K8sWarmCapacityDemandTTL)
}
if resolved.K8sWarmCapacityDynamicImageCeiling != 3 {
t.Fatalf("expected image ceiling 3, got %d", resolved.K8sWarmCapacityDynamicImageCeiling)
if resolved.K8sWarmCapacityDynamicImageCeiling != 8 {
t.Fatalf("expected image ceiling 8, got %d", resolved.K8sWarmCapacityDynamicImageCeiling)
}
if resolved.K8sWarmCapacityDynamicTotalCeiling != 5 {
t.Fatalf("expected total ceiling 5, got %d", resolved.K8sWarmCapacityDynamicTotalCeiling)
if resolved.K8sWarmCapacityDynamicTotalCeiling != 9 {
t.Fatalf("expected total ceiling 9, got %d", resolved.K8sWarmCapacityDynamicTotalCeiling)
}
}

Expand Down
25 changes: 0 additions & 25 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"crypto/x509"
Expand Down Expand Up @@ -100,13 +99,6 @@ type K8sWorkerPool struct {

activatingTimeout time.Duration // max time a worker can stay in reserved/activating before being reaped

// warmCapacityMisses counts ReserveSharedWorker calls that returned
// WarmCapacityExhaustedError since the last ConsumeWarmCapacityDemand call.
// The janitor's warm-capacity reconciler drains this counter each tick and
// scales the warm pool to absorb the observed demand in one shot, rather
// than creeping up at the static SharedWarmTarget floor while cold tenants
// retry on 45-second backoffs. Atomically accessed; no lock needed.
warmCapacityMisses atomic.Int64
}

// NewK8sWorkerPool creates a K8sWorkerPool using in-cluster credentials.
Expand Down Expand Up @@ -1484,7 +1476,6 @@ func (p *K8sWorkerPool) recordWarmCapacityMiss(assignment *WorkerAssignment, rea
if !policy.recordDynamicDemand {
return
}
p.warmCapacityMisses.Add(1)
if p.runtimeStore == nil {
return
}
Expand Down Expand Up @@ -2921,22 +2912,6 @@ func (p *K8sWorkerPool) WarmCapacityTarget() int {
return p.minWorkers
}

// ConsumeWarmCapacityDemand returns the number of ReserveSharedWorker calls
// that have hit WarmCapacityExhausted (excluding per-org cap misses) since
// the last call, atomically resetting the counter. The warm-pool reconciler
// adds this to the static target each tick so a cold burst of N tenants
// scales the pool toward N workers in one or two ticks, instead of creeping
// up while clients re-arrive on their 45-second retry hints. Scale-down
// stays under the idle reaper's slower cadence so steady-state idle dips
// don't thrash the pool.
func (p *K8sWorkerPool) ConsumeWarmCapacityDemand() int {
n := p.warmCapacityMisses.Swap(0)
if n < 0 {
return 0
}
return int(n)
}

// SetPerImageWarmTargets replaces the per-image warm-worker floor. Each entry
// asks the pool to keep at least N warm-idle workers running with the given
// image. This is layered on top of SetWarmCapacityTarget — the per-image floor
Expand Down
Loading
Loading