Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions config_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type configCLIInputs struct {
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
MemoryLimit string
Expand Down Expand Up @@ -225,6 +226,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
cfg.DuckLake.S3Profile = fileCfg.DuckLake.S3Profile
}

cfg.FilePersistence = fileCfg.FilePersistence
cfg.ProcessIsolation = fileCfg.ProcessIsolation
if fileCfg.IdleTimeout != "" {
if d, err := time.ParseDuration(fileCfg.IdleTimeout); err == nil {
Expand Down Expand Up @@ -435,6 +437,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if v := getenv("DUCKGRES_DUCKLAKE_S3_PROFILE"); v != "" {
cfg.DuckLake.S3Profile = v
}
if v := getenv("DUCKGRES_FILE_PERSISTENCE"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.FilePersistence = b
} else {
warn("Invalid DUCKGRES_FILE_PERSISTENCE: " + err.Error())
}
}
if v := getenv("DUCKGRES_PROCESS_ISOLATION"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.ProcessIsolation = b
Expand Down Expand Up @@ -638,6 +647,9 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if cli.Set["key"] {
cfg.TLSKeyFile = cli.KeyFile
}
if cli.Set["file-persistence"] {
cfg.FilePersistence = cli.FilePersistence
}
if cli.Set["process-isolation"] {
cfg.ProcessIsolation = cli.ProcessIsolation
}
Expand Down Expand Up @@ -755,6 +767,12 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
cfg.ACMEDNSZoneID = ""
}

// Validate file persistence requires data_dir
if cfg.FilePersistence && cfg.DataDir == "" {
warn("file_persistence is enabled but data_dir is empty; disabling file persistence")
cfg.FilePersistence = false
}

// Validate memory_limit format if explicitly set
if cfg.MemoryLimit != "" && !server.ValidateMemoryLimit(cfg.MemoryLimit) {
warn("Invalid memory_limit format: " + cfg.MemoryLimit + " (expected e.g. '4GB', '512MB')")
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type FileConfig struct {
RateLimit RateLimitFileConfig `yaml:"rate_limit"`
Extensions []string `yaml:"extensions"`
DuckLake DuckLakeFileConfig `yaml:"ducklake"`
FilePersistence bool `yaml:"file_persistence"` // Persist DuckDB to <data_dir>/<username>.duckdb instead of :memory:
ProcessIsolation bool `yaml:"process_isolation"` // Enable process isolation per connection
IdleTimeout string `yaml:"idle_timeout"` // e.g., "24h", "1h", "-1" to disable
MemoryLimit string `yaml:"memory_limit"` // DuckDB memory_limit per session (e.g., "4GB")
Expand Down Expand Up @@ -194,6 +195,7 @@ func main() {
dataDir := flag.String("data-dir", "", "Directory for DuckDB files (env: DUCKGRES_DATA_DIR)")
certFile := flag.String("cert", "", "TLS certificate file (env: DUCKGRES_CERT)")
keyFile := flag.String("key", "", "TLS private key file (env: DUCKGRES_KEY)")
filePersistence := flag.Bool("file-persistence", false, "Persist DuckDB to <data-dir>/<username>.duckdb instead of in-memory (env: DUCKGRES_FILE_PERSISTENCE)")
processIsolation := flag.Bool("process-isolation", false, "Enable process isolation (spawn child process per connection)")
idleTimeout := flag.String("idle-timeout", "", "Connection idle timeout (e.g., '30m', '1h', '-1' to disable) (env: DUCKGRES_IDLE_TIMEOUT)")
memoryLimit := flag.String("memory-limit", "", "DuckDB memory_limit per session (e.g., '4GB') (env: DUCKGRES_MEMORY_LIMIT)")
Expand Down Expand Up @@ -259,6 +261,7 @@ func main() {
fmt.Fprintf(os.Stderr, " DUCKGRES_DATA_DIR Directory for DuckDB files (default: ./data)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_CERT TLS certificate file (default: ./certs/server.crt)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_KEY TLS private key file (default: ./certs/server.key)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_FILE_PERSISTENCE Persist DuckDB to <data_dir>/<username>.duckdb (1 or true)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_PROCESS_ISOLATION Enable process isolation (1 or true)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_IDLE_TIMEOUT Connection idle timeout (e.g., 30m, 1h, -1 to disable)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_MEMORY_LIMIT DuckDB memory_limit per session (e.g., 4GB)\n")
Expand Down Expand Up @@ -353,6 +356,7 @@ func main() {
DataDir: *dataDir,
CertFile: *certFile,
KeyFile: *keyFile,
FilePersistence: *filePersistence,
ProcessIsolation: *processIsolation,
IdleTimeout: *idleTimeout,
MemoryLimit: *memoryLimit,
Expand Down
80 changes: 80 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,86 @@ func TestResolveEffectiveConfigACMEDNSProviderValidation(t *testing.T) {
}
}

// TestResolveEffectiveConfigFilePersistenceFromFile checks that enabling
// FilePersistence in the file config (together with a non-empty DataDir)
// is carried through to the resolved server config.
func TestResolveEffectiveConfigFilePersistenceFromFile(t *testing.T) {
	yamlCfg := FileConfig{FilePersistence: true, DataDir: "/tmp/data"}

	// No CLI flags, no environment, and no warning sink: the file is the only input.
	got := resolveEffectiveConfig(&yamlCfg, configCLIInputs{}, envFromMap(nil), nil).Server.FilePersistence
	if !got {
		t.Fatal("expected file_persistence from YAML to be true")
	}
}

func TestResolveEffectiveConfigFilePersistenceFromEnv(t *testing.T) {
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "true",
}
resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(env), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected file_persistence from env to be true")
}
}

// TestResolveEffectiveConfigFilePersistenceEnvOverridesFile checks precedence
// between the file config and the environment: the file asks for persistence,
// the environment variable says "false", and the environment must win.
func TestResolveEffectiveConfigFilePersistenceEnvOverridesFile(t *testing.T) {
	yamlCfg := &FileConfig{FilePersistence: true}
	getenv := envFromMap(map[string]string{"DUCKGRES_FILE_PERSISTENCE": "false"})

	if resolveEffectiveConfig(yamlCfg, configCLIInputs{}, getenv, nil).Server.FilePersistence {
		t.Fatal("expected env false to override file true")
	}
}

func TestResolveEffectiveConfigFilePersistenceCLIOverridesEnv(t *testing.T) {
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "false",
}
resolved := resolveEffectiveConfig(nil, configCLIInputs{
Set: map[string]bool{"file-persistence": true},
FilePersistence: true,
}, envFromMap(env), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected CLI true to override env false")
}
}

func TestResolveEffectiveConfigFilePersistenceDefaultFalse(t *testing.T) {
resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(nil), nil)
if resolved.Server.FilePersistence {
t.Fatal("expected file_persistence to default to false")
}
}

func TestResolveEffectiveConfigFilePersistenceRequiresDataDir(t *testing.T) {
// Override data_dir to empty via CLI to test the validation guard.
var warnings []string
resolved := resolveEffectiveConfig(&FileConfig{
FilePersistence: true,
}, configCLIInputs{
Set: map[string]bool{"data-dir": true},
DataDir: "", // explicitly empty
}, envFromMap(nil), func(msg string) {
warnings = append(warnings, msg)
})
if resolved.Server.FilePersistence {
t.Fatal("expected file_persistence to be disabled when data_dir is empty")
}
found := false
for _, w := range warnings {
if strings.Contains(w, "file_persistence") && strings.Contains(w, "data_dir") {
found = true
break
}
}
if !found {
t.Fatalf("expected warning about file_persistence + empty data_dir, got: %v", warnings)
}
}

func TestResolveEffectiveConfigACMEDNSRequiresDomain(t *testing.T) {
fileCfg := &FileConfig{
TLS: TLSConfig{
Expand Down
16 changes: 13 additions & 3 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type clientConn struct {
portals map[string]*portal // portals by name
txStatus byte // current transaction status ('I', 'T', or 'E')
passthrough bool // true for passthrough users (skip transpiler + pg_catalog)
sharedDB bool // true when using file persistence pool (don't close DB directly)
cursors map[string]*cursorState // server-side cursor emulation
ctx context.Context // connection context, cancelled when connection is closed
cancel context.CancelFunc // cancels the connection context
Expand Down Expand Up @@ -397,8 +398,9 @@ func (c *clientConn) safeCleanupDB() {
}
}

// Detach DuckLake to release the RDS metadata connection (only if connection is healthy)
if connHealthy && c.server.cfg.DuckLake.MetadataStore != "" {
// Detach DuckLake to release the RDS metadata connection (only if connection is healthy).
// Skip for shared file-backed DBs — DuckLake stays attached for the pool lifetime.
if connHealthy && !c.sharedDB && c.server.cfg.DuckLake.MetadataStore != "" {
// Must switch away from ducklake before detaching - DuckDB doesn't allow
// detaching the default database
ctx3, cancel3 := context.WithTimeout(context.Background(), cleanupTimeout)
Expand All @@ -421,6 +423,13 @@ func (c *clientConn) safeCleanupDB() {
}
}

// For shared file-backed DBs, release our reference instead of closing.
// The pool will close the DB when the last reference is released.
if c.sharedDB {
c.server.releaseFileDB(c.username)
return
}

// Always attempt to close the database connection.
// If the connection is broken, this may still throw, but we've done our best
// to clean up the transaction state first.
Expand Down Expand Up @@ -565,14 +574,15 @@ func (c *clientConn) serve() error {
var db *sql.DB
var err error
if c.passthrough {
db, err = CreatePassthroughDBConnection(c.server.cfg, c.server.duckLakeSem, c.username, processStartTime, processVersion)
db, err = c.server.createPassthroughDBConnection(c.username)
} else {
db, err = c.server.createDBConnection(c.username)
}
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
c.sharedDB = c.server.cfg.FilePersistence
c.executor = NewLocalExecutor(db)

// Start background credential refresh for long-lived connections.
Expand Down
111 changes: 107 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ type Config struct {
// uncleanly. Default: 24 hours. Set to a negative value (e.g., -1) to disable.
IdleTimeout time.Duration

// FilePersistence stores DuckDB data in <DataDir>/<username>.duckdb instead of :memory:.
// DuckDB memory-maps the file and serves queries from RAM, so performance is similar
// to in-memory mode while data persists across connections and restarts.
FilePersistence bool

// ProcessIsolation enables spawning each client connection in a separate OS process.
// This prevents DuckDB C++ crashes from taking down the entire server.
// When enabled, rate limiting and cancel requests are handled by the parent process,
Expand Down Expand Up @@ -239,6 +244,14 @@ type DuckLakeConfig struct {
S3Profile string // AWS profile name to use (for "config" chain)
}

// fileDBEntry is one slot in the server's per-user pool of file-backed DuckDB
// handles. It pairs a shared *sql.DB with a reference count so that several
// client connections for the same user can use one handle instead of fighting
// over the DuckDB file lock. Entries are created and reference-counted by
// acquireFileDB and torn down by releaseFileDB; both do so while holding the
// server's fileDBsMu, so the fields carry no synchronization of their own.
type fileDBEntry struct {
// db is the shared database handle handed out to every client connection
// that acquires this entry.
db *sql.DB
// refs is the number of outstanding acquisitions. It starts at 1 when the
// entry is created; the handle is closed once it drops to zero or below.
refs int
}

type Server struct {
cfg Config
listener net.Listener
Expand Down Expand Up @@ -275,6 +288,12 @@ type Server struct {

// Query logger for DuckLake system.query_log
queryLogger *QueryLogger

// fileDBs pools shared *sql.DB instances for file-backed persistence mode.
// Keyed by username. Multiple client connections for the same user share a
// single DuckDB file handle to avoid write-lock conflicts.
fileDBsMu sync.Mutex
fileDBs map[string]*fileDBEntry
}

func New(cfg Config) (*Server, error) {
Expand Down Expand Up @@ -322,6 +341,7 @@ func New(cfg Config) (*Server, error) {
activeQueries: make(map[BackendKey]context.CancelFunc),
duckLakeSem: make(chan struct{}, 1),
conns: make(map[int32]*clientConn),
fileDBs: make(map[string]*fileDBEntry),
}

// Configure TLS: ACME DNS-01, ACME HTTP-01, or static certificate files
Expand Down Expand Up @@ -620,16 +640,99 @@ func (s *Server) listConns() []*clientConn {
}

// createDBConnection returns the DuckDB handle a regular (non-passthrough)
// client session should use.
//
// With FilePersistence disabled it delegates to CreateDBConnection using the
// server's config. With FilePersistence enabled it goes through the per-user
// pool (acquireFileDB), so clients that share a username share one handle.
func (s *Server) createDBConnection(username string) (*sql.DB, error) {
	if !s.cfg.FilePersistence {
		return CreateDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion)
	}
	return s.acquireFileDB(username, false)
}

// createPassthroughDBConnection returns the DuckDB handle a passthrough client
// session should use.
//
// With FilePersistence disabled it delegates to CreatePassthroughDBConnection
// using the server's config. With FilePersistence enabled it goes through the
// per-user pool (acquireFileDB) in passthrough mode.
func (s *Server) createPassthroughDBConnection(username string) (*sql.DB, error) {
	if !s.cfg.FilePersistence {
		return CreatePassthroughDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion)
	}
	return s.acquireFileDB(username, true)
}

// acquireFileDB hands out the pooled *sql.DB for username, creating and
// registering it on first use. Every successful call takes one reference that
// the caller must give back with releaseFileDB.
//
// passthrough selects which constructor builds the handle when the pool has no
// entry yet; it is ignored when an entry already exists, because the pool is
// keyed by username only.
//
// The pool mutex is held for the whole call, including handle creation.
func (s *Server) acquireFileDB(username string, passthrough bool) (*sql.DB, error) {
	s.fileDBsMu.Lock()
	defer s.fileDBsMu.Unlock()

	// Fast path: somebody already opened this user's DB; just add a reference.
	if existing, found := s.fileDBs[username]; found {
		existing.refs++
		slog.Debug("Reusing file-backed DuckDB.", "user", username, "refs", existing.refs)
		return existing.db, nil
	}

	// Slow path: first connection for this user, so build the handle.
	var (
		fresh *sql.DB
		err   error
	)
	switch {
	case passthrough:
		fresh, err = CreatePassthroughDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion)
	default:
		fresh, err = CreateDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion)
	}
	if err != nil {
		return nil, err
	}

	// The handle is shared by many client connections, so lift any pool limits
	// applied during creation: no cap on open connections, up to 4 kept idle.
	fresh.SetMaxOpenConns(0) // unlimited
	fresh.SetMaxIdleConns(4)

	s.fileDBs[username] = &fileDBEntry{refs: 1, db: fresh}
	return fresh, nil
}

// releaseFileDB gives back one reference on the pooled DB for username.
//
// Unknown usernames are ignored. When the count reaches zero (or below) the
// handle is closed and its pool entry removed; a Close error is logged as a
// warning and does not prevent the removal.
func (s *Server) releaseFileDB(username string) {
	s.fileDBsMu.Lock()
	defer s.fileDBsMu.Unlock()

	shared, tracked := s.fileDBs[username]
	if !tracked {
		return
	}

	shared.refs--
	if shared.refs > 0 {
		// Other client connections still hold the handle.
		return
	}

	// Last reference gone: close the handle and drop it from the pool.
	if closeErr := shared.db.Close(); closeErr != nil {
		slog.Warn("Failed to close shared file-backed DuckDB.", "user", username, "error", closeErr)
	}
	delete(s.fileDBs, username)
	slog.Debug("Closed file-backed DuckDB (last reference released).", "user", username)
}

// openBaseDB creates and configures a DuckDB connection with threads, memory
// limit, temp directory, extensions, and cache_httpfs settings.
// This shared setup is used by both regular and passthrough connections.
//
// When FilePersistence is enabled and DataDir is set, the database is file-backed
// at <DataDir>/<username>.duckdb. DuckDB memory-maps the file and serves queries
// from RAM, so performance is close to in-memory while data persists across restarts.
func openBaseDB(cfg Config, username string) (*sql.DB, error) {
db, err := sql.Open("duckdb", ":memory:")
dsn := ":memory:"
if cfg.FilePersistence && cfg.DataDir != "" && username != "" {
// Reject usernames that could escape DataDir via path traversal.
if strings.ContainsAny(username, "/\\") || strings.Contains(username, "..") {
return nil, fmt.Errorf("invalid username for file persistence: %q", username)
}
dsn = filepath.Join(cfg.DataDir, username+".duckdb")
// Ensure the data directory exists before opening the file.
if err := os.MkdirAll(cfg.DataDir, 0750); err != nil {
return nil, fmt.Errorf("failed to create data directory %s: %w", cfg.DataDir, err)
}
slog.Info("Opening file-backed DuckDB.", "path", dsn)
}
db, err := sql.Open("duckdb", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open duckdb: %w", err)
}
Expand Down
Loading
Loading