Skip to content

Commit c6d24df

Browse files
committed
sea-integration: INTERVAL YEAR-MONTH + DAY-TIME parity with thrift
- YEAR-MONTH: convert Arrow Interval[YearMonth] to thrift "N-M" string format (with leading "-" for negatives) in Phase 1 of converter.
- DAY-TIME: pre-process IPC schema bytes before apache-arrow@13 decode (which predates the Arrow Duration type id 18) to remap Duration -> Int64 with original time unit preserved in `databricks.arrow.duration_unit` field metadata; convert Int64 duration values to thrift "D HH:mm:ss.fffffffff" string format.

Both interval flavours are formatted by the same converter helper (formatDayTimeFromTotal); the duration_unit metadata gates between the native Arrow Interval Int32Array path and the rewritten Duration Int64 path.

No apache-arrow bump, no node_modules edits, no kernel-side change.

New: lib/sea/SeaArrowIpcDurationFix.ts (FlatBuffer rewriter using apache-arrow's internal fb/* accessors).

M0 datatype parity now 25/25.
1 parent 4bd89c6 commit c6d24df

5 files changed

Lines changed: 1274 additions & 15 deletions

File tree

lib/result/ArrowResultConverter.ts

Lines changed: 199 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,143 @@ const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
2323
type ArrowSchema = Schema<TypeMap>;
2424
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
2525

26+
/**
27+
* Metadata key carrying the original Arrow `Duration` time unit on
28+
* fields that were rewritten to `Int64` by the SEA IPC pre-processor
29+
* (`lib/sea/SeaArrowIpcDurationFix.ts`). We re-declare the constant
30+
* here (rather than importing it) so the converter has no compile-time
31+
* dependency on the SEA module — it's reused unchanged by the
32+
* thrift-path which has no SEA awareness.
33+
*/
34+
const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit';
35+
36+
/**
 * Turn a native Arrow interval value into the thrift string form.
 *
 *   - `valueType.unit === 0` (YEAR_MONTH): the two slots are handed to
 *     `formatYearMonth` and rendered as `"Y-M"` (e.g. `"1-2"`).
 *   - any other unit (DAY_TIME, and anything else that reaches here): the
 *     two slots are read as `[days, milliseconds]`, collapsed into a single
 *     signed nanosecond total, and rendered by `formatDayTimeFromTotal` as
 *     `"D HH:mm:ss.fffffffff"` (e.g. `"1 02:03:04.000000000"`).
 *
 * @param value      two-element array-like; each slot is coerced with
 *                   `Number(...)` (presumably the `Int32Array(2)` produced
 *                   by apache-arrow's get-visitor — confirm against caller)
 * @param valueType  Arrow data type of the column; only `unit` is read
 * @returns the formatted interval string
 */
function formatArrowInterval(value: any, valueType: any): string {
  const first = Number(value[0]);
  const second = Number(value[1]);

  // Interval unit codes: 0 = YEAR_MONTH, 1 = DAY_TIME, 2 = MONTH_DAY_NANO.
  const isYearMonth = valueType?.unit === 0;

  if (!isYearMonth) {
    // Fold days + milliseconds into one signed total so the sign and the
    // day/time split are decided in a single place downstream. The legacy
    // day-time layout only has millisecond precision, so the sub-millisecond
    // digits of the result are always zero.
    const msPerDay = BigInt(86_400_000);
    const nsPerMs = BigInt(1_000_000);
    const totalMillis = BigInt(first) * msPerDay + BigInt(second);
    return formatDayTimeFromTotal(totalMillis * nsPerMs, 'NANOSECOND');
  }

  return formatYearMonth(first, second);
}
69+
70+
/**
 * Render a (years, months) pair as the thrift INTERVAL YEAR-MONTH string:
 * `"Y-M"` for non-negative intervals, `"-Y-M"` for negative ones.
 *
 * The pair is first collapsed into a signed total number of months and the
 * printed fields are always derived from the absolute value of that total.
 * For the inputs Arrow is expected to supply (both parts sharing a sign and
 * `|months| < 12`) this prints the parts unchanged; for anything else
 * (mixed signs, months >= 12) it normalises instead of emitting malformed
 * text such as `"1--1"`.
 *
 * @param years   whole-year component (may be negative)
 * @param months  month component (may be negative)
 * @returns the interval as `"Y-M"` with a single leading `-` when the total
 *          is negative; individual fields are never signed
 */
function formatYearMonth(years: number, months: number): string {
  const totalMonths = years * 12 + months;
  const sign = totalMonths < 0 ? '-' : '';
  // Math.abs also folds -0 into 0, so a zero interval prints as "0-0".
  const magnitude = Math.abs(totalMonths);
  const wholeYears = Math.trunc(magnitude / 12);
  const leftoverMonths = magnitude % 12;
  return `${sign}${wholeYears}-${leftoverMonths}`;
}
88+
89+
/**
 * Convert a duration count expressed in `unit` into the thrift
 * INTERVAL DAY-TIME string (`"D HH:mm:ss.fffffffff"`).
 *
 * The count is widened to `bigint` if it arrives as a `number`, scaled to
 * nanoseconds by `toNanoseconds`, and rendered by `formatDayTimeFromTotal`.
 *
 * @param value  signed duration count; a `number` is converted with
 *               `BigInt(...)`, which throws for non-integer numbers
 * @param unit   time unit of `value`: `SECOND`, `MILLISECOND`,
 *               `MICROSECOND` or `NANOSECOND`
 * @returns the formatted interval string
 */
function formatDurationToIntervalDayTime(value: bigint | number, unit: string): string {
  let count: bigint;
  if (typeof value === 'bigint') {
    count = value;
  } else {
    count = BigInt(value);
  }
  return formatDayTimeFromTotal(toNanoseconds(count, unit), unit);
}
104+
105+
/**
 * Express a duration count in nanoseconds.
 *
 * Multipliers by unit:
 *   SECOND       1_000_000_000
 *   MILLISECOND      1_000_000
 *   MICROSECOND          1_000
 *   NANOSECOND               1
 *
 * Any unit string not listed above is treated like `NANOSECOND`, i.e. the
 * value is returned untouched.
 *
 * @param value  signed duration count in `unit`
 * @param unit   name of the time unit `value` is measured in
 * @returns the same duration as a signed nanosecond count
 */
function toNanoseconds(value: bigint, unit: string): bigint {
  if (unit === 'SECOND') {
    return value * BigInt(1_000_000_000);
  }
  if (unit === 'MILLISECOND') {
    return value * BigInt(1_000_000);
  }
  if (unit === 'MICROSECOND') {
    return value * BigInt(1_000);
  }
  // NANOSECOND and every unrecognised unit: already in the target scale.
  return value;
}
126+
127+
/**
 * Render a signed nanosecond total as `"D HH:mm:ss.fffffffff"`.
 *
 * The magnitude is split into days, hours, minutes, seconds and a
 * sub-second remainder. Hours, minutes and seconds are zero-padded to two
 * digits; the fraction is always nine digits; days are printed unpadded.
 * A negative total gets exactly one leading `-` in front of the day field
 * and every field is printed unsigned.
 *
 * @param totalNanos  signed duration in nanoseconds
 * @param _unit       not consulted by the formatting; accepted so callers
 *                    can pass the original unit through
 * @returns the formatted interval string, e.g. `"1 02:03:04.000000000"`
 */
function formatDayTimeFromTotal(totalNanos: bigint, _unit: string): string {
  const isNegative = totalNanos < BigInt(0);
  let remaining = isNegative ? -totalNanos : totalNanos;

  const nsPerSecond = BigInt(1_000_000_000);
  const nsPerMinute = nsPerSecond * BigInt(60);
  const nsPerHour = nsPerMinute * BigInt(60);
  const nsPerDay = nsPerHour * BigInt(24);

  // Peel off the largest unit first; whatever is left feeds the next one.
  const takeWhole = (unitSize: bigint): bigint => {
    const whole = remaining / unitSize;
    remaining %= unitSize;
    return whole;
  };

  const dayPart = takeWhole(nsPerDay);
  const hourPart = takeWhole(nsPerHour);
  const minutePart = takeWhole(nsPerMinute);
  const secondPart = takeWhole(nsPerSecond);
  // After the seconds are removed only the sub-second nanoseconds remain.
  const nanoPart = remaining;

  const twoDigits = (n: bigint): string => n.toString().padStart(2, '0');

  const clock = [hourPart, minutePart, secondPart].map(twoDigits).join(':');
  const fractionDigits = nanoPart.toString().padStart(9, '0');

  return (isNegative ? '-' : '') + dayPart.toString() + ' ' + clock + '.' + fractionDigits;
}
162+
26163
export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
27164
private readonly context: IClientContext;
28165

@@ -142,37 +279,52 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
142279
private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
143280
return rows.map((row) => {
144281
// First, convert native Arrow values to corresponding plain JS objects
145-
const record = this.convertArrowTypes(row, undefined, schema.fields);
282+
const record = this.convertArrowTypes(row, undefined, schema.fields, undefined);
146283
// Second, cast all the values to original Thrift types
147284
return this.convertThriftTypes(record);
148285
});
149286
}
150287

151-
private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
288+
private convertArrowTypes(
289+
value: any,
290+
valueType: DataType | undefined,
291+
fields: Array<ArrowSchemaField> = [],
292+
field?: ArrowSchemaField,
293+
): any {
152294
if (value === null) {
153295
return value;
154296
}
155297

156298
const fieldsMap: Record<string, ArrowSchemaField> = {};
157-
for (const field of fields) {
158-
fieldsMap[field.name] = field;
299+
for (const f of fields) {
300+
fieldsMap[f.name] = f;
159301
}
160302

161303
// Convert structures to plain JS object and process all its fields recursively
162304
if (value instanceof StructRow) {
163305
const result = value.toJSON();
164306
for (const key of Object.keys(result)) {
165-
const field: ArrowSchemaField | undefined = fieldsMap[key];
166-
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
307+
const childField: ArrowSchemaField | undefined = fieldsMap[key];
308+
result[key] = this.convertArrowTypes(
309+
result[key],
310+
childField?.type,
311+
childField?.type.children || [],
312+
childField,
313+
);
167314
}
168315
return result;
169316
}
170317
if (value instanceof MapRow) {
171318
const result = value.toJSON();
172319
// Map type consists of its key and value types. We need only value type here, key will be cast to string anyway
173-
const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
320+
const valueField = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
174321
for (const key of Object.keys(result)) {
175-
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
322+
result[key] = this.convertArrowTypes(
323+
result[key],
324+
valueField?.type,
325+
valueField?.type.children || [],
326+
valueField,
327+
);
176328
}
177329
return result;
178330
}
@@ -181,31 +333,67 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
181333
if (value instanceof Vector) {
182334
const result = value.toJSON();
183335
// Array type contains the only child which defines a type of each array's element
184-
const field = fieldsMap.element;
185-
return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
336+
const elementField = fieldsMap.element;
337+
return result.map((item) =>
338+
this.convertArrowTypes(item, elementField?.type, elementField?.type.children || [], elementField),
339+
);
186340
}
187341

188342
if (DataType.isTimestamp(valueType)) {
189343
return new Date(value);
190344
}
191345

346+
// INTERVAL — Spark/Databricks SEA emits two flavours: native Arrow
347+
// `Interval[YearMonth]` / `Interval[DayTime]` (handled here) and
348+
// `Duration` (transparently rewritten to `Int64` upstream by
349+
// `SeaArrowIpcDurationFix.ts`; handled in the bigint/Int64 branch
350+
// below). In every case we coerce to the canonical thrift string
351+
// form so the SEA path is byte-identical with the thrift path:
352+
// YEAR-MONTH → `"Y-M"`
353+
// DAY-TIME → `"D HH:mm:ss.fffffffff"`
354+
if (DataType.isInterval(valueType)) {
355+
return formatArrowInterval(value, valueType);
356+
}
357+
192358
// Convert big number values to BigInt
193359
// Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
194360
if (value instanceof Object && value[isArrowBigNumSymbol]) {
195361
const result = bigNumToBigInt(value);
196362
if (DataType.isDecimal(valueType)) {
197363
return Number(result) / 10 ** valueType.scale;
198364
}
365+
// Duration columns rewritten to Int64 — detect via metadata.
366+
const durationUnit = field?.metadata.get(DURATION_UNIT_METADATA_KEY);
367+
if (durationUnit) {
368+
return formatDurationToIntervalDayTime(result, durationUnit);
369+
}
199370
return result;
200371
}
201372

202373
// Convert binary data to Buffer
203374
if (value instanceof Uint8Array) {
375+
// INTERVAL DAY-TIME / YEAR-MONTH that apache-arrow surfaced as
376+
// an Int32Array (size 2). `Uint8Array.isInstanceOf` is true for
377+
// every TypedArray subclass, so we have to check the parent type
378+
// first. The `DataType.isInterval` branch above already handles
379+
// the case where Arrow knew the field was an interval — this
380+
// fallback covers schemas where the interval surfaced as bare
381+
// bytes (defensive; not exercised in M0).
204382
return Buffer.from(value);
205383
}
206384

385+
// Bigint fallback — for raw bigints (not BigNum wrappers), the
386+
// duration_unit metadata also gates the INTERVAL DAY-TIME format.
387+
if (typeof value === 'bigint') {
388+
const durationUnit = field?.metadata.get(DURATION_UNIT_METADATA_KEY);
389+
if (durationUnit) {
390+
return formatDurationToIntervalDayTime(value, durationUnit);
391+
}
392+
return Number(value);
393+
}
394+
207395
// Return other values as is
208-
return typeof value === 'bigint' ? Number(value) : value;
396+
return value;
209397
}
210398

211399
private convertThriftTypes(record: Record<string, any>): any {

lib/sea/SeaArrowIpc.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import { RecordBatchReader, Schema, Field, DataType, TypeMap } from 'apache-arrow';
1616
import { TTableSchema, TTypeId, TPrimitiveTypeEntry } from '../../thrift/TCLIService_types';
17+
import { rewriteDurationToInt64, DURATION_UNIT_METADATA_KEY } from './SeaArrowIpcDurationFix';
1718

1819
/**
1920
* Field metadata key used by the kernel to attach the original Databricks
@@ -44,7 +45,8 @@ const DATABRICKS_TYPE_NAME = 'databricks.type_name';
4445
* double-parse cost is negligible for M0.
4546
*/
4647
export function decodeIpcBatch(ipcBytes: Buffer): { schema: Schema<TypeMap>; rowCount: number } {
47-
const reader = RecordBatchReader.from<TypeMap>(ipcBytes);
48+
const patched = rewriteDurationToInt64(ipcBytes);
49+
const reader = RecordBatchReader.from<TypeMap>(patched);
4850
// Eagerly open so `schema` is populated.
4951
reader.open();
5052
const { schema } = reader;
@@ -62,11 +64,30 @@ export function decodeIpcBatch(ipcBytes: Buffer): { schema: Schema<TypeMap>; row
6264
* apache-arrow Schema object.
6365
*/
6466
export function decodeIpcSchema(ipcBytes: Buffer): Schema<TypeMap> {
65-
const reader = RecordBatchReader.from<TypeMap>(ipcBytes);
67+
const patched = rewriteDurationToInt64(ipcBytes);
68+
const reader = RecordBatchReader.from<TypeMap>(patched);
6669
reader.open();
6770
return reader.schema;
6871
}
6972

73+
/**
 * Run the kernel's raw IPC bytes through the `Duration → Int64` rewrite
 * (`rewriteDurationToInt64` from `SeaArrowIpcDurationFix.ts`) and return
 * the result.
 *
 * Exported so a caller can patch a buffer once and hand the same patched
 * bytes to every later decode step, rather than each consumer decoding the
 * unpatched original.
 *
 * NOTE(review): the earlier comment on this function stated that the
 * output is byte-identical to the input when nothing needs rewriting and
 * that `apache-arrow@13` fails on unpatched `Duration` columns — both
 * depend on the helper and library, which are not visible here; confirm.
 *
 * @param ipcBytes  raw Arrow IPC bytes as received from the kernel
 * @returns the bytes returned by `rewriteDurationToInt64`
 */
export function patchIpcBytes(ipcBytes: Buffer): Buffer {
  const patchedBytes = rewriteDurationToInt64(ipcBytes);
  return patchedBytes;
}
90+
7091
/**
7192
* Map an Arrow `DataType` (with optional `databricks.type_name`
7293
* metadata) onto the closest Thrift `TTypeId`.
@@ -160,6 +181,13 @@ function arrowTypeToTTypeId(field: Field<DataType>): TTypeId {
160181
const arrowType = field.type;
161182
if (DataType.isBool(arrowType)) return TTypeId.BOOLEAN_TYPE;
162183
if (DataType.isInt(arrowType)) {
184+
// Duration columns are rewritten to Int64 with a
185+
// `databricks.arrow.duration_unit` metadata marker (see
186+
// `SeaArrowIpcDurationFix.ts`). Surface them as INTERVAL_DAY_TIME
187+
// so the converter formats them back into the thrift string form.
188+
if (arrowType.bitWidth === 64 && field.metadata.has(DURATION_UNIT_METADATA_KEY)) {
189+
return TTypeId.INTERVAL_DAY_TIME_TYPE;
190+
}
163191
switch (arrowType.bitWidth) {
164192
case 8:
165193
return TTypeId.TINYINT_TYPE;
@@ -182,6 +210,15 @@ function arrowTypeToTTypeId(field: Field<DataType>): TTypeId {
182210
if (DataType.isBinary(arrowType)) return TTypeId.BINARY_TYPE;
183211
if (DataType.isDate(arrowType)) return TTypeId.DATE_TYPE;
184212
if (DataType.isTimestamp(arrowType)) return TTypeId.TIMESTAMP_TYPE;
213+
// Native Arrow Interval types. The server-side INTERVAL YEAR-MONTH
214+
// (and the legacy IntervalDayTime variant) come through with type
215+
// id 11 / -25 / -26 — apache-arrow@13 surfaces them as `Int32Array`
216+
// pairs which the converter formats to thrift's `"Y-M"` / day-time
217+
// strings.
218+
if (DataType.isInterval(arrowType)) {
219+
// unit 0 = YEAR_MONTH, unit 1 = DAY_TIME, unit 2 = MONTH_DAY_NANO
220+
return arrowType.unit === 0 ? TTypeId.INTERVAL_YEAR_MONTH_TYPE : TTypeId.INTERVAL_DAY_TIME_TYPE;
221+
}
185222
if (DataType.isList(arrowType)) return TTypeId.ARRAY_TYPE;
186223
if (DataType.isMap(arrowType)) return TTypeId.MAP_TYPE;
187224
if (DataType.isStruct(arrowType)) return TTypeId.STRUCT_TYPE;

0 commit comments

Comments
 (0)