Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .claude/skills/multitenant-down/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
name: multitenant-down
description: Tear down the multi-tenant K8s stack, kill port-forwards, and clean up worker pods. Use when done testing multi-tenant mode.
---

Tear down the multi-tenant K8s stack:

1. Kill all port-forwards:
```bash
pkill -f 'port-forward.*duckgres' 2>/dev/null
```

2. Delete the duckgres namespace (removes control plane, workers, RBAC, services):
```bash
kubectl delete namespace duckgres --ignore-not-found
```

3. Stop the config store:
```bash
docker compose -f k8s/local-config-store.compose.yaml down
```

4. Clean up the TLS cert:
```bash
rm -f /tmp/duckgres-server.crt
```

5. Report to the user that the stack is torn down.
35 changes: 35 additions & 0 deletions .claude/skills/multitenant-up/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
name: multitenant-up
description: Build, deploy, and port-forward the multi-tenant control plane on local Kubernetes. Use when you need to test duckgres in multi-tenant mode with per-team worker pools.
---

Boot up the multi-tenant K8s stack:

1. Build and deploy:
```bash
just run-multitenant-local
```

2. Kill any stale port-forwards and start fresh ones:
```bash
pkill -f 'port-forward.*duckgres' 2>/dev/null; sleep 1
kubectl -n duckgres port-forward svc/duckgres 5432:5432 &>/dev/null &
kubectl -n duckgres port-forward svc/duckgres 8815:8815 &>/dev/null &
kubectl -n duckgres port-forward deployment/duckgres-control-plane 9090:9090 &>/dev/null &
```

3. Grab the admin API token:
```bash
kubectl -n duckgres logs deployment/duckgres-control-plane | grep 'admin API token'
```

4. Extract the TLS cert for Flight SQL clients:
```bash
kubectl -n duckgres exec deployment/duckgres-control-plane -- cat /certs/server.crt > /tmp/duckgres-server.crt
```

5. Report to the user:
- Admin dashboard: http://localhost:9090 (show the token)
- PG: `PGSSLMODE=require PGPASSWORD=postgres psql -h localhost -U postgres`
- Flight SQL (DuckDB): `GRPC_DEFAULT_SSL_ROOTS_FILE_PATH=/tmp/duckgres-server.crt duckdb` then `INSTALL duckhog FROM community; LOAD duckhog; ATTACH 'hog:memory?user=postgres&password=postgres&flight_server=grpc+tls://localhost:8815' AS mt;`
- Default credentials: postgres / postgres
1 change: 1 addition & 0 deletions controlplane/admin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type SessionStatus struct {
PID int32 `json:"pid"`
WorkerID int `json:"worker_id"`
Org string `json:"org"`
Protocol string `json:"protocol"`
}

// ClusterStatus aggregates cluster state for the dashboard.
Expand Down
7 changes: 6 additions & 1 deletion controlplane/admin/static/sessions.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,16 @@ <h1 class="text-2xl font-bold mb-6">Active Sessions</h1>
return;
}
let html = '<table class="w-full"><thead><tr class="text-left text-gray-400 text-sm border-b border-gray-700">' +
'<th class="p-3">PID</th><th class="p-3">Org</th><th class="p-3">Worker</th></tr></thead><tbody>';
'<th class="p-3">PID</th><th class="p-3">Org</th><th class="p-3">Protocol</th><th class="p-3">Worker</th></tr></thead><tbody>';
sessions.forEach(s => {
const proto = s.protocol || 'postgres';
const badge = proto === 'flight'
? '<span class="px-2 py-0.5 rounded text-xs font-medium bg-blue-900 text-blue-300">flight</span>'
: '<span class="px-2 py-0.5 rounded text-xs font-medium bg-green-900 text-green-300">postgres</span>';
html += `<tr class="border-b border-gray-700">
<td class="p-3 font-mono">${s.pid}</td>
<td class="p-3">${esc(s.org)}</td>
<td class="p-3">${badge}</td>
<td class="p-3">${s.worker_id}</td></tr>`;
});
html += '</tbody></table>';
Expand Down
26 changes: 22 additions & 4 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cloudflare/tableflip"
"github.com/posthog/duckgres/server"
"github.com/posthog/duckgres/server/flightsqlingress"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand Down Expand Up @@ -1102,9 +1103,26 @@ func (cp *ControlPlane) startFlightIngress() {
return
}

// Flight ingress requires a single session manager (not multi-tenant).
if cp.sessions == nil {
slog.Info("Flight ingress disabled in multi-tenant mode.")
var validator flightsqlingress.CredentialValidator
var provider flightsqlingress.SessionProvider

switch {
case cp.configStore != nil && cp.orgRouter != nil:
// Multi-tenant: auth via config store, sessions routed per-org.
validator = flightsqlingress.FuncCredentialValidator(func(username, password string) bool {
_, ok := cp.configStore.ValidateUser(username, password)
return ok
})
provider = &orgRoutedSessionProvider{
orgRouter: cp.orgRouter,
pidSession: make(map[int32]*SessionManager),
}
case cp.sessions != nil:
// Single-tenant: static users map, single session manager.
validator = &flightsqlingress.MapCredentialValidator{Users: cp.cfg.Users}
provider = &flightSessionProvider{sm: cp.sessions}
default:
slog.Warn("Flight ingress disabled: no session manager or config store available.")
return
}

Expand All @@ -1119,7 +1137,7 @@ func (cp *ControlPlane) startFlightIngress() {
var flightIngress *FlightIngress
var err error
for attempt := 0; attempt < 10; attempt++ {
flightIngress, err = NewFlightIngress(cp.cfg.Host, cp.cfg.FlightPort, cp.tlsConfig, cp.cfg.Users, cp.sessions, cp.rateLimiter, flightCfg)
flightIngress, err = NewFlightIngress(cp.cfg.Host, cp.cfg.FlightPort, cp.tlsConfig, validator, provider, cp.rateLimiter, flightCfg)
if err == nil {
break
}
Expand Down
74 changes: 72 additions & 2 deletions controlplane/flight_ingress.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package controlplane

import (
"context"
"crypto/tls"
"fmt"
"log/slog"
"sync"

"github.com/posthog/duckgres/server"
"github.com/posthog/duckgres/server/flightsqlingress"
Expand All @@ -12,12 +16,78 @@ type FlightIngressConfig = flightsqlingress.Config
type FlightIngress = flightsqlingress.FlightIngress

// NewFlightIngress creates a control-plane Flight SQL ingress listener.
func NewFlightIngress(host string, port int, tlsConfig *tls.Config, users map[string]string, sm *SessionManager, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error) {
return flightsqlingress.NewFlightIngress(host, port, tlsConfig, users, sm, cfg, flightsqlingress.Options{
// NewFlightIngress creates a control-plane Flight SQL ingress listener,
// wiring the shared rate limiter and the session-count/reap metrics hooks
// into the underlying flightsqlingress server.
func NewFlightIngress(host string, port int, tlsConfig *tls.Config, validator flightsqlingress.CredentialValidator, provider flightsqlingress.SessionProvider, rateLimiter *server.RateLimiter, cfg FlightIngressConfig) (*FlightIngress, error) {
	opts := flightsqlingress.Options{
		RateLimiter: rateLimiter,
		Hooks: flightsqlingress.Hooks{
			OnSessionCountChanged: observeFlightAuthSessions,
			OnSessionsReaped:      observeFlightSessionsReaped,
		},
	}
	return flightsqlingress.NewFlightIngress(host, port, tlsConfig, validator, provider, cfg, opts)
}

// flightSessionProvider adapts a single SessionManager to the Flight SQL
// ingress SessionProvider interface, tagging every session it creates with
// the "flight" protocol label.
type flightSessionProvider struct {
	sm *SessionManager
}

// CreateSession delegates to the wrapped SessionManager and, on success,
// marks the new session's protocol as "flight".
func (f *flightSessionProvider) CreateSession(ctx context.Context, username string, pid int32, memoryLimit string, threads int) (int32, *server.FlightExecutor, error) {
	wpid, exec, err := f.sm.CreateSession(ctx, username, pid, memoryLimit, threads)
	if err != nil {
		return 0, nil, err
	}
	f.sm.SetProtocol(wpid, "flight")
	return wpid, exec, nil
}

// DestroySession tears down the session identified by pid.
func (f *flightSessionProvider) DestroySession(pid int32) {
	f.sm.DestroySession(pid)
}

// orgRoutedSessionProvider routes Flight SQL session operations to the correct
// org's SessionManager based on the username→org mapping in the config store.
type orgRoutedSessionProvider struct {
orgRouter OrgRouterInterface

mu sync.RWMutex
pidSession map[int32]*SessionManager // pid → owning session manager
}

// CreateSession resolves the user's org stack via the router, creates the
// session on that org's SessionManager, labels it "flight", and records the
// pid→manager mapping so DestroySession can find the owner later.
func (p *orgRoutedSessionProvider) CreateSession(ctx context.Context, username string, pid int32, memoryLimit string, threads int) (int32, *server.FlightExecutor, error) {
	_, sm, _, found := p.orgRouter.StackForUser(username)
	if !found {
		slog.Warn("Flight SQL session: no org stack for user.", "username", username)
		return 0, nil, fmt.Errorf("no org configured for user %q", username)
	}

	// SessionManager.resolveSessionLimits handles rebalancer defaults,
	// so pass memoryLimit/threads through as-is.
	wpid, exec, err := sm.CreateSession(ctx, username, pid, memoryLimit, threads)
	if err != nil {
		return 0, nil, err
	}
	sm.SetProtocol(wpid, "flight")

	p.mu.Lock()
	p.pidSession[wpid] = sm
	p.mu.Unlock()
	return wpid, exec, nil
}

// DestroySession tears down the Flight SQL session identified by pid and
// forgets its pid→SessionManager mapping.
//
// The lookup and the delete happen atomically under a single write lock:
// the original read-under-RLock followed by a separate delete-under-Lock
// was a check-then-act race in which two concurrent destroys of the same
// pid could both observe the mapping and both invoke sm.DestroySession.
// Now exactly one caller wins the mapping and performs the teardown.
func (p *orgRoutedSessionProvider) DestroySession(pid int32) {
	p.mu.Lock()
	sm, ok := p.pidSession[pid]
	if ok {
		delete(p.pidSession, pid)
	}
	p.mu.Unlock()

	if !ok {
		// Unknown pid: never created through this provider, or already
		// destroyed by a concurrent caller. Log and treat as a no-op.
		slog.Warn("Flight SQL destroy: no session manager for pid.", "pid", pid)
		return
	}

	// Call outside the lock so a slow teardown cannot block other
	// create/destroy operations on the map.
	sm.DestroySession(pid)
}
8 changes: 6 additions & 2 deletions controlplane/flight_ingress_adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package controlplane

import "testing"
import (
"testing"

"github.com/posthog/duckgres/server/flightsqlingress"
)

func TestNewFlightIngressAdapterValidation(t *testing.T) {
_, err := NewFlightIngress("127.0.0.1", 0, nil, map[string]string{}, nil, nil, FlightIngressConfig{})
_, err := NewFlightIngress("127.0.0.1", 0, nil, &flightsqlingress.MapCredentialValidator{}, nil, nil, FlightIngressConfig{})
if err == nil {
t.Fatalf("expected validation error for invalid port")
}
Expand Down
100 changes: 100 additions & 0 deletions controlplane/flight_ingress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package controlplane

import (
"context"
"sync"
"testing"
)

// mockOrgRouter implements OrgRouterInterface for testing.
// It returns the same canned (sessions, rebalancer, ok) triple for every
// username, letting tests simulate both a resolvable org stack (ok=true)
// and an unknown user (ok=false).
type mockOrgRouter struct {
	sessions   *SessionManager
	rebalancer *MemoryRebalancer
	ok         bool
}

// StackForUser ignores the username and returns the canned stack.
// The WorkerPool slot is always nil; the code under test does not use it.
func (m *mockOrgRouter) StackForUser(_ string) (WorkerPool, *SessionManager, *MemoryRebalancer, bool) {
	return nil, m.sessions, m.rebalancer, m.ok
}

// ShutdownAll is a no-op; the mock owns no real workers.
func (m *mockOrgRouter) ShutdownAll() {}

// TestOrgRoutedSessionProviderCreateSessionTeamNotFound verifies that
// CreateSession returns a descriptive error when the router cannot
// resolve an org stack for the username.
func TestOrgRoutedSessionProviderCreateSessionTeamNotFound(t *testing.T) {
	p := &orgRoutedSessionProvider{
		orgRouter:  &mockOrgRouter{ok: false},
		pidSession: make(map[int32]*SessionManager),
	}

	const want = `no org configured for user "unknown"`
	_, _, err := p.CreateSession(context.Background(), "unknown", 0, "", 0)
	switch {
	case err == nil:
		t.Fatal("expected error for unknown org")
	case err.Error() != want:
		t.Fatalf("unexpected error message: %v", err)
	}
}

// TestOrgRoutedSessionProviderDestroySessionRemovesPid verifies that a
// destroy of a known pid drops its entry from the pidSession map.
func TestOrgRoutedSessionProviderDestroySessionRemovesPid(t *testing.T) {
	mgr := NewSessionManager(nil, nil)

	pids := make(map[int32]*SessionManager)
	pids[42] = mgr
	p := &orgRoutedSessionProvider{
		orgRouter:  &mockOrgRouter{sessions: mgr, ok: true},
		pidSession: pids,
	}

	// Destroy known pid — should remove from map.
	// sm.DestroySession(42) is a no-op for unknown internal session, which is fine;
	// we're testing the adapter's pid map bookkeeping.
	p.DestroySession(42)

	p.mu.RLock()
	defer p.mu.RUnlock()
	if _, still := p.pidSession[42]; still {
		t.Fatal("expected pid 42 to be removed from pidSession map after destroy")
	}
}

// TestOrgRoutedSessionProviderDestroyUnknownPidNoOp verifies that
// destroying a pid the provider has never seen is harmless.
func TestOrgRoutedSessionProviderDestroyUnknownPidNoOp(t *testing.T) {
	p := &orgRoutedSessionProvider{
		orgRouter:  &mockOrgRouter{ok: true},
		pidSession: map[int32]*SessionManager{},
	}

	// Should not panic.
	p.DestroySession(999)
}

// TestOrgRoutedSessionProviderConcurrentDestroys hammers DestroySession
// from many goroutines at once; run with -race to catch map races in the
// provider's bookkeeping.
func TestOrgRoutedSessionProviderConcurrentDestroys(t *testing.T) {
	const numPids = int32(100)
	mgr := NewSessionManager(nil, nil)

	p := &orgRoutedSessionProvider{
		orgRouter:  &mockOrgRouter{sessions: mgr, ok: true},
		pidSession: make(map[int32]*SessionManager),
	}

	// Pre-populate
	for pid := int32(0); pid < numPids; pid++ {
		p.pidSession[pid] = mgr
	}

	// Concurrent destroys should not race.
	var wg sync.WaitGroup
	for pid := int32(0); pid < numPids; pid++ {
		wg.Add(1)
		go func(id int32) {
			defer wg.Done()
			p.DestroySession(id)
		}(pid)
	}
	wg.Wait()

	p.mu.RLock()
	leftover := len(p.pidSession)
	p.mu.RUnlock()
	if leftover != 0 {
		t.Fatalf("expected all pids removed, got %d remaining", leftover)
	}
}
1 change: 1 addition & 0 deletions controlplane/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (a *orgRouterAdapter) AllSessionStatuses() []admin.SessionStatus {
PID: s.PID,
WorkerID: s.WorkerID,
Org: name,
Protocol: s.Protocol,
})
}
}
Expand Down
11 changes: 11 additions & 0 deletions controlplane/session_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type SessionProgress struct {
type ManagedSession struct {
PID int32
WorkerID int
Protocol string // "postgres" or "flight"
SessionToken string
Executor *server.FlightExecutor
connCloser io.Closer // TCP connection, closed on worker crash to unblock the message loop
Expand Down Expand Up @@ -98,6 +99,7 @@ func (sm *SessionManager) CreateSession(ctx context.Context, username string, pi
session := &ManagedSession{
PID: pid,
WorkerID: worker.ID,
Protocol: "postgres",
SessionToken: sessionToken,
Executor: executor,
}
Expand Down Expand Up @@ -308,6 +310,15 @@ func (sm *SessionManager) UpdateProgress(workerID int, progress map[string]*Sess
}
}

// SetProtocol updates the protocol label for an active session.
//
// This mutates s.Protocol, so it must hold the write lock: the original
// RLock only protected the map lookup, and writing the field under a read
// lock races with any concurrent goroutine reading or snapshotting session
// state under the same mutex (e.g. AllSessions).
func (sm *SessionManager) SetProtocol(pid int32, protocol string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if s, ok := sm.sessions[pid]; ok {
		s.Protocol = protocol
	}
}

// AllSessions returns a snapshot of all active sessions.
// The returned slice is safe to iterate without holding the lock.
func (sm *SessionManager) AllSessions() []*ManagedSession {
Expand Down
1 change: 0 additions & 1 deletion docs/shared-multitenant-control-plane-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ Separate from the end-user query path, operators use the admin API/dashboard to
- Change worker pod spec from global to be team-specific.
- Long-term allocator hardening: move per-team admission and reservation into the shared worker allocator so "check team capacity + reserve worker" is one atomic operation, with centrally tracked assigned and pending reservations per team instead of router-side counting.
- Implement non-disruptive customer-initiated duckgres login rotation mechanism.
- Enable Duckhog / Flight SQL in multi-tenant mode.
- Dev Infra
- Aurora cluster
Deploy control plane in k8s
Expand Down
Loading
Loading