Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,13 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
passthroughUser = resolution.Passthrough
defaultCatalog = resolution.DefaultCatalog
// From here on, `database` reflects the effective routing database.
// This is what gets passed to the worker as the logical database
// (drives the `current_database()` macro and pg_database view) so
// observability surfaces the actual routing decision.
// It is passed to the worker as the logical database name (the
// logical-catalog transform rewrites <database>.schema.table ->
// ducklake.schema.table). Note: for DuckLake-backed sessions the
// client-visible current_database()/pg_database report the stable
// physical catalog "ducklake" (see sessionmeta.ReportedDatabaseName),
// not this routing name; org identity for observability comes from
// orgID, which is logged separately.
database = effectiveDatabase
} else {
// Single-tenant: static users map
Expand Down Expand Up @@ -1135,21 +1139,31 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
var duckLakeAttached bool
if !passthroughUser {
initCtx, initCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout)
if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, executor, database); err != nil {
// Detect DuckLake attachment first so we can report a stable catalog
// name. DuckLake-backed sessions report the physical catalog name
// ("ducklake") as current_database() regardless of tenancy, so
// catalog-keyed tools (e.g. SQLMesh) see a consistent name across
// single-/multi-tenant and across a single->multi migration; the
// connection dbname still works as an alias via the logical-catalog
// transform. Non-DuckLake sessions (e.g. iceberg-only) keep reporting
// their own dbname.
duckLakeAttached, err = sessionmeta.HasAttachedCatalog(initCtx, executor, physicalDuckLakeCatalog)
if err != nil {
initCancel()
slog.Error("Failed to initialize session database metadata.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod)
_ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to initialize session database metadata")
slog.Error("Failed to detect ducklake catalog attachment.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod)
_ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to detect ducklake catalog attachment")
_ = writer.Flush()
return
}
duckLakeAttached, err = sessionmeta.HasAttachedCatalog(initCtx, executor, "ducklake")
initCancel()
if err != nil {
slog.Error("Failed to detect ducklake catalog attachment.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod)
_ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to detect ducklake catalog attachment")
reportedDatabase := sessionmeta.ReportedDatabaseName(database, defaultCatalog, duckLakeAttached)
if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, executor, reportedDatabase); err != nil {
initCancel()
slog.Error("Failed to initialize session database metadata.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod)
_ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to initialize session database metadata")
_ = writer.Flush()
return
}
initCancel()

// Apply the effective connect-time session default AFTER metadata init.
// It must run here, not on the worker at session create: (1)
Expand Down
24 changes: 16 additions & 8 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (c *clientConn) newTranspiler(convertPlaceholders bool) *transpiler.Transpi
return transpiler.New(transpiler.Config{
DuckLakeMode: c.server.cfg.DuckLake.MetadataStore != "" || c.server.cfg.AlwaysDuckLake,
LogicalDatabaseName: c.database,
PhysicalCatalogName: "ducklake",
PhysicalCatalogName: physicalDuckLakeCatalog,
ConvertPlaceholders: convertPlaceholders,
})
}
Expand Down Expand Up @@ -994,17 +994,25 @@ func (c *clientConn) serve() error {
initTimeout = DefaultSessionInitTimeout
}
initCtx, initCancel := context.WithTimeout(context.Background(), initTimeout)
if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, c.executor, c.database); err != nil {
// Detect DuckLake attachment first so we can report a stable catalog
// name. When DuckLake-backed, report the physical catalog name
// ("ducklake") as current_database() rather than the connection dbname
// (see sessionmeta.ReportedDatabaseName); the dbname still works as an
// alias via the logical-catalog transform.
duckLakeAttached, err := sessionmeta.HasAttachedCatalog(initCtx, c.executor, physicalDuckLakeCatalog)
if err != nil {
initCancel()
c.sendError("FATAL", "XX000", fmt.Sprintf("failed to initialize session database metadata: %v", err))
c.sendError("FATAL", "XX000", fmt.Sprintf("failed to detect ducklake catalog attachment: %v", err))
return err
}
duckLakeAttached, err := sessionmeta.HasAttachedCatalog(initCtx, c.executor, "ducklake")
initCancel()
if err != nil {
c.sendError("FATAL", "XX000", fmt.Sprintf("failed to detect ducklake catalog attachment: %v", err))
// Standalone has no per-user configured default catalog, so pass "".
reportedDatabase := sessionmeta.ReportedDatabaseName(c.database, "", duckLakeAttached)
if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, c.executor, reportedDatabase); err != nil {
initCancel()
c.sendError("FATAL", "XX000", fmt.Sprintf("failed to initialize session database metadata: %v", err))
return err
}
initCancel()
c.logicalCatalogMapping = duckLakeAttached
}

Expand Down Expand Up @@ -1711,7 +1719,7 @@ func (c *clientConn) queryWithArgsWithMetadata(ctx context.Context, query string
}

// physicalDuckLakeCatalog is the physical catalog name DuckLake is attached as.
const physicalDuckLakeCatalog = "ducklake"
const physicalDuckLakeCatalog = sessionmeta.PhysicalDuckLakeCatalog

// executeSelectQuery runs a result-returning query against DuckDB and streams results to the client.
// Sends RowDescription, DataRow messages, CommandComplete, and ReadyForQuery.
Expand Down
39 changes: 39 additions & 0 deletions server/sessionmeta/sessionmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,45 @@ func InitSessionDatabaseMetadata(ctx context.Context, executor sqlcore.QueryExec
return nil
}

// PhysicalDuckLakeCatalog is the physical catalog name DuckLake is attached as
// on a worker. It is the stable, deployment-independent name that the
// logical-catalog transform rewrites client references to.
const PhysicalDuckLakeCatalog = "ducklake"

// ReportedDatabaseName returns the client-visible catalog name to install as
// current_database()/pg_database for a session, in precedence order:
//
// 1. A configured default catalog (e.g. "iceberg") wins. Such a session's
// connect-time search_path/USE is pointed at that catalog, so it is the
// catalog its queries resolve against and therefore what catalog-keyed
// tools must see as current_database().
//
// 2. Otherwise, a DuckLake-backed session reports the stable physical catalog
// name ("ducklake") rather than the per-connection startup dbname,
// regardless of tenancy. The startup dbname still works as an alias: the
// logical-catalog transform rewrites <dbname>.schema.table ->
// ducklake.schema.table and the USE rewriter maps USE "<dbname>" ->
// ducklake.main, while ducklake.* references resolve directly. Reporting a
// stable name keeps tools that key on the catalog — notably SQLMesh, which
// fully-qualifies every model as catalog.schema.object and persists that in
// its state — from treating a changed connection dbname as a brand-new
// warehouse (which would trigger a full rebuild). Because the name is the
// same across single- and multi-tenant deployments, an org graduating from
// a single-tenant duckling to a multi-tenant worker pod keeps the same
// catalog identity and does not churn.
//
// 3. Otherwise (no default catalog and no DuckLake attached — e.g. plain
// DuckDB), report the connection dbname unchanged.
func ReportedDatabaseName(startupDatabase, defaultCatalog string, duckLakeBacked bool) string {
if defaultCatalog != "" {
return defaultCatalog
}
if duckLakeBacked {
return PhysicalDuckLakeCatalog
}
return startupDatabase
}

func HasAttachedCatalog(ctx context.Context, executor sqlcore.QueryExecutor, catalog string) (bool, error) {
query := fmt.Sprintf(
"SELECT COUNT(*) FROM duckdb_databases() WHERE database_name = %s",
Expand Down
61 changes: 61 additions & 0 deletions server/sessionmeta/sessionmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,64 @@ func TestInformationSchemaColumnsCompatLoadedIcebergColumnsKeepIcebergCatalog(t
t.Fatalf("loaded Iceberg columns should not use current_database() as table_catalog in:\n%s", got)
}
}

func TestReportedDatabaseName(t *testing.T) {
tests := []struct {
name string
startup string
defaultCatalog string
duckLakeBacked bool
want string
}{
{
name: "ducklake-backed reports stable physical catalog",
startup: "portola",
duckLakeBacked: true,
want: PhysicalDuckLakeCatalog,
},
{
name: "ducklake-backed with ducklake dbname is unchanged",
startup: "ducklake",
duckLakeBacked: true,
want: PhysicalDuckLakeCatalog,
},
{
// A multi-tenant org reports the same stable name as the same org
// would on a single-tenant duckling, so a single->multi migration
// keeps the catalog identity and does not churn catalog-keyed tools.
name: "multi-tenant ducklake org reports the same stable catalog",
startup: "acme",
duckLakeBacked: true,
want: PhysicalDuckLakeCatalog,
},
{
// A configured default catalog wins over DuckLake: the session's
// search_path/USE points at iceberg, so current_database() must too.
name: "iceberg default catalog wins over ducklake",
startup: "acme",
defaultCatalog: "iceberg",
duckLakeBacked: true,
want: "iceberg",
},
{
name: "iceberg default catalog without ducklake reports iceberg",
startup: "acme",
defaultCatalog: "iceberg",
duckLakeBacked: false,
want: "iceberg",
},
{
name: "non-ducklake, no default catalog keeps connection dbname",
startup: "analytics",
duckLakeBacked: false,
want: "analytics",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if got := ReportedDatabaseName(tc.startup, tc.defaultCatalog, tc.duckLakeBacked); got != tc.want {
t.Fatalf("ReportedDatabaseName(%q, %q, %v) = %q, want %q", tc.startup, tc.defaultCatalog, tc.duckLakeBacked, got, tc.want)
}
})
}
}
20 changes: 12 additions & 8 deletions tests/integration/logical_catalog_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@ func TestPgwireLogicalCatalogMapping(t *testing.T) {
_, _ = db.Exec("DROP TABLE IF EXISTS ducklake.main.logical_catalog_mapping_test")
})

// DuckLake-backed sessions report the stable physical catalog name
// ("ducklake") as current_database()/pg_database, regardless of the
// connection dbname ("test"). The connection dbname still works as a
// write alias (exercised below).
var currentDB string
if err := db.QueryRow("SELECT current_database()").Scan(&currentDB); err != nil {
t.Fatalf("query current_database(): %v", err)
}
if currentDB != "test" {
t.Fatalf("expected current_database() = %q, got %q", "test", currentDB)
if currentDB != "ducklake" {
t.Fatalf("expected current_database() = %q, got %q", "ducklake", currentDB)
}

var datname string
if err := db.QueryRow("SELECT datname FROM pg_database WHERE datname = current_database()").Scan(&datname); err != nil {
t.Fatalf("query pg_database/current_database: %v", err)
}
if datname != "test" {
t.Fatalf("expected pg_database row %q, got %q", "test", datname)
if datname != "ducklake" {
t.Fatalf("expected pg_database row %q, got %q", "ducklake", datname)
}

mustExec(t, db, "CREATE TABLE test.public.logical_catalog_mapping_test (id INTEGER)")
Expand All @@ -52,8 +56,8 @@ func TestPgwireLogicalCatalogMapping(t *testing.T) {
`).Scan(&tableCatalog); err != nil {
t.Fatalf("query information_schema.tables: %v", err)
}
if tableCatalog != "test" {
t.Fatalf("expected information_schema.tables.table_catalog = %q, got %q", "test", tableCatalog)
if tableCatalog != "ducklake" {
t.Fatalf("expected information_schema.tables.table_catalog = %q, got %q", "ducklake", tableCatalog)
}

var schemaCatalog string
Expand All @@ -66,7 +70,7 @@ func TestPgwireLogicalCatalogMapping(t *testing.T) {
`).Scan(&schemaCatalog); err != nil {
t.Fatalf("query information_schema.schemata: %v", err)
}
if schemaCatalog != "test" {
t.Fatalf("expected information_schema.schemata.catalog_name = %q, got %q", "test", schemaCatalog)
if schemaCatalog != "ducklake" {
t.Fatalf("expected information_schema.schemata.catalog_name = %q, got %q", "ducklake", schemaCatalog)
}
}
26 changes: 15 additions & 11 deletions tests/integration/logical_database_catalog_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,25 @@ func TestPgwireLogicalDatabaseCatalogMapping(t *testing.T) {
_, _ = db.Exec(fmt.Sprintf(`DROP SCHEMA IF EXISTS ducklake.%s CASCADE`, logicalSchema))
})

t.Run("metadata reports logical database", func(t *testing.T) {
// DuckLake-backed sessions report the stable physical catalog name
// ("ducklake") via current_database()/pg_database/information_schema,
// regardless of the connection dbname ("duckgres"). The connection dbname
// still works as a write alias (exercised in the subtests below).
t.Run("metadata reports stable physical catalog", func(t *testing.T) {
var currentDB string
if err := db.QueryRow("SELECT current_database()").Scan(&currentDB); err != nil {
t.Fatalf("query current_database(): %v", err)
}
if currentDB != "duckgres" {
t.Fatalf("current_database() = %q, want %q", currentDB, "duckgres")
if currentDB != "ducklake" {
t.Fatalf("current_database() = %q, want %q", currentDB, "ducklake")
}

var datname string
if err := db.QueryRow("SELECT datname FROM pg_catalog.pg_database WHERE datname = current_database()").Scan(&datname); err != nil {
t.Fatalf("query pg_database/current_database: %v", err)
}
if datname != "duckgres" {
t.Fatalf("pg_database datname = %q, want %q", datname, "duckgres")
if datname != "ducklake" {
t.Fatalf("pg_database datname = %q, want %q", datname, "ducklake")
}

if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE duckgres.public.%s (id INTEGER)`, metadataProbeTable)); err != nil {
Expand All @@ -72,8 +76,8 @@ func TestPgwireLogicalDatabaseCatalogMapping(t *testing.T) {
)).Scan(&tableCatalog); err != nil {
t.Fatalf("query information_schema.tables: %v", err)
}
if tableCatalog != "duckgres" {
t.Fatalf("information_schema.tables table_catalog = %q, want %q", tableCatalog, "duckgres")
if tableCatalog != "ducklake" {
t.Fatalf("information_schema.tables table_catalog = %q, want %q", tableCatalog, "ducklake")
}

var columnCatalog string
Expand All @@ -83,16 +87,16 @@ func TestPgwireLogicalDatabaseCatalogMapping(t *testing.T) {
)).Scan(&columnCatalog); err != nil {
t.Fatalf("query information_schema.columns: %v", err)
}
if columnCatalog != "duckgres" {
t.Fatalf("information_schema.columns table_catalog = %q, want %q", columnCatalog, "duckgres")
if columnCatalog != "ducklake" {
t.Fatalf("information_schema.columns table_catalog = %q, want %q", columnCatalog, "ducklake")
}

var schemaCatalog string
if err := db.QueryRow("SELECT catalog_name FROM information_schema.schemata WHERE schema_name = 'public' LIMIT 1").Scan(&schemaCatalog); err != nil {
t.Fatalf("query information_schema.schemata: %v", err)
}
if schemaCatalog != "duckgres" {
t.Fatalf("information_schema.schemata catalog_name = %q, want %q", schemaCatalog, "duckgres")
if schemaCatalog != "ducklake" {
t.Fatalf("information_schema.schemata catalog_name = %q, want %q", schemaCatalog, "ducklake")
}
})

Expand Down
Loading
Loading