Skip to content

Commit f0b22cc

Browse files
committed
feat(sea): forward queryTags through executeStatement to the napi binding
Threads the public `ExecuteStatementOptions.queryTags` through the SEA backend into the kernel's per-statement Spark conf overlay. JS-side adapter path: - Serialises the `Record<string, string | null | undefined>` tag map via the existing `serializeQueryTags` util — same wire shape the Thrift backend produces (`k1:v1,k2:v2`, null values as bare keys). - Writes the serialised string into `statementConf["query_tags"]` on the napi `ExecuteOptions` (defined on the kernel side in `napi/src/connection.rs`). - Pre-serialising at the JS layer (instead of via the napi binding's own `queryTags` field) preserves null-valued tags, which a `HashMap<String, String>` over the FFI boundary cannot carry. Why match Thrift's exact wire shape: SEA and Thrift hit the same server with the same `confOverlay.query_tags` key. A consumer porting from Thrift to SEA gets byte-equivalent tagging behaviour without any code change on their side. Plumbing changes: - `SeaNativeLoader.ts` — `SeaNativeConnection.executeStatement` now accepts an optional `SeaNativeExecuteOptions` second arg with `statementConf` + `queryTags` fields, mirroring the napi-generated `ExecuteOptions` interface. - `SeaSessionBackend.ts` — adapter call site builds `nativeOptions` from the public `queryTags` and forwards it on the napi call. Existing `namedParameters`/`ordinalParameters`/`queryTimeout` deferred errors stay in place. - `native/sea/index.d.ts` — adds `ExecuteOptions` and updates `Connection.executeStatement` signature to take the optional second arg. Mirrors the napi-rs codegen from kernel branch `msrathore/krn-statement-options`. Tests: - `tests/unit/sea/execution-query-tags.test.ts` — 8 cases covering omission/empty, single + multiple tags, null/undefined-valued tags as bare keys, special-char escaping in keys and values. Deferred (separate PRs): - Positional / named parameters (TypedValue mapping over napi) - Row limit, wait timeout (need kernel `StatementSpec` extension) - Async execute (`submit()` + `await_result()` — needs new ExecutedAsyncStatement napi wrapper) Cross-PR dependency: requires kernel branch `msrathore/krn-statement-options` (commit `79bba34`), which is stacked on `msrathore/krn-max-connections`. This branch stacks on `msrathore/sea-max-connections`. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 3e7852f commit f0b22cc

4 files changed

Lines changed: 229 additions & 7 deletions

File tree

lib/sea/SeaNativeLoader.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,43 @@ export interface SeaNativeStatement {
6464
close(): Promise<void>;
6565
}
6666

67+
/**
68+
* Per-statement options for the napi `Connection.executeStatement`.
69+
* Mirrors the napi-rs-generated `ExecuteOptions` in
70+
* `native/sea/index.d.ts`. Declared locally to avoid coupling the
71+
* JS-side adapter to the auto-generated file.
72+
*
73+
* - `statementConf` — per-statement Spark conf overlay merged on top
74+
* of the session-level `sessionConf` at execute time. The map wins
75+
* on key collisions.
76+
* - `queryTags` — the napi binding accepts a `Record<string, string>`
77+
* and serialises it into `statementConf["query_tags"]` matching
78+
* NodeJS Thrift's `serializeQueryTags` wire shape. The JS-side
79+
* adapter today pre-serialises via the existing
80+
* `serializeQueryTags` helper and writes the result into
81+
* `statementConf` directly (so null-valued tags carry through),
82+
* so this field is not used by the SEA backend's adapter call
83+
* site; it is declared because the napi binding exports it and
84+
* alternate consumers may use it.
85+
*/
86+
export interface SeaNativeExecuteOptions {
87+
statementConf?: Record<string, string>;
88+
queryTags?: Record<string, string>;
89+
}
90+
6791
/**
6892
* Typed surface for the opaque napi `Connection` handle.
6993
*/
7094
export interface SeaNativeConnection {
7195
/**
7296
* Execute a SQL statement. Catalog / schema / sessionConf are
7397
* session-level — set on `openSession`, applied to every statement
74-
* executed on the resulting `Connection`. No per-statement options.
98+
* executed on the resulting `Connection`.
99+
*
100+
* `options` is optional; today carries `statementConf`
101+
* (per-statement Spark conf overlay) and `queryTags`.
75102
*/
76-
executeStatement(sql: string): Promise<SeaNativeStatement>;
103+
executeStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeStatement>;
77104
close(): Promise<void>;
78105
}
79106

lib/sea/SeaSessionBackend.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import HiveDriverError from '../errors/HiveDriverError';
3434
import { SeaNativeConnection } from './SeaNativeLoader';
3535
import { decodeNapiKernelError } from './SeaErrorMapping';
3636
import SeaOperationBackend from './SeaOperationBackend';
37+
import { serializeQueryTags } from '../utils';
3738

3839
export interface SeaSessionBackendOptions {
3940
/** The opaque napi `Connection` handle returned by `openSession`. */
@@ -116,9 +117,22 @@ export default class SeaSessionBackend implements ISessionBackend {
116117
);
117118
}
118119

120+
// Build the per-statement conf overlay. Today only `queryTags` is
121+
// surfaced on the public `ExecuteStatementOptions` (mirrors Thrift);
122+
// pre-serialise on the JS side via the existing
123+
// `serializeQueryTags` helper so the kernel-side conf overlay
124+
// shape exactly matches the Thrift wire bytes for the same input.
125+
// Doing the serialisation here (instead of inside the napi `queryTags`
126+
// field) is what carries null-valued tags through correctly — napi's
127+
// `HashMap<String, String>` can't represent nulls.
128+
const serializedQueryTags = serializeQueryTags(options.queryTags);
129+
const statementConf =
130+
serializedQueryTags !== undefined ? { query_tags: serializedQueryTags } : undefined;
131+
const nativeOptions = statementConf !== undefined ? { statementConf } : undefined;
132+
119133
let nativeStatement;
120134
try {
121-
nativeStatement = await this.connection.executeStatement(statement);
135+
nativeStatement = await this.connection.executeStatement(statement, nativeOptions);
122136
} catch (err) {
123137
throw decodeNapiKernelError(err);
124138
}

native/sea/index.d.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,22 @@ export interface ConnectionOptions {
7171
* to camelCase for free functions).
7272
*/
7373
export declare function openSession(options: ConnectionOptions): Promise<Connection>
74+
/**
75+
* Per-statement options for `Connection.executeStatement`.
76+
* Mirrors the napi-rs-generated `ExecuteOptions` in
77+
* `napi/src/connection.rs`. Today carries:
78+
* - `statementConf` — per-statement Spark conf overlay merged on top
79+
* of the session-level `sessionConf` at execute time. Map wins on
80+
* key collisions.
81+
* - `queryTags` — JSON-encoded into `statementConf["query_tags"]`
82+
* matching NodeJS Thrift's `serializeQueryTags` shape. Passing both
83+
* `queryTags` and an explicit `statementConf["query_tags"]` raises
84+
* `InvalidArgument`.
85+
*/
86+
export interface ExecuteOptions {
87+
statementConf?: Record<string, string>
88+
queryTags?: Record<string, string>
89+
}
7490
/**
7591
* A single Arrow IPC stream payload encoding one record batch (plus
7692
* the schema header so the JS-side reader is stateless).
@@ -108,11 +124,13 @@ export declare class Connection {
108124
* Execute a SQL statement and return a Statement handle that
109125
* streams batches via `fetchNextBatch()`.
110126
*
111-
* No per-statement options: catalog / schema / sessionConf are
112-
* session-level (`openSession`). Positional / named parameters
113-
* land in M1 via `Statement::spec().param(…)` on the kernel.
127+
* Catalog / schema / sessionConf are session-level (`openSession`).
128+
* `options` carries per-statement knobs: `statementConf`
129+
* (per-statement Spark conf overlay) and `queryTags` (serialised
130+
* into `statementConf["query_tags"]` matching NodeJS Thrift's
131+
* `serializeQueryTags` wire shape).
114132
*/
115-
executeStatement(sql: string): Promise<Statement>
133+
executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<Statement>
116134
/**
117135
* Explicit close. Marks the connection wrapper as closed so
118136
* subsequent calls on this `Connection` return `InvalidArg`, then
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Copyright (c) 2026 Databricks, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* Unit tests for `SeaSessionBackend.executeStatement` query-tags
17+
* threading. The JS-side adapter pre-serialises the public
18+
* `queryTags: Record<string, string | null | undefined>` map via
19+
* the existing `serializeQueryTags` util (so null-valued tags carry
20+
* through) and writes the result into the napi
21+
* `statementConf["query_tags"]`. The kernel then forwards the conf
22+
* overlay verbatim onto the SEA wire.
23+
*
24+
* These tests verify that the JS adapter constructs the napi options
25+
* shape correctly. End-to-end behaviour against a live warehouse is
26+
* exercised separately in `tests/e2e/sea/`.
27+
*/
28+
29+
import { expect } from 'chai';
30+
import sinon from 'sinon';
31+
import SeaSessionBackend from '../../../lib/sea/SeaSessionBackend';
32+
import {
33+
SeaNativeConnection,
34+
SeaNativeStatement,
35+
SeaNativeExecuteOptions,
36+
} from '../../../lib/sea/SeaNativeLoader';
37+
import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientContext';
38+
import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger';
39+
40+
class FakeNativeStatement implements SeaNativeStatement {
41+
public async fetchNextBatch() {
42+
return null;
43+
}
44+
45+
public async schema() {
46+
return { ipcBytes: Buffer.alloc(0) };
47+
}
48+
49+
public async cancel() {
50+
// no-op
51+
}
52+
53+
public async close() {
54+
// no-op
55+
}
56+
}
57+
58+
function makeFakeContext(): IClientContext {
59+
const logger: IDBSQLLogger = {
60+
log(_level: LogLevel, _message: string): void {
61+
// no-op
62+
},
63+
};
64+
const config = {} as ClientConfig;
65+
return {
66+
getConfig: () => config,
67+
getLogger: () => logger,
68+
getConnectionProvider: () => {
69+
throw new Error('not used');
70+
},
71+
getClient: () => {
72+
throw new Error('not used');
73+
},
74+
getDriver: () => {
75+
throw new Error('not used');
76+
},
77+
} as unknown as IClientContext;
78+
}
79+
80+
describe('SeaSessionBackend — query tags threading', () => {
81+
let executeSpy: sinon.SinonSpy;
82+
let connection: SeaNativeConnection;
83+
let session: SeaSessionBackend;
84+
85+
beforeEach(() => {
86+
const stmt = new FakeNativeStatement();
87+
executeSpy = sinon.spy(async (_sql: string, _options?: SeaNativeExecuteOptions) => stmt);
88+
connection = {
89+
executeStatement: executeSpy,
90+
close: async () => {},
91+
} as unknown as SeaNativeConnection;
92+
session = new SeaSessionBackend({ connection, context: makeFakeContext() });
93+
});
94+
95+
it('omits the napi options arg when queryTags is not set', async () => {
96+
await session.executeStatement('SELECT 1', {});
97+
expect(executeSpy.calledOnce).to.equal(true);
98+
expect(executeSpy.firstCall.args[0]).to.equal('SELECT 1');
99+
expect(executeSpy.firstCall.args[1]).to.equal(undefined);
100+
});
101+
102+
it('omits the napi options arg when queryTags is empty', async () => {
103+
await session.executeStatement('SELECT 1', { queryTags: {} });
104+
expect(executeSpy.firstCall.args[1]).to.equal(undefined);
105+
});
106+
107+
it('forwards a single tag through statementConf["query_tags"]', async () => {
108+
await session.executeStatement('SELECT 1', {
109+
queryTags: { team: 'platform' },
110+
});
111+
const opts = executeSpy.firstCall.args[1] as SeaNativeExecuteOptions;
112+
expect(opts.statementConf).to.deep.equal({ query_tags: 'team:platform' });
113+
});
114+
115+
it('forwards multiple tags as comma-separated key:value pairs', async () => {
116+
await session.executeStatement('SELECT 1', {
117+
queryTags: { team: 'platform', env: 'staging' },
118+
});
119+
const opts = executeSpy.firstCall.args[1] as SeaNativeExecuteOptions;
120+
expect(opts.statementConf!.query_tags).to.match(
121+
/^(team:platform,env:staging|env:staging,team:platform)$/,
122+
);
123+
});
124+
125+
it('preserves null-valued tags as bare keys (no colon)', async () => {
126+
await session.executeStatement('SELECT 1', {
127+
queryTags: { highPriority: null, team: 'platform' },
128+
});
129+
const opts = executeSpy.firstCall.args[1] as SeaNativeExecuteOptions;
130+
const encoded = opts.statementConf!.query_tags;
131+
// Object iteration is insertion-order for string keys; serializeQueryTags
132+
// follows that. Two possible orderings depending on Object.keys order.
133+
expect(encoded).to.match(
134+
/^(highPriority,team:platform|team:platform,highPriority)$/,
135+
);
136+
});
137+
138+
it('preserves undefined-valued tags as bare keys (no colon)', async () => {
139+
await session.executeStatement('SELECT 1', {
140+
queryTags: { highPriority: undefined, team: 'platform' },
141+
});
142+
const opts = executeSpy.firstCall.args[1] as SeaNativeExecuteOptions;
143+
expect(opts.statementConf!.query_tags).to.contain('highPriority');
144+
expect(opts.statementConf!.query_tags).to.contain('team:platform');
145+
});
146+
147+
it('escapes special chars (colon, comma, backslash) in values', async () => {
148+
await session.executeStatement('SELECT 1', {
149+
queryTags: { k: 'a:b,c\\d' },
150+
});
151+
const opts = executeSpy.firstCall.args[1] as SeaNativeExecuteOptions;
152+
// `:` → `\:`, `,` → `\,`, `\` → `\\`
153+
expect(opts.statementConf!.query_tags).to.equal('k:a\\:b\\,c\\\\d');
154+
});
155+
156+
it('escapes backslashes in keys', async () => {
157+
await session.executeStatement('SELECT 1', {
158+
queryTags: { 'k\\1': 'v' },
159+
});
160+
const opts = executeSpy.firstCall.args[1] as SeaNativeExecuteOptions;
161+
expect(opts.statementConf!.query_tags).to.equal('k\\\\1:v');
162+
});
163+
});

0 commit comments

Comments
 (0)