Skip to content
17 changes: 17 additions & 0 deletions config_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type configCLIInputs struct {
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
MemoryLimit string
Expand Down Expand Up @@ -284,6 +285,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
}
}

cfg.FilePersistence = fileCfg.FilePersistence
cfg.ProcessIsolation = fileCfg.ProcessIsolation
if fileCfg.IdleTimeout != "" {
if d, err := time.ParseDuration(fileCfg.IdleTimeout); err == nil {
Expand Down Expand Up @@ -503,6 +505,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if v := getenv("DUCKGRES_DUCKLAKE_S3_PROFILE"); v != "" {
cfg.DuckLake.S3Profile = v
}
if v := getenv("DUCKGRES_FILE_PERSISTENCE"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.FilePersistence = b
} else {
warn("Invalid DUCKGRES_FILE_PERSISTENCE: " + err.Error())
}
}
if v := getenv("DUCKGRES_DUCKLAKE_MIGRATE"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.DuckLake.Migrate = b
Expand Down Expand Up @@ -782,6 +791,9 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if cli.Set["key"] {
cfg.TLSKeyFile = cli.KeyFile
}
if cli.Set["file-persistence"] {
cfg.FilePersistence = cli.FilePersistence
}
if cli.Set["process-isolation"] {
cfg.ProcessIsolation = cli.ProcessIsolation
}
Expand Down Expand Up @@ -902,6 +914,11 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
cfg.QueryLog.Enabled = cli.QueryLog
}

if cfg.FilePersistence && cfg.DataDir == "" {
warn("file_persistence is enabled but data_dir is empty; disabling file persistence")
cfg.FilePersistence = false
}

if cfg.ACMEDNSProvider != "" {
provider := strings.ToLower(cfg.ACMEDNSProvider)
if provider != "route53" {
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type FileConfig struct {
RateLimit RateLimitFileConfig `yaml:"rate_limit"`
Extensions []string `yaml:"extensions"`
DuckLake DuckLakeFileConfig `yaml:"ducklake"`
FilePersistence bool `yaml:"file_persistence"` // Persist DuckDB to <data_dir>/<username>.duckdb instead of :memory:
ProcessIsolation bool `yaml:"process_isolation"` // Enable process isolation per connection
IdleTimeout string `yaml:"idle_timeout"` // e.g., "24h", "1h", "-1" to disable
MemoryLimit string `yaml:"memory_limit"` // DuckDB memory_limit per session (e.g., "4GB")
Expand Down Expand Up @@ -208,6 +209,7 @@ func main() {
dataDir := flag.String("data-dir", "", "Directory for DuckDB files (env: DUCKGRES_DATA_DIR)")
certFile := flag.String("cert", "", "TLS certificate file (env: DUCKGRES_CERT)")
keyFile := flag.String("key", "", "TLS private key file (env: DUCKGRES_KEY)")
filePersistence := flag.Bool("file-persistence", false, "Persist DuckDB to <data-dir>/<username>.duckdb instead of in-memory (env: DUCKGRES_FILE_PERSISTENCE)")
processIsolation := flag.Bool("process-isolation", false, "Enable process isolation (spawn child process per connection)")
idleTimeout := flag.String("idle-timeout", "", "Connection idle timeout (e.g., '30m', '1h', '-1' to disable) (env: DUCKGRES_IDLE_TIMEOUT)")
memoryLimit := flag.String("memory-limit", "", "DuckDB memory_limit per session (e.g., '4GB') (env: DUCKGRES_MEMORY_LIMIT)")
Expand Down Expand Up @@ -282,6 +284,7 @@ func main() {
fmt.Fprintf(os.Stderr, " DUCKGRES_DATA_DIR Directory for DuckDB files (default: ./data)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_CERT TLS certificate file (default: ./certs/server.crt)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_KEY TLS private key file (default: ./certs/server.key)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_FILE_PERSISTENCE Persist DuckDB to <data_dir>/<username>.duckdb (1 or true)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_PROCESS_ISOLATION Enable process isolation (1 or true)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_IDLE_TIMEOUT Connection idle timeout (e.g., 30m, 1h, -1 to disable)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_MEMORY_LIMIT DuckDB memory_limit per session (e.g., 4GB)\n")
Expand Down Expand Up @@ -393,6 +396,7 @@ func main() {
DataDir: *dataDir,
CertFile: *certFile,
KeyFile: *keyFile,
FilePersistence: *filePersistence,
ProcessIsolation: *processIsolation,
IdleTimeout: *idleTimeout,
MemoryLimit: *memoryLimit,
Expand Down
85 changes: 85 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,60 @@ func TestResolveEffectiveConfigACMEDNSProviderValidation(t *testing.T) {
}
}

// TestResolveEffectiveConfigFilePersistenceFromFile checks that file_persistence
// enabled in the file config (with a data dir set) stays enabled in the resolved
// server config when neither the environment nor the CLI says otherwise.
func TestResolveEffectiveConfigFilePersistenceFromFile(t *testing.T) {
	yamlCfg := FileConfig{DataDir: "/tmp/data", FilePersistence: true}
	enabled := resolveEffectiveConfig(&yamlCfg, configCLIInputs{}, envFromMap(nil), nil).Server.FilePersistence
	if !enabled {
		t.Fatal("expected file_persistence from YAML to be true")
	}
}

func TestResolveEffectiveConfigFilePersistenceFromEnv(t *testing.T) {
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "true",
}
resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(env), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected file_persistence from env to be true")
}
}

// TestResolveEffectiveConfigFilePersistenceEnvOverridesFile checks precedence:
// DUCKGRES_FILE_PERSISTENCE=false in the environment wins over
// file_persistence: true in the file config.
func TestResolveEffectiveConfigFilePersistenceEnvOverridesFile(t *testing.T) {
	yamlCfg := FileConfig{FilePersistence: true}
	getenv := envFromMap(map[string]string{"DUCKGRES_FILE_PERSISTENCE": "false"})

	got := resolveEffectiveConfig(&yamlCfg, configCLIInputs{}, getenv, nil)
	if stillEnabled := got.Server.FilePersistence; stillEnabled {
		t.Fatal("expected env false to override file true")
	}
}

func TestResolveEffectiveConfigFilePersistenceCLIOverridesEnv(t *testing.T) {
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "false",
}
resolved := resolveEffectiveConfig(nil, configCLIInputs{
Set: map[string]bool{"file-persistence": true},
FilePersistence: true,
}, envFromMap(env), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected CLI true to override env false")
}
}

func TestResolveEffectiveConfigFilePersistenceDefaultFalse(t *testing.T) {
resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(nil), nil)
if resolved.Server.FilePersistence {
t.Fatal("expected file_persistence to default to false")
}
}

func TestResolveEffectiveConfigACMEDNSRequiresDomain(t *testing.T) {
fileCfg := &FileConfig{
TLS: TLSConfig{
Expand Down Expand Up @@ -767,3 +821,34 @@ func TestResolveEffectiveConfigACMEDNSRequiresDomain(t *testing.T) {
t.Fatalf("expected warning about missing ACME domain for DNS mode, warnings: %v", warns)
}
}

// TestFilePersistenceRequiresDataDir checks that enabling file persistence
// with an empty data dir emits a warning and leaves file persistence disabled
// in the resolved config.
func TestFilePersistenceRequiresDataDir(t *testing.T) {
	var warns []string
	// Use CLI to explicitly set data-dir to empty, overriding the default.
	resolved := resolveEffectiveConfig(
		&FileConfig{
			FilePersistence: true,
		},
		configCLIInputs{
			Set:     map[string]bool{"data-dir": true},
			DataDir: "",
		},
		// Pass an explicit empty environment, like the sibling tests do, so a
		// DUCKGRES_FILE_PERSISTENCE value in the process environment cannot
		// influence the outcome of this test.
		envFromMap(nil),
		func(msg string) { warns = append(warns, msg) },
	)

	if resolved.Server.FilePersistence {
		t.Fatal("expected FilePersistence to be disabled when DataDir is empty")
	}

	found := false
	for _, w := range warns {
		if strings.Contains(w, "file_persistence is enabled but data_dir is empty") {
			found = true
			break
		}
	}
	if !found {
		t.Fatalf("expected warning about empty data_dir, warnings: %v", warns)
	}
}
73 changes: 58 additions & 15 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ type clientConn struct {
ctx context.Context // connection context, cancelled when connection is closed
cancel context.CancelFunc // cancels the connection context

// sharedDB is true when this connection uses a shared file-persistence DB pool.
// Cleanup differs: we return the pinned conn to the pool instead of closing the DB.
sharedDB bool

// pg_stat_activity fields
backendStart time.Time // when this connection started
applicationName string // from startup params
Expand Down Expand Up @@ -433,6 +437,28 @@ func (c *clientConn) safeCleanupDB() {
}()

cleanupTimeout := 5 * time.Second

if c.sharedDB {
// Shared file-persistence pool: ROLLBACK any open transaction on the
// pinned connection, then return it to the pool. Skip DuckLake DETACH
// since the underlying DB is shared across connections.
if c.txStatus == txStatusTransaction || c.txStatus == txStatusError {
ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
_, err := c.executor.ExecContext(ctx, "ROLLBACK")
cancel()
if err != nil {
slog.Warn("Failed to rollback transaction during cleanup.",
"user", c.username, "error", err)
}
}
// Close returns the pinned *sql.Conn to the pool (does not close the DB).
if err := c.executor.Close(); err != nil {
slog.Warn("Failed to return connection to pool.", "user", c.username, "error", err)
}
c.server.releaseFileDB(c.username)
return
}

connHealthy := true

// Check connection health. For DuckLake, we need to actually run a query that
Expand Down Expand Up @@ -638,23 +664,40 @@ func (c *clientConn) serve() error {
// Create a DuckDB connection for this client session (unless pre-created by caller)
var stopRefresh func()
if c.executor == nil {
var db *sql.DB
var err error
if c.passthrough {
db, err = CreatePassthroughDBConnection(c.server.cfg, c.server.duckLakeSem, c.username, processStartTime, processVersion)
if c.server.cfg.FilePersistence {
db, err := c.server.acquireFileDB(c.username, c.passthrough)
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
conn, err := db.Conn(c.ctx)
if err != nil {
c.server.releaseFileDB(c.username)
c.sendError("FATAL", "28000", fmt.Sprintf("failed to get pooled connection: %v", err))
return err
}
c.executor = NewPinnedExecutor(conn, db)
c.sharedDB = true
// Don't start per-connection credential refresh; the pool manages it.
} else {
db, err = c.server.createDBConnection(c.username)
}
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
c.executor = NewLocalExecutor(db)
var db *sql.DB
var err error
if c.passthrough {
db, err = CreatePassthroughDBConnection(c.server.cfg, c.server.duckLakeSem, c.username, processStartTime, processVersion)
} else {
db, err = c.server.createDBConnection(c.username)
}
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
c.executor = NewLocalExecutor(db)

// Start background credential refresh for long-lived connections.
// Only needed when we create the DB here; the control plane manages
// refresh for pre-created connections via DBPool.
stopRefresh = StartCredentialRefresh(db, c.server.cfg.DuckLake)
// Start background credential refresh for long-lived connections.
// Only needed when we create the DB here; the control plane manages
// refresh for pre-created connections via DBPool.
stopRefresh = StartCredentialRefresh(db, c.server.cfg.DuckLake)
}
}
// Defers run LIFO: close cursors first (they hold open RowSets), then stop
// credential refresh, then clean up the database connection.
Expand Down
61 changes: 61 additions & 0 deletions server/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,67 @@ func (e *LocalExecutor) LastProfilingOutput() string {
return string(data)
}

// PinnedExecutor implements QueryExecutor for file-persistence mode by
// running statements on one pinned *sql.Conn taken from a shared *sql.DB pool.
type PinnedExecutor struct {
	// conn is the pinned connection that queries, execs and pings run on.
	conn *sql.Conn
	// db is the shared pool the pinned connection was obtained from.
	db *sql.DB
}

// NewPinnedExecutor returns a PinnedExecutor that runs statements on conn and
// keeps db as the pool conn belongs to.
func NewPinnedExecutor(conn *sql.Conn, db *sql.DB) *PinnedExecutor {
	pinned := new(PinnedExecutor)
	pinned.conn = conn
	pinned.db = db
	return pinned
}

// DB exposes the shared *sql.DB pool behind this executor, for callers that
// need direct access to it (for example credential refresh).
func (e *PinnedExecutor) DB() *sql.DB {
	pool := e.db
	return pool
}

// QueryContext runs query on the pinned connection and wraps the resulting
// rows in a LocalRowSet. On failure it returns a nil RowSet and the error.
func (e *PinnedExecutor) QueryContext(ctx context.Context, query string, args ...any) (RowSet, error) {
	sqlRows, queryErr := e.conn.QueryContext(ctx, query, args...)
	if queryErr != nil {
		return nil, queryErr
	}
	rowSet := LocalRowSet{rows: sqlRows}
	return &rowSet, nil
}

// ExecContext runs a statement on the pinned connection and returns the
// driver's result together with any error.
func (e *PinnedExecutor) ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error) {
	result, execErr := e.conn.ExecContext(ctx, query, args...)
	return result, execErr
}

// Query is QueryContext with a background context: it runs query on the
// pinned connection and returns the rows wrapped in a LocalRowSet.
func (e *PinnedExecutor) Query(query string, args ...any) (RowSet, error) {
	return e.QueryContext(context.Background(), query, args...)
}

// Exec is ExecContext with a background context: it runs a statement on the
// pinned connection.
func (e *PinnedExecutor) Exec(query string, args ...any) (ExecResult, error) {
	return e.ExecContext(context.Background(), query, args...)
}

// ConnContext obtains a connection from the shared pool via db.Conn. The
// returned connection comes from the pool, not from the executor's pinned
// conn field.
//
// On failure it returns a literal nil RawConn rather than forwarding the
// *sql.Conn result directly, so callers never receive a non-nil interface
// value that wraps a nil pointer.
func (e *PinnedExecutor) ConnContext(ctx context.Context) (RawConn, error) {
	conn, err := e.db.Conn(ctx)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// PingContext pings the pinned connection and reports any error.
func (e *PinnedExecutor) PingContext(ctx context.Context) error {
	pingErr := e.conn.PingContext(ctx)
	return pingErr
}

// Close releases the pinned connection back to its pool. The shared
// underlying DB is left open.
func (e *PinnedExecutor) Close() error {
	if closeErr := e.conn.Close(); closeErr != nil {
		return closeErr
	}
	return nil
}

// LastProfilingOutput returns the contents of the profiling JSON file at a
// fixed path under /tmp, or the empty string if the file cannot be read.
func (e *PinnedExecutor) LastProfilingOutput() string {
	const profilingPath = "/tmp/duckgres-profiling.json"
	if contents, readErr := os.ReadFile(profilingPath); readErr == nil {
		return string(contents)
	}
	return ""
}

// LocalRowSet wraps *sql.Rows to implement RowSet.
type LocalRowSet struct {
rows *sql.Rows
Expand Down
Loading
Loading