Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Run with config file:
| `DUCKGRES_PROCESS_RETIRE_ON_SESSION_END` | Retire a process worker immediately after its last session ends instead of keeping it warm for reuse | `false` |
| `DUCKGRES_IDLE_TIMEOUT` | Connection idle timeout (e.g., `30m`, `1h`, `-1` to disable) | `24h` |
| `DUCKGRES_SESSION_INIT_TIMEOUT` | Session startup metadata initialization and catalog probe timeout | `10s` |
| `DUCKGRES_WORKER_QUEUE_TIMEOUT` | Max time to wait for worker acquisition and per-org connection-limit queue admission | `60s` |
| `DUCKGRES_WORKER_QUEUE_TIMEOUT` | Max time to wait for worker acquisition and per-org connection-limit queue admission; the managed K8s queue TTL uses this value | `60s` |
| `DUCKGRES_HANDOVER_DRAIN_TIMEOUT` | Max time to drain planned shutdowns and upgrades before forcing exit | `24h` in process mode, `15m` in remote K8s mode |
| `DUCKGRES_SNI_ROUTING_MODE` | Multi-tenant managed-hostname routing: `off`, `passthrough`, or `enforce`. Postgres uses the requested dbname first; managed SNI must resolve to the same org, and SNI supplies the database only when dbname is empty. | `off` |
| `DUCKGRES_MANAGED_HOSTNAME_SUFFIXES` | Comma-separated managed hostname suffixes such as `.dw.us.postwh.com` | - |
Expand Down Expand Up @@ -513,7 +513,7 @@ cfg := server.Config{
Built-in rate limiting protects against brute-force authentication attacks:

- **Failed attempt tracking**: Bans IPs after too many failed auth attempts
- **Connection limits**: Limits concurrent connections per IP
- **Connection limits**: Limits concurrent connections per IP and, when configured, total concurrent sessions. In K8s multi-tenant mode, org `max_connections` is enforced cluster-wide through runtime-store leases.
- **Auto-cleanup**: Expired records are automatically cleaned up

```yaml
Expand All @@ -522,6 +522,7 @@ rate_limit:
failed_attempt_window: "5m" # Within 5 minutes
ban_duration: "15m" # Ban lasts 15 minutes
max_connections_per_ip: 100 # Max concurrent connections per IP
max_connections: 16 # Max total concurrent sessions (0 = unlimited)
```

## Usage Examples
Expand Down
35 changes: 35 additions & 0 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,41 @@ type FlightSessionRecord struct {

// TableName returns the base table name under which FlightSessionRecord rows
// are stored.
func (FlightSessionRecord) TableName() string {
	return "flight_session_records"
}

// OrgConnectionQueueEntry is a cluster-wide FIFO admission request for one org
// connection. Rows expire quickly; they coordinate fairness across CP replicas.
type OrgConnectionQueueEntry struct {
	// RequestID is the primary key; every queue lookup is keyed on it.
	RequestID string `gorm:"primaryKey;size:64" json:"request_id"`
	// OrgID scopes FIFO ordering to one org. It is the leading column of the
	// idx_org_connection_queue_pending composite index.
	OrgID string `gorm:"size:255;not null;index:idx_org_connection_queue_pending,priority:1" json:"org_id"`
	// CPInstanceID is copied onto the lease when the request is granted.
	// NOTE(review): presumably the control-plane instance that enqueued the
	// request; confirm against the caller.
	CPInstanceID string `gorm:"size:255;not null;index" json:"cp_instance_id"`
	// PID is copied onto the lease when the request is granted.
	// NOTE(review): presumably identifies the session on the owning instance;
	// confirm against the caller.
	PID int32 `gorm:"not null" json:"pid"`
	// Protocol is copied onto the lease when the request is granted.
	Protocol string `gorm:"size:32;not null" json:"protocol"`
	// EnqueuedAt is the primary FIFO sort key (ties are broken by RequestID)
	// and the second column of idx_org_connection_queue_pending.
	EnqueuedAt time.Time `gorm:"not null;index:idx_org_connection_queue_pending,priority:2" json:"enqueued_at"`
	// ExpiresAt is the deadline after which a still-pending request is no
	// longer eligible for a lease.
	ExpiresAt time.Time `gorm:"not null;index" json:"expires_at"`
	// GrantedAt is nil while the request is pending and is set once a lease
	// has been created for it.
	GrantedAt *time.Time `gorm:"index" json:"granted_at,omitempty"`
	// CanceledAt is nil for live requests; a request with a non-nil value is
	// never granted a lease.
	CanceledAt *time.Time `gorm:"index" json:"canceled_at,omitempty"`
	// CreatedAt and UpdatedAt are row bookkeeping timestamps.
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

// TableName returns the base table name under which OrgConnectionQueueEntry
// rows are stored.
func (OrgConnectionQueueEntry) TableName() string {
	return "org_connection_queue"
}

// OrgConnectionLease is the durable cluster-wide admission lease for a live
// session. Capacity checks count active leases, ignoring owners whose CP row
// has expired.
type OrgConnectionLease struct {
	// LeaseID is the primary key and the handle used to release the lease.
	LeaseID string `gorm:"primaryKey;size:64" json:"lease_id"`
	// RequestID is the queue request this lease was granted for; the unique
	// index allows at most one lease per request.
	RequestID string `gorm:"size:64;not null;uniqueIndex" json:"request_id"`
	// OrgID is the org whose connection limit this lease counts against.
	OrgID string `gorm:"size:255;not null;index" json:"org_id"`
	// CPInstanceID names the owning control-plane instance; it is matched
	// against the control-plane instance table to detect expired or missing
	// owners.
	CPInstanceID string `gorm:"size:255;not null;index" json:"cp_instance_id"`
	// PID is carried over from the originating queue entry.
	PID int32 `gorm:"not null" json:"pid"`
	// Protocol is carried over from the originating queue entry.
	Protocol string `gorm:"size:32;not null" json:"protocol"`
	// AcquiredAt is when the lease was granted.
	AcquiredAt time.Time `gorm:"not null" json:"acquired_at"`
	// CreatedAt and UpdatedAt are row bookkeeping timestamps.
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

// TableName returns the base table name under which OrgConnectionLease rows
// are stored.
func (OrgConnectionLease) TableName() string {
	return "org_connection_leases"
}

// OrgConfig is a convenience view combining org metadata with resource limits.
//
// HostnameAlias is a plain string here (empty == "no alias") because snapshot
Expand Down
325 changes: 325 additions & 0 deletions controlplane/configstore/org_connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
package configstore

import (
"errors"
"fmt"
"strings"
"time"

"gorm.io/gorm"
"gorm.io/gorm/clause"
)

// missingOwnerOrgConnectionLeaseGrace is how long, measured from acquired_at,
// a lease whose owning control-plane row cannot be found is kept before
// cleanupOrgConnectionRowsLocked deletes it.
const missingOwnerOrgConnectionLeaseGrace = 5 * time.Minute

// EnqueueOrgConnectionRequest adds a pending admission request to the
// cluster-wide org connection queue. Requests are served FIFO per org_id,
// ordered by enqueued_at and then request_id.
//
// The entry must carry a non-blank RequestID and OrgID and an ExpiresAt that
// lies after EnqueuedAt. A zero EnqueuedAt is filled in with time.Now() on the
// caller's entry. Only the resulting lifetime (ExpiresAt - EnqueuedAt) is
// persisted as given: the stored enqueued_at is the database clock and the
// stored expires_at is that clock plus the lifetime. The stored timestamps are
// not written back to entry.
func (cs *ConfigStore) EnqueueOrgConnectionRequest(entry *OrgConnectionQueueEntry) error {
	switch {
	case entry == nil:
		return fmt.Errorf("org connection queue entry is required")
	case strings.TrimSpace(entry.RequestID) == "":
		return fmt.Errorf("org connection request id is required")
	case strings.TrimSpace(entry.OrgID) == "":
		return fmt.Errorf("org connection org id is required")
	}
	if entry.EnqueuedAt.IsZero() {
		entry.EnqueuedAt = time.Now()
	}
	if entry.ExpiresAt.IsZero() {
		return fmt.Errorf("org connection request expiry is required")
	}
	lifetime := entry.ExpiresAt.Sub(entry.EnqueuedAt)
	if lifetime <= 0 {
		return fmt.Errorf("org connection request expiry must be after enqueue time")
	}

	// Insert a copy so the caller's entry is not rewritten with database time.
	row := *entry
	insert := func(tx *gorm.DB) error {
		dbNow, err := cs.orgConnectionDatabaseNow(tx)
		if err != nil {
			return err
		}
		row.EnqueuedAt = dbNow
		row.ExpiresAt = dbNow.Add(lifetime)
		return tx.Table(cs.runtimeTable(row.TableName())).Create(&row).Error
	}
	if err := cs.db.Transaction(insert); err != nil {
		return fmt.Errorf("enqueue org connection request: %w", err)
	}
	return nil
}

// TryAcquireOrgConnectionLease attempts to grant one queued request under a
// cluster-wide per-org limit.
//
// requestID must be non-blank. maxConnections is the per-org cap on active
// leases; a value <= 0 disables the capacity check. The time argument is
// ignored: the database clock is used for all expiry decisions.
//
// A nil lease with a nil error means the request is still waiting behind FIFO
// order or active capacity, or is no longer eligible (unknown, expired,
// canceled). If the request already holds a lease, that lease is returned.
//
// The attempt is repeated only when the single-attempt helper reports that the
// request's org changed between the unlocked read and the locked read. Errors
// are checked before the retry signal so that a failed transaction is always
// surfaced instead of being dropped by another loop iteration.
func (cs *ConfigStore) TryAcquireOrgConnectionLease(requestID string, maxConnections int, _ time.Time) (*OrgConnectionLease, error) {
	if strings.TrimSpace(requestID) == "" {
		return nil, fmt.Errorf("org connection request id is required")
	}

	for {
		lease, retry, err := cs.tryAcquireOrgConnectionLeaseOnce(requestID, maxConnections)
		if err != nil {
			return nil, fmt.Errorf("try acquire org connection lease: %w", err)
		}
		if retry {
			continue
		}
		return lease, nil
	}
}

// orgConnectionRuntimeTables holds the resolved runtime table names used by
// the org connection admission queries.
type orgConnectionRuntimeTables struct {
	// queue is the resolved name of the OrgConnectionQueueEntry table.
	queue string
	// lease is the resolved name of the OrgConnectionLease table.
	lease string
}

// orgConnectionRuntimeTables resolves the runtime table names for the queue
// and lease models via cs.runtimeTable.
func (cs *ConfigStore) orgConnectionRuntimeTables() orgConnectionRuntimeTables {
	var names orgConnectionRuntimeTables
	names.queue = cs.runtimeTable(OrgConnectionQueueEntry{}.TableName())
	names.lease = cs.runtimeTable(OrgConnectionLease{}.TableName())
	return names
}

// tryAcquireOrgConnectionLeaseOnce runs one admission attempt for requestID
// inside a single transaction.
//
// It returns the granted (or previously granted) lease, a flag asking the
// caller to run the attempt again, and any error from the transaction. A nil
// lease with no retry and no error means the request was not granted on this
// attempt: it does not exist, is expired/canceled/already marked granted, is
// not at the head of its org's queue, or the org is at maxConnections.
// maxConnections <= 0 skips the capacity check.
func (cs *ConfigStore) tryAcquireOrgConnectionLeaseOnce(requestID string, maxConnections int) (*OrgConnectionLease, bool, error) {
	tables := cs.orgConnectionRuntimeTables()
	var lease *OrgConnectionLease
	retryWithFreshOrg := false

	err := cs.db.Transaction(func(tx *gorm.DB) error {
		// Unlocked read: the org ID is needed to derive the advisory lock key.
		// A missing request ends the attempt with no lease and no error.
		orgID, found, err := cs.orgIDForConnectionRequest(tx, tables.queue, requestID)
		if err != nil || !found {
			return err
		}
		// Take a transaction-level advisory lock keyed on the org before any
		// cleanup, queue-head, or capacity work for that org.
		if err := tx.Exec("SELECT pg_advisory_xact_lock(?)", advisoryLockKey("duckgres:org-connections:"+orgID)).Error; err != nil {
			return err
		}
		// Read the database clock after the lock is held; all expiry
		// comparisons below use this value.
		now, err := cs.orgConnectionDatabaseNow(tx)
		if err != nil {
			return err
		}
		// Drop expired pending requests and leases of expired/missing owners
		// so they do not affect the head and capacity checks.
		if err := cs.cleanupOrgConnectionRowsLocked(tx, orgID, now); err != nil {
			return err
		}

		// Re-read the request with a row lock. It may have been removed (by
		// the cleanup above or by another transaction) since the first read.
		request, found, err := cs.lockOrgConnectionRequest(tx, tables.queue, requestID)
		if err != nil || !found {
			return err
		}
		// The advisory lock was taken for the org seen in the unlocked read;
		// if the locked row belongs to a different org, ask the caller to
		// start over so the correct org lock is taken.
		if request.OrgID != orgID {
			retryWithFreshOrg = true
			return nil
		}

		// If a lease already exists for this request, hand it back instead of
		// creating another. On a lookup error existing is nil, so lease stays
		// nil and the error is returned.
		existing, found, err := cs.existingOrgConnectionLease(tx, tables.lease, requestID)
		if err != nil || found {
			lease = existing
			return err
		}
		// Expired, canceled, or already-granted requests are not eligible.
		if !request.ExpiresAt.After(now) || request.CanceledAt != nil || request.GrantedAt != nil {
			return nil
		}
		// FIFO: only the oldest pending request for the org may proceed.
		atHead, err := cs.isOrgConnectionQueueHead(tx, tables.queue, request, now)
		if err != nil || !atHead {
			return err
		}
		// Capacity: a positive maxConnections caps the org's active leases.
		if maxConnections > 0 {
			count, err := cs.countActiveOrgConnectionLeases(tx, orgID)
			if err != nil {
				return err
			}
			if count >= int64(maxConnections) {
				return nil
			}
		}

		// Grant: insert the lease and stamp the queue row as granted.
		created, err := cs.createOrgConnectionLease(tx, request, now)
		if err != nil {
			return err
		}
		lease = created
		return nil
	})
	return lease, retryWithFreshOrg, err
}

// orgConnectionDatabaseNow returns the database server's clock_timestamp() as
// read through tx, so callers compare timestamps against the database clock
// rather than the local one. On failure it returns the zero time and the query
// error.
func (cs *ConfigStore) orgConnectionDatabaseNow(tx *gorm.DB) (time.Time, error) {
	var dbNow time.Time
	err := tx.Raw("SELECT clock_timestamp()").Scan(&dbNow).Error
	if err != nil {
		return time.Time{}, err
	}
	return dbNow, nil
}

// orgIDForConnectionRequest looks up the org_id of the queue row identified by
// requestID without locking it. The boolean reports whether the row exists; a
// missing row is not an error.
func (cs *ConfigStore) orgIDForConnectionRequest(tx *gorm.DB, queueTable, requestID string) (string, bool, error) {
	var row struct {
		OrgID string
	}
	err := tx.Table(queueTable).
		Select("org_id").
		Where("request_id = ?", requestID).
		Take(&row).Error
	switch {
	case err == nil:
		return row.OrgID, true, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return "", false, nil
	default:
		return "", false, err
	}
}

// lockOrgConnectionRequest loads the queue row for requestID with a FOR UPDATE
// locking clause. The boolean reports whether the row exists; a missing row
// yields (nil, false, nil).
func (cs *ConfigStore) lockOrgConnectionRequest(tx *gorm.DB, queueTable, requestID string) (*OrgConnectionQueueEntry, bool, error) {
	row := new(OrgConnectionQueueEntry)
	err := tx.Table(queueTable).
		Clauses(clause.Locking{Strength: "UPDATE"}).
		Where("request_id = ?", requestID).
		Take(row).Error
	if err == nil {
		return row, true, nil
	}
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, false, nil
	}
	return nil, false, err
}

// existingOrgConnectionLease fetches the lease already recorded for requestID,
// if any. The boolean reports whether one was found; absence is reported as
// (nil, false, nil) rather than as an error.
func (cs *ConfigStore) existingOrgConnectionLease(tx *gorm.DB, leaseTable, requestID string) (*OrgConnectionLease, bool, error) {
	held := new(OrgConnectionLease)
	lookup := tx.Table(leaseTable).Where("request_id = ?", requestID).Take(held)
	switch {
	case lookup.Error == nil:
		return held, true, nil
	case errors.Is(lookup.Error, gorm.ErrRecordNotFound):
		return nil, false, nil
	default:
		return nil, false, lookup.Error
	}
}

// isOrgConnectionQueueHead reports whether request is the first pending entry
// in its org's queue. Pending means not granted, not canceled, and expiring
// after now; entries are ordered by enqueued_at and then request_id. If the
// org has no pending entries at all the answer is false.
func (cs *ConfigStore) isOrgConnectionQueueHead(tx *gorm.DB, queueTable string, request *OrgConnectionQueueEntry, now time.Time) (bool, error) {
	pending := tx.Table(queueTable).
		Where("org_id = ? AND granted_at IS NULL AND canceled_at IS NULL AND expires_at > ?", request.OrgID, now).
		Order("enqueued_at ASC, request_id ASC").
		Limit(1)

	var first OrgConnectionQueueEntry
	if err := pending.Take(&first).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return false, nil
		}
		return false, err
	}
	return first.RequestID == request.RequestID, nil
}

// createOrgConnectionLease inserts a lease for request and stamps the queue
// row as granted, both through tx. The lease reuses the request ID as its
// lease ID and copies the request's org, owner instance, PID, and protocol;
// now is recorded as acquired_at on the lease and as granted_at/updated_at on
// the queue row.
func (cs *ConfigStore) createOrgConnectionLease(tx *gorm.DB, request *OrgConnectionQueueEntry, now time.Time) (*OrgConnectionLease, error) {
	lease := &OrgConnectionLease{
		LeaseID:      request.RequestID,
		RequestID:    request.RequestID,
		OrgID:        request.OrgID,
		CPInstanceID: request.CPInstanceID,
		PID:          request.PID,
		Protocol:     request.Protocol,
		AcquiredAt:   now,
	}
	if err := tx.Table(cs.runtimeTable(lease.TableName())).Create(lease).Error; err != nil {
		return nil, err
	}

	grantedFields := map[string]any{
		"granted_at": now,
		"updated_at": now,
	}
	markGranted := tx.Table(cs.runtimeTable(request.TableName())).
		Where("request_id = ?", request.RequestID).
		Updates(grantedFields)
	if markGranted.Error != nil {
		return nil, markGranted.Error
	}
	return lease, nil
}

// ReleaseOrgConnectionLease gives back one cluster-wide connection lease. In a
// single transaction it deletes the lease row and then the queue row whose
// request_id equals leaseID. A blank leaseID is a no-op.
func (cs *ConfigStore) ReleaseOrgConnectionLease(leaseID string) error {
	if strings.TrimSpace(leaseID) == "" {
		return nil
	}

	release := func(tx *gorm.DB) error {
		leaseTable := cs.runtimeTable((&OrgConnectionLease{}).TableName())
		dropLease := tx.Table(leaseTable).
			Where("lease_id = ?", leaseID).
			Delete(&OrgConnectionLease{})
		if dropLease.Error != nil {
			return dropLease.Error
		}

		queueTable := cs.runtimeTable((&OrgConnectionQueueEntry{}).TableName())
		dropRequest := tx.Table(queueTable).
			Where("request_id = ?", leaseID).
			Delete(&OrgConnectionQueueEntry{})
		return dropRequest.Error
	}
	if err := cs.db.Transaction(release); err != nil {
		return fmt.Errorf("release org connection lease: %w", err)
	}
	return nil
}

// CancelOrgConnectionRequest deletes the queue row for requestID if it is
// still pending (neither granted nor canceled). A granted request and its
// lease are left alone; leases must be released explicitly. A blank requestID
// is a no-op, and the time argument is unused.
func (cs *ConfigStore) CancelOrgConnectionRequest(requestID string, _ time.Time) error {
	if strings.TrimSpace(requestID) == "" {
		return nil
	}

	queueTable := cs.runtimeTable((&OrgConnectionQueueEntry{}).TableName())
	err := cs.db.Table(queueTable).
		Where("request_id = ? AND granted_at IS NULL AND canceled_at IS NULL", requestID).
		Delete(&OrgConnectionQueueEntry{}).
		Error
	if err != nil {
		return fmt.Errorf("cancel org connection request: %w", err)
	}
	return nil
}

// ActiveOrgConnectionLeaseCount reports how many cluster-wide leases are
// currently active for orgID. Leases whose owning control-plane instance is in
// the expired state are not counted. On error the count is 0.
func (cs *ConfigStore) ActiveOrgConnectionLeaseCount(orgID string) (int64, error) {
	var active int64
	countInTx := func(tx *gorm.DB) (err error) {
		active, err = cs.countActiveOrgConnectionLeases(tx, orgID)
		return err
	}
	if err := cs.db.Transaction(countInTx); err != nil {
		return 0, fmt.Errorf("count active org connection leases: %w", err)
	}
	return active, nil
}

// cleanupOrgConnectionRowsLocked prunes stale admission rows for orgID through
// tx, stopping at the first error. In order, it deletes:
//
//  1. pending queue entries (not granted, not canceled) whose expires_at is at
//     or before now;
//  2. leases whose owning control-plane instance row is in the expired state;
//  3. leases that have no owning control-plane row at all and were acquired at
//     least missingOwnerOrgConnectionLeaseGrace before now.
//
// NOTE(review): the "Locked" suffix suggests callers hold the per-org lock
// before calling; nothing in this function takes or checks it.
func (cs *ConfigStore) cleanupOrgConnectionRowsLocked(tx *gorm.DB, orgID string, now time.Time) error {
	tables := cs.orgConnectionRuntimeTables()
	cpTable := cs.runtimeTable((&ControlPlaneInstance{}).TableName())

	expiredPending := tx.Table(tables.queue).
		Where("org_id = ? AND granted_at IS NULL AND canceled_at IS NULL AND expires_at <= ?", orgID, now).
		Delete(&OrgConnectionQueueEntry{})
	if expiredPending.Error != nil {
		return expiredPending.Error
	}

	deleteExpiredOwnerLeases := "DELETE FROM " + tables.lease + " AS l USING " + cpTable + " AS cp " +
		"WHERE l.cp_instance_id = cp.id AND l.org_id = ? AND cp.state = ?"
	if err := tx.Exec(deleteExpiredOwnerLeases, orgID, ControlPlaneInstanceStateExpired).Error; err != nil {
		return err
	}

	deleteOrphanedLeases := "DELETE FROM " + tables.lease + " AS l " +
		"WHERE l.org_id = ? AND l.acquired_at <= ? " +
		"AND NOT EXISTS (SELECT 1 FROM " + cpTable + " AS cp WHERE cp.id = l.cp_instance_id)"
	orphanCutoff := now.Add(-missingOwnerOrgConnectionLeaseGrace)
	return tx.Exec(deleteOrphanedLeases, orgID, orphanCutoff).Error
}

// countActiveOrgConnectionLeases counts the leases held for orgID, using tx.
// Leases are left-joined to the control-plane instance table; a lease is
// counted when its owner row is missing or is in any state other than expired.
func (cs *ConfigStore) countActiveOrgConnectionLeases(tx *gorm.DB, orgID string) (int64, error) {
	leases := cs.runtimeTable((&OrgConnectionLease{}).TableName()) + " AS l"
	ownerJoin := "LEFT JOIN " + cs.runtimeTable((&ControlPlaneInstance{}).TableName()) + " AS cp ON cp.id = l.cp_instance_id"

	var active int64
	result := tx.Table(leases).
		Joins(ownerJoin).
		Where("l.org_id = ?", orgID).
		Where("cp.id IS NULL OR cp.state <> ?", ControlPlaneInstanceStateExpired).
		Count(&active)
	return active, result.Error
}
Loading
Loading