Skip to content

Commit a9347e8

Browse files
committed
sea-abstraction: address PR #378 review-comment fixes (H1 / M1-M4 / L1-L10)
Addresses 15 review findings from the code-review-squad pass on PR #378. L11 (backend kind field on the three interfaces) is deliberately deferred to avoid a cross-stack cascade ripple while the downstream PRs are still in flight. H1 — fetchChunk lost mid-flight failIfClosed regression. Add optional `isClosed?: () => boolean` to IOperationBackend.fetchChunk's options bag. ThriftOperationBackend.fetchChunk probes it after the setTimeout(0) macrotask yield and returns [] when set; the facade's post-fetch failIfClosed then raises the user-visible OperationStateError. Restores the guard that the refactor split across the facade/backend boundary so a cancel/close arriving during the yield window no longer runs the data RPC to completion needlessly. M1 — neutralize WaitUntilReadyOptions callback shape. Introduce IOperationBackendWaitOptions { callback?: (status: OperationStatus) => unknown } on the backend interface. Facade keeps the public Thrift-typed OperationStatusCallback and adapts at the boundary by wrapping the user's callback with synthesizeThriftStatus. ThriftOperationBackend.waitUntilReady consumes the neutral options and passes adaptOperationStatus(response) to the callback. M2 — synthesizeOkStatus maps OperationState to TStatusCode. Add synthesizeStatusFromOperation that returns ERROR_STATUS for Failed/Cancelled/Closed (carrying errorMessage + sqlState) and SUCCESS_STATUS otherwise. Wire it into synthesizeThriftStatus so legacy Status.assert(resp.status) sees the right code on non-Thrift backends. M3 — TelemetryEvent + DriverConfiguration carry a backend tag. Add optional backend?: 'thrift' | 'sea' | 'kernel' on both interfaces so dashboards can slice latency/error rate by backend without a metrics-schema migration once non-Thrift emission goes live. M4 — test coverage for the synthesize helpers + useSEA failure path. New tests/unit/thrift-backend/wireSynthesis.test.ts covering all OperationState/ResultFormat mappings, ERROR_STATUS carries errorMessage/sqlState, hasResultSet round-trip, schema/arrowSchema/ lz4Compressed/isStagingOperation preservation, and the L3 throw on unknown ResultFormat. New test in DBSQLClient.test.ts asserts that a useSEA:true connect failure leaves this.backend === undefined and the next openSession() surfaces "not connected" rather than the SeaBackend's "not implemented" error. L1 — forwardConnectionEvent normalizes payload to Error. Replace `payload as Error` with `payload instanceof Error ? payload : new Error(String(payload))` so a backend that emits a non-Error through the cross-backend onConnectionEvent doesn't crash the logger.log call. L2 — DBSQLClient.connect publishes this.backend only on success. Construct the backend locally, await connect() in a try/catch, run a best-effort backend.close() (per IBackend.close()'s safe-on-partial-init contract) and rethrow on failure. Only assign this.backend after a clean connect so a failed connect surfaces "DBSQLClient: not connected" on the next openSession. L3 — resultFormatToThrift throws on unknown ResultFormat. Replace the silent default fallback to COLUMN_BASED_SET with a HiveDriverError. Prevents a future ResultFormat enum extension from silently routing results through JsonResultHandler and surfacing garbled rows. L4 — DBSQLOperation.getMetadata carries @deprecated. Adds the canonical TypeScript JSDoc tag so IDEs (strikethrough), tsc, ESLint plugins, and agentic codegen pick up the soft deprecation in favour of getResultMetadata. L5 — numberToInt64 re-export carries @deprecated. Re-export through a named const with a JSDoc block (rather than a bare `export { ... } from`) so the @deprecated tag attaches to the symbol consumers see in their IDE / .d.ts. L6 — DBSQLSession.runBackend helper. Collapse 11 duplicated `failIfClosed → backend.X → failIfClosed` brackets into a single private runBackend<T>(fn) so the open-flag-before-and-after contract has a name and can't be forgotten in a new delegation method. L7 — restore three why-comments deleted from DBSQLSession. Staging-detection invariant in executeStatement, AWS-vs-Azure 404 difference on staging-remove, and the Content-Length-required note on staging-upload. Verbatim from main; these document non-obvious intentional behaviour the refactor inadvertently dropped. L8 — hasResultSet becomes a method on IOperationBackend. The value is state-dependent (the Thrift impl mutates the underlying operation handle inside processOperationStatusResponse), so the property+readonly+disclaimer-JSDoc pattern was misleading. Method form makes the live-read semantics obvious to a fresh implementer. 3 facade call sites updated. L9 — wireSynthesis moves under thrift-backend. The file imports Thrift IDL types and produces Thrift-typed values; it belongs next to ThriftOperationBackend, not in the neutral lib/utils/ tree where it would creep into the dependency cone of future backend-neutral helpers. Same reasoning that placed numberToInt64 and getDirectResultsOptions under thrift-backend/. L10 — interface-level downcast policy. Add a JSDoc paragraph on IOperationBackend grandfathering the two existing `instanceof ThriftOperationBackend` downcasts in DBSQLOperation.status/getMetadata and prohibiting new ones. Future zero-loss back-compat needs should extend the interface (or add an optional method) rather than spawn a per-backend branch matrix. Gates: yarn build (exit 0), yarn lint (0 errors, 3 pre-existing warnings in tests/e2e/protocol_versions.test.ts), yarn test on touched files (163 passing, +12 net new tests from M4 work; 2 failures pre- existing on PR head unchanged: getSchema-directResults and the LZ4-cloud-fetch flag — both flagged in the team-lead playbook as known prior regressions). Cascade implications for downstream PRs (#380 #377 #379 #382 #381 #384 #383): L8 converts hasResultSet from a property to a method, M1 swaps WaitUntilReadyOptions for IOperationBackendWaitOptions on the backend interface. Both are mechanical renames at downstream backend impls when they rebase. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 8a22d54 commit a9347e8

9 files changed

Lines changed: 378 additions & 78 deletions

File tree

lib/DBSQLClient.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,22 +242,43 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
242242
// doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")`
243243
// pattern (see databricks-sql-python/src/databricks/sql/session.py).
244244
const internalOptions = options as ConnectionOptions & InternalConnectionOptions;
245-
this.backend = internalOptions.useSEA
245+
const backend = internalOptions.useSEA
246246
? new SeaBackend()
247247
: new ThriftBackend({
248248
context: this,
249249
onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload),
250250
});
251251

252-
await this.backend.connect(options);
252+
// Publish `this.backend` only after a successful `connect()`. Otherwise a
253+
// failed connect would leave a half-initialized backend in place, and the
254+
// next `openSession()` would slip past the `!this.backend` guard and
255+
// surface a misleading "backend not implemented" / partial-state error
256+
// instead of the accurate "DBSQLClient: not connected".
257+
try {
258+
await backend.connect(options);
259+
} catch (err) {
260+
// `IBackend.close()` is documented as safe on a partially-initialized
261+
// backend; best-effort cleanup so we don't leak sockets / state.
262+
try {
263+
await backend.close();
264+
} catch (closeErr) {
265+
// Swallow; the original error is what the caller needs to see.
266+
}
267+
throw err;
268+
}
269+
this.backend = backend;
253270

254271
return this;
255272
}
256273

257274
private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void {
258275
switch (event) {
259276
case 'error': {
260-
const error = payload as Error;
277+
// `payload` is typed `unknown` because the cross-backend
278+
// `IBackend.onConnectionEvent` doesn't constrain the error shape.
279+
// Normalize to `Error` so the stack/name/message access below is safe
280+
// for any backend that emits a non-Error value (e.g. a bare string).
281+
const error = payload instanceof Error ? payload : new Error(String(payload));
261282
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
262283
try {
263284
this.emit('error', error);

lib/DBSQLOperation.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ import { LogLevel } from './contracts/IDBSQLLogger';
1919
import OperationStateError, { OperationStateErrorCode } from './errors/OperationStateError';
2020
import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator';
2121
import IClientContext from './contracts/IClientContext';
22-
import IOperationBackend from './contracts/IOperationBackend';
22+
import IOperationBackend, { IOperationBackendWaitOptions } from './contracts/IOperationBackend';
2323
import { ResultMetadata } from './contracts/ResultMetadata';
2424
import ThriftOperationBackend from './thrift-backend/ThriftOperationBackend';
25-
import { synthesizeThriftStatus, synthesizeThriftResultSetMetadata } from './utils/thriftWireSynthesis';
25+
import { synthesizeThriftStatus, synthesizeThriftResultSetMetadata } from './thrift-backend/wireSynthesis';
2626

2727
interface DBSQLOperationConstructorOptions {
2828
backend: IOperationBackend;
@@ -114,7 +114,7 @@ export default class DBSQLOperation implements IOperation {
114114
public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
115115
await this.failIfClosed();
116116

117-
if (!this.backend.hasResultSet) {
117+
if (!this.backend.hasResultSet()) {
118118
return [];
119119
}
120120

@@ -123,7 +123,11 @@ export default class DBSQLOperation implements IOperation {
123123

124124
const defaultMaxRows = this.context.getConfig().fetchChunkDefaultMaxRows;
125125
const limit = options?.maxRows ?? defaultMaxRows;
126-
const result = await this.backend.fetchChunk({ limit, disableBuffering: options?.disableBuffering });
126+
const result = await this.backend.fetchChunk({
127+
limit,
128+
disableBuffering: options?.disableBuffering,
129+
isClosed: () => this.closed || this.cancelled,
130+
});
127131
await this.failIfClosed();
128132

129133
this.context.getLogger().log(LogLevel.debug, `Fetched chunk of size: ${limit} from operation with id: ${this.id}`);
@@ -192,7 +196,7 @@ export default class DBSQLOperation implements IOperation {
192196
return false;
193197
}
194198

195-
if (this.backend.hasResultSet) {
199+
if (this.backend.hasResultSet()) {
196200
await this.waitUntilReadyThroughBackend();
197201
}
198202

@@ -202,7 +206,7 @@ export default class DBSQLOperation implements IOperation {
202206
public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
203207
await this.failIfClosed();
204208

205-
if (!this.backend.hasResultSet) {
209+
if (!this.backend.hasResultSet()) {
206210
return null;
207211
}
208212

@@ -227,7 +231,9 @@ export default class DBSQLOperation implements IOperation {
227231
* fields (`cacheLookupResult`, `uncompressedBytes`, `compressedBytes`,
228232
* `status`) left undefined / defaulted.
229233
*
230-
* Prefer {@link DBSQLOperation.getResultMetadata} in new code.
234+
* @deprecated Use {@link DBSQLOperation.getResultMetadata}; this method
235+
* synthesizes Thrift-only fields as `undefined` on non-Thrift backends and
236+
* couples callers to the Thrift wire shape.
231237
*/
232238
public async getMetadata(): Promise<TGetResultSetMetadataResp> {
233239
await this.failIfClosed();
@@ -248,8 +254,21 @@ export default class DBSQLOperation implements IOperation {
248254
}
249255

250256
private async waitUntilReadyThroughBackend(options?: WaitUntilReadyOptions) {
257+
// The backend-facing `waitUntilReady` takes a neutral
258+
// `IOperationBackendWaitOptions` whose `callback` receives an
259+
// `OperationStatus`. The public `WaitUntilReadyOptions.callback` is
260+
// Thrift-shaped — synthesize the wire response from the neutral status
261+
// at this boundary so the backend impl doesn't have to know about Thrift
262+
// IDL.
263+
const userCallback = options?.callback;
264+
const backendOptions: IOperationBackendWaitOptions = {
265+
progress: options?.progress,
266+
callback: userCallback
267+
? (status) => userCallback(synthesizeThriftStatus(status))
268+
: undefined,
269+
};
251270
try {
252-
await this.backend.waitUntilReady(options);
271+
await this.backend.waitUntilReady(backendOptions);
253272
} catch (err) {
254273
// Reflect terminal states back into facade flags so subsequent calls
255274
// short-circuit via failIfClosed().

lib/DBSQLSession.ts

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,20 @@ import StagingError from './errors/StagingError';
2626
import IClientContext from './contracts/IClientContext';
2727
import ISessionBackend from './contracts/ISessionBackend';
2828
import IOperationBackend from './contracts/IOperationBackend';
29+
import { numberToInt64 as numberToInt64Impl } from './thrift-backend/ThriftSessionBackend';
2930

3031
// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
3132
const pipeline = util.promisify(stream.pipeline);
3233

33-
// Re-export for back-compat with existing imports.
34-
export { numberToInt64 } from './thrift-backend/ThriftSessionBackend';
34+
/**
35+
* Convert a JS number to a Thrift-wire `node-int64`.
36+
*
37+
* @deprecated Thrift-only utility re-exported for back-compat with existing
38+
* external consumers. Backends other than Thrift do not use `node-int64`;
39+
* new code should not import this from `DBSQLSession`. It will be removed
40+
* when the public API stops exposing Thrift wire types.
41+
*/
42+
export const numberToInt64 = numberToInt64Impl;
3543

3644
interface DBSQLSessionConstructorOptions {
3745
backend: ISessionBackend;
@@ -68,10 +76,7 @@ export default class DBSQLSession implements IDBSQLSession {
6876
* const response = await session.getInfo(thrift.TCLIService_types.TGetInfoType.CLI_DBMS_VER);
6977
*/
7078
public async getInfo(infoType: number): Promise<InfoValue> {
71-
await this.failIfClosed();
72-
const result = await this.backend.getInfo(infoType);
73-
await this.failIfClosed();
74-
return result;
79+
return this.runBackend(() => this.backend.getInfo(infoType));
7580
}
7681

7782
/**
@@ -84,12 +89,16 @@ export default class DBSQLSession implements IDBSQLSession {
8489
* const operation = await session.executeStatement(query);
8590
*/
8691
public async executeStatement(statement: string, options: ExecuteStatementOptions = {}): Promise<IOperation> {
87-
await this.failIfClosed();
88-
const opBackend = await this.backend.executeStatement(statement, options);
89-
await this.failIfClosed();
92+
const opBackend = await this.runBackend(() => this.backend.executeStatement(statement, options));
9093
const operation = this.wrapOperation(opBackend);
9194

92-
// Staging detection: only run when stagingAllowedLocalPath is provided.
95+
// If `stagingAllowedLocalPath` is provided - assume that operation possibly may be a staging operation.
96+
// To know for sure, fetch metadata and check a `isStagingOperation` flag. If it happens that it wasn't
97+
// a staging operation - not a big deal, we just fetched metadata earlier, but operation is still usable
98+
// and user can get data from it.
99+
// If `stagingAllowedLocalPath` is not provided - don't do anything to the operation. In a case of regular
100+
// operation, everything will work as usual. In a case of staging operation, it will be processed like any
101+
// other query - it will be possible to get data from it as usual, or use other operation methods.
93102
if (options.stagingAllowedLocalPath !== undefined) {
94103
const metadata = await operation.getResultMetadata();
95104
if (metadata.isStagingOperation) {
@@ -174,6 +183,13 @@ export default class DBSQLSession implements IDBSQLSession {
174183
const agent = await connectionProvider.getAgent();
175184

176185
const response = await fetch(presignedUrl, { method: 'DELETE', headers, agent });
186+
// Looks that AWS and Azure have a different behavior of HTTP `DELETE` for non-existing files.
187+
// AWS assumes that - since file already doesn't exist - the goal is achieved, and returns HTTP 200.
188+
// Azure, on the other hand, is somewhat stricter and check if file exists before deleting it. And if
189+
// file doesn't exist - Azure returns HTTP 404.
190+
//
191+
// For us, it's totally okay if file didn't exist before removing. So when we get an HTTP 404 -
192+
// just ignore it and report success. This way we can have a uniform library behavior for all clouds
177193
if (!response.ok && response.status !== 404) {
178194
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
179195
}
@@ -198,6 +214,7 @@ export default class DBSQLSession implements IDBSQLSession {
198214
method: 'PUT',
199215
headers: {
200216
...headers,
217+
// This header is required by server
201218
'Content-Length': fileInfo.size.toString(),
202219
},
203220
agent,
@@ -215,10 +232,7 @@ export default class DBSQLSession implements IDBSQLSession {
215232
* @returns DBSQLOperation
216233
*/
217234
public async getTypeInfo(request: TypeInfoRequest = {}): Promise<IOperation> {
218-
await this.failIfClosed();
219-
const opBackend = await this.backend.getTypeInfo(request);
220-
await this.failIfClosed();
221-
return this.wrapOperation(opBackend);
235+
return this.wrapOperation(await this.runBackend(() => this.backend.getTypeInfo(request)));
222236
}
223237

224238
/**
@@ -228,10 +242,7 @@ export default class DBSQLSession implements IDBSQLSession {
228242
* @returns DBSQLOperation
229243
*/
230244
public async getCatalogs(request: CatalogsRequest = {}): Promise<IOperation> {
231-
await this.failIfClosed();
232-
const opBackend = await this.backend.getCatalogs(request);
233-
await this.failIfClosed();
234-
return this.wrapOperation(opBackend);
245+
return this.wrapOperation(await this.runBackend(() => this.backend.getCatalogs(request)));
235246
}
236247

237248
/**
@@ -241,10 +252,7 @@ export default class DBSQLSession implements IDBSQLSession {
241252
* @returns DBSQLOperation
242253
*/
243254
public async getSchemas(request: SchemasRequest = {}): Promise<IOperation> {
244-
await this.failIfClosed();
245-
const opBackend = await this.backend.getSchemas(request);
246-
await this.failIfClosed();
247-
return this.wrapOperation(opBackend);
255+
return this.wrapOperation(await this.runBackend(() => this.backend.getSchemas(request)));
248256
}
249257

250258
/**
@@ -254,10 +262,7 @@ export default class DBSQLSession implements IDBSQLSession {
254262
* @returns DBSQLOperation
255263
*/
256264
public async getTables(request: TablesRequest = {}): Promise<IOperation> {
257-
await this.failIfClosed();
258-
const opBackend = await this.backend.getTables(request);
259-
await this.failIfClosed();
260-
return this.wrapOperation(opBackend);
265+
return this.wrapOperation(await this.runBackend(() => this.backend.getTables(request)));
261266
}
262267

263268
/**
@@ -267,10 +272,7 @@ export default class DBSQLSession implements IDBSQLSession {
267272
* @returns DBSQLOperation
268273
*/
269274
public async getTableTypes(request: TableTypesRequest = {}): Promise<IOperation> {
270-
await this.failIfClosed();
271-
const opBackend = await this.backend.getTableTypes(request);
272-
await this.failIfClosed();
273-
return this.wrapOperation(opBackend);
275+
return this.wrapOperation(await this.runBackend(() => this.backend.getTableTypes(request)));
274276
}
275277

276278
/**
@@ -280,10 +282,7 @@ export default class DBSQLSession implements IDBSQLSession {
280282
* @returns DBSQLOperation
281283
*/
282284
public async getColumns(request: ColumnsRequest = {}): Promise<IOperation> {
283-
await this.failIfClosed();
284-
const opBackend = await this.backend.getColumns(request);
285-
await this.failIfClosed();
286-
return this.wrapOperation(opBackend);
285+
return this.wrapOperation(await this.runBackend(() => this.backend.getColumns(request)));
287286
}
288287

289288
/**
@@ -293,17 +292,11 @@ export default class DBSQLSession implements IDBSQLSession {
293292
* @returns DBSQLOperation
294293
*/
295294
public async getFunctions(request: FunctionsRequest): Promise<IOperation> {
296-
await this.failIfClosed();
297-
const opBackend = await this.backend.getFunctions(request);
298-
await this.failIfClosed();
299-
return this.wrapOperation(opBackend);
295+
return this.wrapOperation(await this.runBackend(() => this.backend.getFunctions(request)));
300296
}
301297

302298
public async getPrimaryKeys(request: PrimaryKeysRequest): Promise<IOperation> {
303-
await this.failIfClosed();
304-
const opBackend = await this.backend.getPrimaryKeys(request);
305-
await this.failIfClosed();
306-
return this.wrapOperation(opBackend);
299+
return this.wrapOperation(await this.runBackend(() => this.backend.getPrimaryKeys(request)));
307300
}
308301

309302
/**
@@ -313,10 +306,7 @@ export default class DBSQLSession implements IDBSQLSession {
313306
* @returns DBSQLOperation
314307
*/
315308
public async getCrossReference(request: CrossReferenceRequest): Promise<IOperation> {
316-
await this.failIfClosed();
317-
const opBackend = await this.backend.getCrossReference(request);
318-
await this.failIfClosed();
319-
return this.wrapOperation(opBackend);
309+
return this.wrapOperation(await this.runBackend(() => this.backend.getCrossReference(request)));
320310
}
321311

322312
/**
@@ -346,6 +336,21 @@ export default class DBSQLSession implements IDBSQLSession {
346336
return operation;
347337
}
348338

339+
/**
340+
* Bracket a backend call with `failIfClosed()` on both sides. The pre-call
341+
* check rejects work against an already-closed session; the post-call check
342+
* rejects results that came back after a concurrent close (server-side
343+
* close doesn't error out the in-flight RPC). Centralizing the pattern
344+
* keeps the 10+ delegation methods readable and makes the contract
345+
* impossible to forget.
346+
*/
347+
private async runBackend<T>(fn: () => Promise<T>): Promise<T> {
348+
await this.failIfClosed();
349+
const result = await fn();
350+
await this.failIfClosed();
351+
return result;
352+
}
353+
349354
private async failIfClosed(): Promise<void> {
350355
if (!this.isOpen) {
351356
throw new HiveDriverError('The session was closed or has expired');

0 commit comments

Comments
 (0)