|
| 1 | +// Copyright (c) 2026 Databricks, Inc. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +import { RecordBatchReader, Schema, Field, DataType, TypeMap } from 'apache-arrow'; |
| 16 | +import { TTableSchema, TTypeId, TPrimitiveTypeEntry } from '../../thrift/TCLIService_types'; |
| 17 | + |
| 18 | +/** |
| 19 | + * Field metadata key used by the kernel to attach the original Databricks |
| 20 | + * SQL type name to each Arrow field. See `databricks-sql-kernel/src/reader/mod.rs`. |
| 21 | + */ |
| 22 | +const DATABRICKS_TYPE_NAME = 'databricks.type_name'; |
| 23 | + |
| 24 | +/** |
| 25 | + * Decode an Arrow IPC stream payload (schema header + zero-or-more |
| 26 | + * record-batch messages) into its row count. |
| 27 | + * |
| 28 | + * Returns `{ schema, rowCount }`. The schema is left intact as the |
| 29 | + * apache-arrow Schema object so callers can reuse it; the rowCount is |
| 30 | + * the sum of `RecordBatch.numRows` across every record-batch message |
| 31 | + * in the stream. |
| 32 | + * |
| 33 | + * Why we parse upfront: `ArrowResultConverter` consumes `ArrowBatch` |
| 34 | + * objects which carry an explicit `rowCount`. The kernel's IPC payload |
| 35 | + * does not carry a separate count — only per-RecordBatch numRows. We |
| 36 | + * walk the messages once to sum them so the converter sees the same |
| 37 | + * shape as the thrift path (`ArrowResultHandler.fetchNext` at |
| 38 | + * `lib/result/ArrowResultHandler.ts:55`). |
| 39 | + * |
| 40 | + * Re-parsing inside the converter is unavoidable because `RecordBatch` |
| 41 | + * instances created here cannot be passed across the converter's |
| 42 | + * `Buffer[]` boundary without rewriting the converter. The IPC bytes |
| 43 | + * themselves are small enough (one record batch per call) that the |
| 44 | + * double-parse cost is negligible for M0. |
| 45 | + */ |
| 46 | +export function decodeIpcBatch(ipcBytes: Buffer): { schema: Schema<TypeMap>; rowCount: number } { |
| 47 | + const reader = RecordBatchReader.from<TypeMap>(ipcBytes); |
| 48 | + // Eagerly open so `schema` is populated. |
| 49 | + reader.open(); |
| 50 | + const { schema } = reader; |
| 51 | + |
| 52 | + let rowCount = 0; |
| 53 | + // Iterate all record batches in the stream and sum row counts. |
| 54 | + for (const batch of reader) { |
| 55 | + rowCount += batch.numRows; |
| 56 | + } |
| 57 | + return { schema, rowCount }; |
| 58 | +} |
| 59 | + |
| 60 | +/** |
| 61 | + * Decode an Arrow IPC schema payload (no record batches) into the |
| 62 | + * apache-arrow Schema object. |
| 63 | + */ |
| 64 | +export function decodeIpcSchema(ipcBytes: Buffer): Schema<TypeMap> { |
| 65 | + const reader = RecordBatchReader.from<TypeMap>(ipcBytes); |
| 66 | + reader.open(); |
| 67 | + return reader.schema; |
| 68 | +} |
| 69 | + |
| 70 | +/** |
| 71 | + * Map an Arrow `DataType` (with optional `databricks.type_name` |
| 72 | + * metadata) onto the closest Thrift `TTypeId`. |
| 73 | + * |
| 74 | + * This is the synthesis step that lets the existing |
| 75 | + * `ArrowResultConverter` Phase-2 dispatch (`convertThriftValue` in |
| 76 | + * `lib/result/utils.ts:61-98`) keep working unchanged for the SEA |
| 77 | + * path. Phase-2 keys exclusively off `TPrimitiveTypeEntry.type` per |
| 78 | + * column, so we synthesize a `TColumnDesc` whose `TTypeId` matches the |
| 79 | + * server-emitted Arrow type as closely as possible. |
| 80 | + * |
| 81 | + * Resolution order: |
| 82 | + * 1. The kernel attaches `databricks.type_name` (e.g. "DECIMAL", |
| 83 | + * "INTERVAL", "STRUCT") to each field's metadata. Prefer that when |
| 84 | + * present — it carries the original SQL semantic that the Arrow |
| 85 | + * type alone can lose (e.g. INTERVAL → Utf8 with metadata). |
| 86 | + * 2. Fall back to the Arrow `DataType.typeId` for primitive types. |
| 87 | + * |
| 88 | + * This matches the JDBC and Python drivers' policy of trusting the |
| 89 | + * server's logical type assignment over the wire-level Arrow encoding. |
| 90 | + */ |
| 91 | +function arrowTypeToTTypeId(field: Field<DataType>): TTypeId { |
| 92 | + const typeName = field.metadata.get(DATABRICKS_TYPE_NAME)?.toUpperCase(); |
| 93 | + |
| 94 | + switch (typeName) { |
| 95 | + case 'BOOLEAN': |
| 96 | + return TTypeId.BOOLEAN_TYPE; |
| 97 | + case 'TINYINT': |
| 98 | + case 'BYTE': |
| 99 | + return TTypeId.TINYINT_TYPE; |
| 100 | + case 'SMALLINT': |
| 101 | + case 'SHORT': |
| 102 | + return TTypeId.SMALLINT_TYPE; |
| 103 | + case 'INT': |
| 104 | + case 'INTEGER': |
| 105 | + return TTypeId.INT_TYPE; |
| 106 | + case 'BIGINT': |
| 107 | + case 'LONG': |
| 108 | + return TTypeId.BIGINT_TYPE; |
| 109 | + case 'FLOAT': |
| 110 | + case 'REAL': |
| 111 | + return TTypeId.FLOAT_TYPE; |
| 112 | + case 'DOUBLE': |
| 113 | + return TTypeId.DOUBLE_TYPE; |
| 114 | + case 'STRING': |
| 115 | + return TTypeId.STRING_TYPE; |
| 116 | + case 'VARCHAR': |
| 117 | + return TTypeId.VARCHAR_TYPE; |
| 118 | + case 'CHAR': |
| 119 | + return TTypeId.CHAR_TYPE; |
| 120 | + case 'BINARY': |
| 121 | + return TTypeId.BINARY_TYPE; |
| 122 | + case 'DATE': |
| 123 | + return TTypeId.DATE_TYPE; |
| 124 | + case 'TIMESTAMP': |
| 125 | + case 'TIMESTAMP_NTZ': |
| 126 | + return TTypeId.TIMESTAMP_TYPE; |
| 127 | + case 'DECIMAL': |
| 128 | + return TTypeId.DECIMAL_TYPE; |
| 129 | + case 'INTERVAL': |
| 130 | + case 'INTERVAL DAY': |
| 131 | + case 'INTERVAL DAY TO HOUR': |
| 132 | + case 'INTERVAL DAY TO MINUTE': |
| 133 | + case 'INTERVAL DAY TO SECOND': |
| 134 | + case 'INTERVAL HOUR': |
| 135 | + case 'INTERVAL HOUR TO MINUTE': |
| 136 | + case 'INTERVAL HOUR TO SECOND': |
| 137 | + case 'INTERVAL MINUTE': |
| 138 | + case 'INTERVAL MINUTE TO SECOND': |
| 139 | + case 'INTERVAL SECOND': |
| 140 | + return TTypeId.INTERVAL_DAY_TIME_TYPE; |
| 141 | + case 'INTERVAL YEAR': |
| 142 | + case 'INTERVAL YEAR TO MONTH': |
| 143 | + case 'INTERVAL MONTH': |
| 144 | + return TTypeId.INTERVAL_YEAR_MONTH_TYPE; |
| 145 | + case 'ARRAY': |
| 146 | + return TTypeId.ARRAY_TYPE; |
| 147 | + case 'MAP': |
| 148 | + return TTypeId.MAP_TYPE; |
| 149 | + case 'STRUCT': |
| 150 | + return TTypeId.STRUCT_TYPE; |
| 151 | + case 'NULL': |
| 152 | + case 'VOID': |
| 153 | + return TTypeId.NULL_TYPE; |
| 154 | + default: |
| 155 | + break; |
| 156 | + } |
| 157 | + |
| 158 | + // Fall back to Arrow's own type id when no databricks metadata is set |
| 159 | + // (e.g. unit tests constructing batches without metadata). |
| 160 | + const arrowType = field.type; |
| 161 | + if (DataType.isBool(arrowType)) return TTypeId.BOOLEAN_TYPE; |
| 162 | + if (DataType.isInt(arrowType)) { |
| 163 | + switch (arrowType.bitWidth) { |
| 164 | + case 8: |
| 165 | + return TTypeId.TINYINT_TYPE; |
| 166 | + case 16: |
| 167 | + return TTypeId.SMALLINT_TYPE; |
| 168 | + case 32: |
| 169 | + return TTypeId.INT_TYPE; |
| 170 | + case 64: |
| 171 | + return TTypeId.BIGINT_TYPE; |
| 172 | + default: |
| 173 | + return TTypeId.BIGINT_TYPE; |
| 174 | + } |
| 175 | + } |
| 176 | + if (DataType.isFloat(arrowType)) { |
| 177 | + // arrow Float precision: 16=HALF, 32=SINGLE, 64=DOUBLE |
| 178 | + return arrowType.precision === 2 ? TTypeId.DOUBLE_TYPE : TTypeId.FLOAT_TYPE; |
| 179 | + } |
| 180 | + if (DataType.isDecimal(arrowType)) return TTypeId.DECIMAL_TYPE; |
| 181 | + if (DataType.isUtf8(arrowType)) return TTypeId.STRING_TYPE; |
| 182 | + if (DataType.isBinary(arrowType)) return TTypeId.BINARY_TYPE; |
| 183 | + if (DataType.isDate(arrowType)) return TTypeId.DATE_TYPE; |
| 184 | + if (DataType.isTimestamp(arrowType)) return TTypeId.TIMESTAMP_TYPE; |
| 185 | + if (DataType.isList(arrowType)) return TTypeId.ARRAY_TYPE; |
| 186 | + if (DataType.isMap(arrowType)) return TTypeId.MAP_TYPE; |
| 187 | + if (DataType.isStruct(arrowType)) return TTypeId.STRUCT_TYPE; |
| 188 | + if (DataType.isNull(arrowType)) return TTypeId.NULL_TYPE; |
| 189 | + |
| 190 | + return TTypeId.STRING_TYPE; |
| 191 | +} |
| 192 | + |
| 193 | +/** |
| 194 | + * Synthesize a Thrift `TTableSchema` from an Arrow schema decoded out |
| 195 | + * of the kernel's IPC stream. Used by `SeaOperationBackend.getResultMetadata` |
| 196 | + * to drive `ArrowResultConverter.convertThriftTypes` (Phase 2) without |
| 197 | + * changing that code. |
| 198 | + */ |
| 199 | +export function arrowSchemaToThriftSchema(arrowSchema: Schema<TypeMap>): TTableSchema { |
| 200 | + const columns = arrowSchema.fields.map((field, index) => { |
| 201 | + const primitiveEntry: TPrimitiveTypeEntry = { |
| 202 | + type: arrowTypeToTTypeId(field), |
| 203 | + }; |
| 204 | + return { |
| 205 | + columnName: field.name, |
| 206 | + typeDesc: { |
| 207 | + types: [ |
| 208 | + { |
| 209 | + primitiveEntry, |
| 210 | + }, |
| 211 | + ], |
| 212 | + }, |
| 213 | + position: index + 1, |
| 214 | + }; |
| 215 | + }); |
| 216 | + return { columns }; |
| 217 | +} |
0 commit comments