Skip to content

Commit 0430418

Browse files
committed
sea-operation: cancel/close/finished lifecycle for SEA operations
Rust source changes (native/sea/src/connection.rs + statement.rs) deferred to kernel repo PR #29 (kernel-napi-statement-validity) since napi/src now lives in databricks-sql-kernel/napi/. SeaOperationBackend.ts conflict resolved using integration commit 3da7aa7 (combining sea-results fetch pipeline with sea-operation lifecycle helpers). Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent cd3b1e1 commit 0430418

4 files changed

Lines changed: 1132 additions & 108 deletions

File tree

lib/sea/SeaOperationBackend.ts

Lines changed: 117 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,30 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
/**
16+
* `IOperationBackend` implementation for the SEA path.
17+
*
18+
* Combines:
19+
* - **Fetch pipeline (from sea-results):**
20+
* `napi.Statement.fetchNextBatch()` → `SeaResultsProvider` →
21+
* `ArrowResultConverter` (Phase 1 + Phase 2; reused unchanged) →
22+
* `ResultSlicer` (chunk-size normalisation; reused unchanged). The M0
23+
* row shape is byte-identical to the thrift path for every M0
24+
* datatype (parity gate exercised by `tests/integration/sea/results-e2e.test.ts`).
25+
*
26+
* - **Lifecycle (from sea-operation):** `cancel()` / `close()` /
27+
* `finished()` (alias of `waitUntilReady`) delegate to the helpers
28+
* in `SeaOperationLifecycle.ts`. The helpers handle idempotency,
29+
* flag-set-before-await ordering (so cancel-mid-fetch propagates),
30+
* logging via `IClientContext`, and kernel-error mapping.
31+
*
32+
* The lifecycle helpers route fetch-after-cancel / fetch-after-close
33+
* through `failIfNotActive`, which throws an `OperationStateError`
34+
* matching the Thrift `failIfClosed` semantics. We call it from
35+
* `fetchChunk`/`hasMore`/`getResultMetadata` so the cancel-mid-fetch
36+
* e2e (cancel < 200ms) drives against this backend cleanly.
37+
*/
38+
1539
import { v4 as uuidv4 } from 'uuid';
1640
import {
1741
TGetOperationStatusResp,
@@ -29,78 +53,50 @@ import ResultSlicer from '../result/ResultSlicer';
2953
import SeaResultsProvider from './SeaResultsProvider';
3054
import { arrowSchemaToThriftSchema, decodeIpcSchema } from './SeaArrowIpc';
3155
import { SeaNativeStatement } from './SeaNativeLoader';
32-
import { mapKernelErrorToJsError, KernelErrorShape } from './SeaErrorMapping';
56+
import {
57+
SeaStatementHandle,
58+
SeaOperationLifecycleState,
59+
createLifecycleState,
60+
seaCancel,
61+
seaClose,
62+
seaFinished,
63+
failIfNotActive,
64+
} from './SeaOperationLifecycle';
65+
66+
/**
67+
* Structural union of the lifecycle surface (cancel/close) and the
68+
* fetch surface (fetchNextBatch/schema). The real napi `Statement`
69+
* implements both; lifecycle-only test stubs implement only the
70+
* cancel/close half — fetch methods are accessed lazily and the
71+
* lifecycle tests never reach that path.
72+
*/
73+
export type SeaOperationStatement = SeaStatementHandle & Partial<SeaNativeStatement>;
3374

3475
/**
3576
* Constructor options for `SeaOperationBackend`.
3677
*/
3778
export interface SeaOperationBackendOptions {
3879
/** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */
39-
statement: SeaNativeStatement;
80+
statement: SeaOperationStatement;
4081
context: IClientContext;
4182
/**
42-
* Optional override for `id`. When not provided a fresh UUIDv4 is used.
43-
* The kernel does not yet surface its internal statement-id at the napi
44-
* boundary; once it does, the JS layer can thread it through here.
83+
* Optional override for `id`. When not provided a fresh UUIDv4 is
84+
* generated upstream (in `SeaSessionBackend.executeStatement`); the
85+
* kernel does not yet surface its internal statement-id at the napi
86+
* boundary. Once it does, the JS layer can thread it through here.
4587
*/
4688
id?: string;
4789
}
4890

49-
/**
50-
* Sentinel string the napi binding uses on `Error.reason` JSON envelopes.
51-
* Keep in sync with `native/sea/src/error.rs` (`SENTINEL`).
52-
*/
53-
const KERNEL_ERROR_SENTINEL = '__databricks_error__:';
54-
55-
function rethrowKernelError(err: unknown): never {
56-
if (err && typeof err === 'object' && 'message' in err) {
57-
const reason = (err as { reason?: unknown }).reason;
58-
if (typeof reason === 'string' && reason.startsWith(KERNEL_ERROR_SENTINEL)) {
59-
try {
60-
const payload = JSON.parse(reason.slice(KERNEL_ERROR_SENTINEL.length)) as KernelErrorShape;
61-
throw mapKernelErrorToJsError(payload);
62-
} catch (parseErr) {
63-
if (parseErr !== err) {
64-
throw parseErr;
65-
}
66-
}
67-
}
68-
}
69-
throw err;
70-
}
71-
72-
/**
73-
* `IOperationBackend` over the napi-bound kernel `Statement`. Adapts
74-
* the kernel's Arrow IPC stream onto the existing thrift-shaped result
75-
* pipeline (`ArrowResultConverter` + `ResultSlicer`) so the M0 row
76-
* shape is byte-identical to the thrift path for every M0 datatype.
77-
*
78-
* Pipeline:
79-
* napi.Statement.fetchNextBatch() (IPC bytes per batch)
80-
* -> SeaResultsProvider (adapts to IResultsProvider<ArrowBatch>)
81-
* -> ArrowResultConverter (Phase 1 + Phase 2; reused unchanged)
82-
* -> ResultSlicer (chunk-size normalisation; reused unchanged)
83-
*
84-
* The kernel exposes only the `Arrow` `ResultBatch` variant for M0 —
85-
* both CloudFetch (external links) and inline batches flow through
86-
* `ResultStream::next_batch` and surface as a single Arrow IPC stream
87-
* per call. One backend therefore covers both fetch modes without
88-
* dispatching on `TSparkRowSetType`.
89-
*
90-
* **Lifecycle:** `cancel()` and `close()` are idempotent (a second
91-
* call is a no-op). Cancel-after-close is a no-op; close-after-cancel
92-
* still goes through to the binding because the kernel's close is the
93-
* only way to release the server-side handle. Cancelled flag is set
94-
* _before_ awaiting the napi call so a concurrent `fetchChunk` issued
95-
* mid-cancel sees the flag when its await yields.
96-
*/
9791
export default class SeaOperationBackend implements IOperationBackend {
98-
private readonly statement: SeaNativeStatement;
92+
private readonly statement: SeaOperationStatement;
9993

10094
private readonly context: IClientContext;
10195

10296
private readonly _id: string;
10397

98+
private readonly lifecycle: SeaOperationLifecycleState = createLifecycleState();
99+
104100
private resultSlicer?: ResultSlicer<any>;
105101

106102
private resultsProvider?: SeaResultsProvider;
@@ -109,16 +105,6 @@ export default class SeaOperationBackend implements IOperationBackend {
109105

110106
private metadataPromise?: Promise<TGetResultSetMetadataResp>;
111107

112-
// Tracks the operation's terminal state. The kernel does not expose
113-
// pending/running observability at the napi surface today; `execute`
114-
// resolves only after the statement has reached a result-fetching
115-
// state, so we treat the backend as FINISHED until `close()`/`cancel()`.
116-
private state: TOperationState = TOperationState.FINISHED_STATE;
117-
118-
private cancelled = false;
119-
120-
private closed = false;
121-
122108
constructor({ statement, context, id }: SeaOperationBackendOptions) {
123109
this.statement = statement;
124110
this.context = context;
@@ -138,50 +124,42 @@ export default class SeaOperationBackend implements IOperationBackend {
138124
return true;
139125
}
140126

127+
// ---------------------------------------------------------------------------
128+
// Fetch / metadata (owned by the sea-results pipeline).
129+
// ---------------------------------------------------------------------------
130+
141131
public async fetchChunk({
142132
limit,
143133
disableBuffering,
144134
}: {
145135
limit: number;
146136
disableBuffering?: boolean;
147137
}): Promise<Array<object>> {
138+
// Cancel-mid-fetch propagation: if cancel() has flipped the
139+
// lifecycle flag, fail locally without a wire round-trip.
140+
failIfNotActive(this.lifecycle);
148141
const slicer = await this.getResultSlicer();
149142
return slicer.fetchNext({ limit, disableBuffering });
150143
}
151144

152145
public async hasMore(): Promise<boolean> {
146+
failIfNotActive(this.lifecycle);
153147
const slicer = await this.getResultSlicer();
154148
return slicer.hasMore();
155149
}
156150

157-
public async waitUntilReady(options?: {
158-
progress?: boolean;
159-
callback?: (progress: TGetOperationStatusResp) => unknown;
160-
}): Promise<void> {
161-
// The kernel's `executeStatement` resolves once results are
162-
// available; there's no pending/running state to observe here. We
163-
// synthesise an immediate FINISHED status for the optional callback.
164-
if (options?.callback) {
165-
await Promise.resolve(options.callback(await this.status(Boolean(options.progress))));
166-
}
167-
}
168-
169-
public async status(_progress: boolean): Promise<TGetOperationStatusResp> {
170-
return {
171-
status: { statusCode: TStatusCode.SUCCESS_STATUS },
172-
operationState: this.state,
173-
hasResultSet: true,
174-
};
175-
}
176-
177151
public async getResultMetadata(): Promise<TGetResultSetMetadataResp> {
152+
failIfNotActive(this.lifecycle);
178153
if (this.metadata) {
179154
return this.metadata;
180155
}
181156
if (this.metadataPromise) {
182157
return this.metadataPromise;
183158
}
184159
this.metadataPromise = (async () => {
160+
if (!this.statement.schema) {
161+
throw new Error('SeaOperationBackend: statement.schema() is not available on this handle');
162+
}
185163
const arrowSchemaIpc = await this.statement.schema();
186164
const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes);
187165
const thriftSchema: TTableSchema = arrowSchemaToThriftSchema(arrowSchema);
@@ -205,42 +183,73 @@ export default class SeaOperationBackend implements IOperationBackend {
205183
}
206184
}
207185

208-
public async cancel(): Promise<Status> {
209-
if (this.cancelled || this.closed) {
210-
return Status.success();
186+
// ---------------------------------------------------------------------------
187+
// Status / lifecycle (owned by the sea-operation lifecycle helpers).
188+
// ---------------------------------------------------------------------------
189+
190+
public async status(_progress: boolean): Promise<TGetOperationStatusResp> {
191+
// Synthesised — kernel only surfaces terminal-or-running statements
192+
// through its public API; we report CANCELED/CLOSED if the lifecycle
193+
// flag is set, else FINISHED. Matches the Thrift status shape so
194+
// facade-level callers see consistent telemetry across backends.
195+
if (this.lifecycle.isCancelled) {
196+
return {
197+
status: { statusCode: TStatusCode.SUCCESS_STATUS },
198+
operationState: TOperationState.CANCELED_STATE,
199+
hasResultSet: true,
200+
};
211201
}
212-
// Set the flag _before_ awaiting so a concurrent fetchChunk
213-
// observing the flag short-circuits when its await yields.
214-
this.cancelled = true;
215-
try {
216-
await this.statement.cancel();
217-
} catch (err) {
218-
rethrowKernelError(err);
202+
if (this.lifecycle.isClosed) {
203+
return {
204+
status: { statusCode: TStatusCode.SUCCESS_STATUS },
205+
operationState: TOperationState.CLOSED_STATE,
206+
hasResultSet: true,
207+
};
219208
}
220-
this.state = TOperationState.CANCELED_STATE;
221-
return Status.success();
209+
return {
210+
status: { statusCode: TStatusCode.SUCCESS_STATUS },
211+
operationState: TOperationState.FINISHED_STATE,
212+
hasResultSet: true,
213+
};
214+
}
215+
216+
public async waitUntilReady(options?: {
217+
progress?: boolean;
218+
callback?: (progress: TGetOperationStatusResp) => unknown;
219+
}): Promise<void> {
220+
// Kernel's `Statement::execute().await` has already resolved by the
221+
// time we hold a Statement handle — there is no pending/running
222+
// state to poll for M0. seaFinished fires the progress callback
223+
// once with a synthesised FINISHED response so progress-UI callers
224+
// see the same one-shot completion tick the Thrift path emits at
225+
// the end of its polling loop.
226+
return seaFinished(this.lifecycle, options);
227+
}
228+
229+
public async cancel(): Promise<Status> {
230+
return seaCancel(this.lifecycle, this.statement, this.context, this._id);
222231
}
223232

224233
public async close(): Promise<Status> {
225-
if (this.closed) {
226-
return Status.success();
227-
}
228-
this.closed = true;
229-
try {
230-
await this.statement.close();
231-
} catch (err) {
232-
rethrowKernelError(err);
233-
}
234-
this.state = TOperationState.CLOSED_STATE;
235-
return Status.success();
234+
return seaClose(this.lifecycle, this.statement, this.context, this._id);
236235
}
237236

237+
// ---------------------------------------------------------------------------
238+
// Internals.
239+
// ---------------------------------------------------------------------------
240+
238241
private async getResultSlicer(): Promise<ResultSlicer<any>> {
239242
if (this.resultSlicer) {
240243
return this.resultSlicer;
241244
}
245+
if (!this.statement.fetchNextBatch) {
246+
throw new Error('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle');
247+
}
242248
const metadata = await this.getResultMetadata();
243-
this.resultsProvider = new SeaResultsProvider(this.statement);
249+
// The lifecycle subset has cancel/close only; fetch methods exist on
250+
// the full napi Statement. Cast is safe here because we've just
251+
// verified `fetchNextBatch` is callable.
252+
this.resultsProvider = new SeaResultsProvider(this.statement as SeaNativeStatement);
244253
const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata);
245254
this.resultSlicer = new ResultSlicer(this.context, converter);
246255
return this.resultSlicer;

0 commit comments

Comments
 (0)