Skip to content

Commit cd3b1e1

Browse files
committed
sea-results: SeaOperationBackend wires kernel result-stream → JS rows
Implements IOperationBackend over the napi binding's Statement. fetchChunk decodes Arrow IPC bytes → apache-arrow RecordBatch → ArrowResultConverter (Phase 1+2 reused unchanged) → JS rows. All M0 datatypes round-trip via the same converter the thrift path uses (BOOL, INT8/16/32/64, FLOAT, DOUBLE, DECIMAL, STRING, BINARY, DATE, TIMESTAMP, INTERVAL, ARRAY, MAP, STRUCT). Unit tests construct synthetic IPC batches; e2e test against pecotesting confirms byte-identical parity vs thrift. No new dependencies. ArrowResultConverter / ResultSlicer / OperationIterator all reused unchanged (DRY). Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 6be8799 commit cd3b1e1

6 files changed

Lines changed: 864 additions & 100 deletions

File tree

lib/sea/SeaArrowIpc.ts

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
// Copyright (c) 2026 Databricks, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import { RecordBatchReader, Schema, Field, DataType, TypeMap } from 'apache-arrow';
16+
import { TTableSchema, TTypeId, TPrimitiveTypeEntry } from '../../thrift/TCLIService_types';
17+
18+
/**
19+
* Field metadata key used by the kernel to attach the original Databricks
20+
* SQL type name to each Arrow field. See `databricks-sql-kernel/src/reader/mod.rs`.
21+
*/
22+
// Passed verbatim to `field.metadata.get(...)` in `arrowTypeToTTypeId`, so the
// key itself must match exactly; only the value read back is upper-cased there.
const DATABRICKS_TYPE_NAME = 'databricks.type_name';
23+
24+
/**
25+
* Decode an Arrow IPC stream payload (schema header + zero-or-more
26+
* record-batch messages) into its row count.
27+
*
28+
* Returns `{ schema, rowCount }`. The schema is left intact as the
29+
* apache-arrow Schema object so callers can reuse it; the rowCount is
30+
* the sum of `RecordBatch.numRows` across every record-batch message
31+
* in the stream.
32+
*
33+
* Why we parse upfront: `ArrowResultConverter` consumes `ArrowBatch`
34+
* objects which carry an explicit `rowCount`. The kernel's IPC payload
35+
* does not carry a separate count — only per-RecordBatch numRows. We
36+
* walk the messages once to sum them so the converter sees the same
37+
* shape as the thrift path (`ArrowResultHandler.fetchNext` at
38+
* `lib/result/ArrowResultHandler.ts:55`).
39+
*
40+
* Re-parsing inside the converter is unavoidable because `RecordBatch`
41+
* instances created here cannot be passed across the converter's
42+
* `Buffer[]` boundary without rewriting the converter. The IPC bytes
43+
* themselves are small enough (one record batch per call) that the
44+
* double-parse cost is negligible for M0.
45+
*/
46+
export function decodeIpcBatch(ipcBytes: Buffer): { schema: Schema<TypeMap>; rowCount: number } {
47+
const reader = RecordBatchReader.from<TypeMap>(ipcBytes);
48+
// Eagerly open so `schema` is populated.
49+
reader.open();
50+
const { schema } = reader;
51+
52+
let rowCount = 0;
53+
// Iterate all record batches in the stream and sum row counts.
54+
for (const batch of reader) {
55+
rowCount += batch.numRows;
56+
}
57+
return { schema, rowCount };
58+
}
59+
60+
/**
61+
* Decode an Arrow IPC schema payload (no record batches) into the
62+
* apache-arrow Schema object.
63+
*/
64+
export function decodeIpcSchema(ipcBytes: Buffer): Schema<TypeMap> {
65+
const reader = RecordBatchReader.from<TypeMap>(ipcBytes);
66+
reader.open();
67+
return reader.schema;
68+
}
69+
70+
/**
71+
* Map an Arrow `DataType` (with optional `databricks.type_name`
72+
* metadata) onto the closest Thrift `TTypeId`.
73+
*
74+
* This is the synthesis step that lets the existing
75+
* `ArrowResultConverter` Phase-2 dispatch (`convertThriftValue` in
76+
* `lib/result/utils.ts:61-98`) keep working unchanged for the SEA
77+
* path. Phase-2 keys exclusively off `TPrimitiveTypeEntry.type` per
78+
* column, so we synthesize a `TColumnDesc` whose `TTypeId` matches the
79+
* server-emitted Arrow type as closely as possible.
80+
*
81+
* Resolution order:
82+
* 1. The kernel attaches `databricks.type_name` (e.g. "DECIMAL",
83+
* "INTERVAL", "STRUCT") to each field's metadata. Prefer that when
84+
* present — it carries the original SQL semantic that the Arrow
85+
* type alone can lose (e.g. INTERVAL → Utf8 with metadata).
86+
* 2. Fall back to the Arrow `DataType.typeId` for primitive types.
87+
*
88+
* This matches the JDBC and Python drivers' policy of trusting the
89+
* server's logical type assignment over the wire-level Arrow encoding.
90+
*/
91+
function arrowTypeToTTypeId(field: Field<DataType>): TTypeId {
92+
const typeName = field.metadata.get(DATABRICKS_TYPE_NAME)?.toUpperCase();
93+
94+
switch (typeName) {
95+
case 'BOOLEAN':
96+
return TTypeId.BOOLEAN_TYPE;
97+
case 'TINYINT':
98+
case 'BYTE':
99+
return TTypeId.TINYINT_TYPE;
100+
case 'SMALLINT':
101+
case 'SHORT':
102+
return TTypeId.SMALLINT_TYPE;
103+
case 'INT':
104+
case 'INTEGER':
105+
return TTypeId.INT_TYPE;
106+
case 'BIGINT':
107+
case 'LONG':
108+
return TTypeId.BIGINT_TYPE;
109+
case 'FLOAT':
110+
case 'REAL':
111+
return TTypeId.FLOAT_TYPE;
112+
case 'DOUBLE':
113+
return TTypeId.DOUBLE_TYPE;
114+
case 'STRING':
115+
return TTypeId.STRING_TYPE;
116+
case 'VARCHAR':
117+
return TTypeId.VARCHAR_TYPE;
118+
case 'CHAR':
119+
return TTypeId.CHAR_TYPE;
120+
case 'BINARY':
121+
return TTypeId.BINARY_TYPE;
122+
case 'DATE':
123+
return TTypeId.DATE_TYPE;
124+
case 'TIMESTAMP':
125+
case 'TIMESTAMP_NTZ':
126+
return TTypeId.TIMESTAMP_TYPE;
127+
case 'DECIMAL':
128+
return TTypeId.DECIMAL_TYPE;
129+
case 'INTERVAL':
130+
case 'INTERVAL DAY':
131+
case 'INTERVAL DAY TO HOUR':
132+
case 'INTERVAL DAY TO MINUTE':
133+
case 'INTERVAL DAY TO SECOND':
134+
case 'INTERVAL HOUR':
135+
case 'INTERVAL HOUR TO MINUTE':
136+
case 'INTERVAL HOUR TO SECOND':
137+
case 'INTERVAL MINUTE':
138+
case 'INTERVAL MINUTE TO SECOND':
139+
case 'INTERVAL SECOND':
140+
return TTypeId.INTERVAL_DAY_TIME_TYPE;
141+
case 'INTERVAL YEAR':
142+
case 'INTERVAL YEAR TO MONTH':
143+
case 'INTERVAL MONTH':
144+
return TTypeId.INTERVAL_YEAR_MONTH_TYPE;
145+
case 'ARRAY':
146+
return TTypeId.ARRAY_TYPE;
147+
case 'MAP':
148+
return TTypeId.MAP_TYPE;
149+
case 'STRUCT':
150+
return TTypeId.STRUCT_TYPE;
151+
case 'NULL':
152+
case 'VOID':
153+
return TTypeId.NULL_TYPE;
154+
default:
155+
break;
156+
}
157+
158+
// Fall back to Arrow's own type id when no databricks metadata is set
159+
// (e.g. unit tests constructing batches without metadata).
160+
const arrowType = field.type;
161+
if (DataType.isBool(arrowType)) return TTypeId.BOOLEAN_TYPE;
162+
if (DataType.isInt(arrowType)) {
163+
switch (arrowType.bitWidth) {
164+
case 8:
165+
return TTypeId.TINYINT_TYPE;
166+
case 16:
167+
return TTypeId.SMALLINT_TYPE;
168+
case 32:
169+
return TTypeId.INT_TYPE;
170+
case 64:
171+
return TTypeId.BIGINT_TYPE;
172+
default:
173+
return TTypeId.BIGINT_TYPE;
174+
}
175+
}
176+
if (DataType.isFloat(arrowType)) {
177+
// arrow Float precision: 16=HALF, 32=SINGLE, 64=DOUBLE
178+
return arrowType.precision === 2 ? TTypeId.DOUBLE_TYPE : TTypeId.FLOAT_TYPE;
179+
}
180+
if (DataType.isDecimal(arrowType)) return TTypeId.DECIMAL_TYPE;
181+
if (DataType.isUtf8(arrowType)) return TTypeId.STRING_TYPE;
182+
if (DataType.isBinary(arrowType)) return TTypeId.BINARY_TYPE;
183+
if (DataType.isDate(arrowType)) return TTypeId.DATE_TYPE;
184+
if (DataType.isTimestamp(arrowType)) return TTypeId.TIMESTAMP_TYPE;
185+
if (DataType.isList(arrowType)) return TTypeId.ARRAY_TYPE;
186+
if (DataType.isMap(arrowType)) return TTypeId.MAP_TYPE;
187+
if (DataType.isStruct(arrowType)) return TTypeId.STRUCT_TYPE;
188+
if (DataType.isNull(arrowType)) return TTypeId.NULL_TYPE;
189+
190+
return TTypeId.STRING_TYPE;
191+
}
192+
193+
/**
 * Build a Thrift-shaped `TTableSchema` describing the columns of an Arrow
 * schema.
 *
 * Every Arrow field yields one column descriptor, in field order:
 *   - `columnName` is the field name,
 *   - `typeDesc` holds a single primitive type entry whose type id comes
 *     from `arrowTypeToTTypeId`,
 *   - `position` is the field's 1-based index.
 *
 * Per the original design note, `SeaOperationBackend.getResultMetadata`
 * uses this to drive `ArrowResultConverter.convertThriftTypes` without
 * changing that code.
 *
 * @param arrowSchema Arrow schema decoded from the kernel's IPC stream.
 * @returns A table schema with one column per Arrow field (an empty
 *   `columns` array when the schema has no fields).
 */
export function arrowSchemaToThriftSchema(arrowSchema: Schema<TypeMap>): TTableSchema {
  const columns = arrowSchema.fields.map((arrowField, zeroBasedIndex) => {
    const primitive: TPrimitiveTypeEntry = { type: arrowTypeToTTypeId(arrowField) };

    return {
      columnName: arrowField.name,
      typeDesc: {
        types: [{ primitiveEntry: primitive }],
      },
      // Column positions are 1-based.
      position: zeroBasedIndex + 1,
    };
  });

  return { columns };
}

0 commit comments

Comments
 (0)