Add ResultSet.binaryStream() for binary formats and rawStream() passthrough #778
Copilot wants to merge 3 commits
Pull request overview
Adds a new ResultSet.rawStream() API to stream binary response formats (Parquet/ORC/Arrow/Native) byte-for-byte without newline row-splitting, while still detecting genuine mid-stream server exceptions using a stronger exception-marker detector.
Changes:
- Introduces `RawStreamExceptionDetector` in `client-common` to detect full `\r\n__exception__\r\n<tag>` markers (instead of the existing bare-CRLF heuristic).
- Extends `BaseResultSet` with `rawStream()` and implements it in both Node and Web `ResultSet`.
- Adds unit/integration coverage for regression case (#607) and documents the change in `CHANGELOG.md`.
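To make the marker matching in the first change easier to picture, here is a minimal sketch of a stateful detector, not the PR's `RawStreamExceptionDetector`. The class name `MarkerDetectorSketch`, the helpers `indexOfBytes` and `concat`, and the `errorText` field are hypothetical. It assumes the detector may hold back up to `marker.length - 1` trailing bytes so that a marker split across chunks is still found, and it returns the raw text after the marker instead of parsing it.

```typescript
// Hypothetical sketch; names and return shape are assumptions, not the PR's code.
const encoder = new TextEncoder()

/** Index of the first occurrence of `needle` in `haystack`, or -1. */
function indexOfBytes(haystack: Uint8Array, needle: Uint8Array): number {
  outer: for (let i = 0; i + needle.length <= haystack.length; i++) {
    for (let j = 0; j < needle.length; j++) {
      if (haystack[i + j] !== needle[j]) continue outer
    }
    return i
  }
  return -1
}

function concat(a: Uint8Array, b: Uint8Array): Uint8Array {
  const out = new Uint8Array(a.length + b.length)
  out.set(a, 0)
  out.set(b, a.length)
  return out
}

class MarkerDetectorSketch {
  private readonly marker: Uint8Array
  // Withheld tail of the stream; always shorter than the marker.
  private held: Uint8Array = new Uint8Array(0)
  // Bytes from the marker onwards, once the marker has been seen.
  private tail: Uint8Array | undefined

  constructor(tag: string) {
    this.marker = encoder.encode(`\r\n__exception__\r\n${tag}`)
  }

  /** Returns the bytes that are safe to forward to the consumer. */
  push(chunk: Uint8Array): Uint8Array {
    if (this.tail !== undefined) {
      // Marker already seen: buffer everything for flush().
      this.tail = concat(this.tail, chunk)
      return new Uint8Array(0)
    }
    const buf = concat(this.held, chunk)
    const at = indexOfBytes(buf, this.marker)
    if (at !== -1) {
      this.tail = buf.slice(at)
      this.held = new Uint8Array(0)
      return buf.slice(0, at)
    }
    // No full marker yet: keep the last marker.length - 1 bytes back.
    const keep = Math.min(buf.length, this.marker.length - 1)
    this.held = buf.slice(buf.length - keep)
    return buf.slice(0, buf.length - keep)
  }

  /** Releases withheld bytes, or reports the raw text that followed the marker. */
  flush(): { data: Uint8Array; errorText?: string } {
    if (this.tail !== undefined) {
      const text = new TextDecoder().decode(this.tail.slice(this.marker.length))
      return { data: new Uint8Array(0), errorText: text }
    }
    const data = this.held
    this.held = new Uint8Array(0)
    return { data }
  }
}

// Usage: the payload contains an incidental "\r\n" and the marker is split across two chunks.
const wire = encoder.encode('ab\r\ncd\r\n__exception__\r\nTAGboom')
const detector = new MarkerDetectorSketch('TAG')
const forwarded = concat(detector.push(wire.slice(0, 10)), detector.push(wire.slice(10)))
const result = detector.flush()
// forwarded decodes to "ab\r\ncd"; result.errorText is "boom"
```

The incidental `\r\n` inside the payload is forwarded untouched because only the full marker, including the tag, counts as a match.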
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| packages/client-web/src/result_set.ts | Implements ResultSet.rawStream() for Web streams using RawStreamExceptionDetector. |
| packages/client-web/tests/unit/web_result_set.test.ts | Adds unit tests for rawStream() behavior and exception propagation (Web). |
| packages/client-node/src/result_set.ts | Implements ResultSet.rawStream() for Node streams via a transform + detector. |
| packages/client-node/tests/unit/node_result_set.test.ts | Adds unit tests for rawStream() behavior and exception propagation (Node). |
| packages/client-node/tests/integration/node_streaming_e2e.test.ts | Adds an e2e Parquet regression test consuming Parquet via rawStream(). |
| packages/client-common/src/utils/stream.ts | Adds RawStreamExceptionDetector and helper utilities for marker detection. |
| packages/client-common/src/result.ts | Extends BaseResultSet interface with rawStream() and documents correct usage for binary formats. |
| packages/client-common/src/index.ts | Re-exports RawStreamExceptionDetector (+ type export). |
| packages/client-common/tests/unit/stream_utils.test.ts | Adds unit tests for the new detector (including chunk-boundary cases). |
| CHANGELOG.md | Adds a bug-fix entry describing the new rawStream() API and regression fix. |
    const data = detector.push(chunk)
    callback(null, data.length > 0 ? Buffer.from(data) : undefined)
    },

    const { data, error } = detector.flush()
    if (error !== undefined) {
      callback(error)
      return
    }

      'Schema<{ 0: id: Uint64, 1: name: Utf8, 2: sku: List<Uint8> }>',
    )
    const actualParquetData: unknown[] = []
    table.toArray().map((v) => {
## Bug Fixes

- Added `ResultSet.rawStream()` for streaming raw response bytes without splitting the body into rows. This is the correct way to consume binary formats such as `Parquet`, `ORC`, `Arrow`, `ArrowStream`, and `Native` via `query().stream()`. Previously, calling `ResultSet.stream()` on a binary format would split the payload on newline (`0x0a`) bytes and could misinterpret random `\r\n` byte sequences inside the binary data as the mid-stream exception marker, producing a spurious `there was an error in the stream; failed to parse the message length` error. Unlike consuming the stream via `exec()`, `rawStream()` still detects and propagates a genuine mid-stream exception appended by the server (matching the full `\r\n__exception__\r\n<tag>` marker instead of a bare `\r\n` heuristic). ([#607])
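The failure mode in this changelog entry can be reproduced on a toy payload. The sketch below is not the client's row splitter. `splitOnNewline` and `containsCrlf` are hypothetical helpers and the byte values are made up. It only shows that splitting on `0x0a` drops bytes from a binary body, and that a `0x0d 0x0a` pair can appear in it by chance.

```typescript
// Hypothetical helpers for illustration; not the library's implementation.

/** Splits a buffer on 0x0a the way a row-oriented reader would. */
function splitOnNewline(bytes: Uint8Array): Uint8Array[] {
  const rows: Uint8Array[] = []
  let start = 0
  for (let i = 0; i < bytes.length; i++) {
    if (bytes[i] === 0x0a) {
      rows.push(bytes.subarray(start, i)) // the 0x0a byte itself is dropped
      start = i + 1
    }
  }
  if (start < bytes.length) rows.push(bytes.subarray(start))
  return rows
}

/** True if the buffer contains a 0x0d 0x0a pair anywhere. */
function containsCrlf(bytes: Uint8Array): boolean {
  for (let i = 0; i + 1 < bytes.length; i++) {
    if (bytes[i] === 0x0d && bytes[i + 1] === 0x0a) return true
  }
  return false
}

// Made-up binary body: "PAR1" followed by arbitrary bytes, two of which are 0x0a.
const payload = Uint8Array.from([0x50, 0x41, 0x52, 0x31, 0x00, 0x0d, 0x0a, 0xff, 0x0a, 0x07])
const rows = splitOnNewline(payload)
const bytesAfterSplit = rows.reduce((total, row) => total + row.length, 0)

console.log(rows.length, bytesAfterSplit, payload.length, containsCrlf(payload))
// prints: 3 8 10 true
```

The ten-byte body comes back as three "rows" holding eight bytes in total, so it cannot be reassembled without knowing where the separators were, and the chance `\r\n` is what a bare-CRLF heuristic would mistake for an exception marker.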
@copilot This is dope, you've got the root cause correctly and prototyped a working parser. Let's iterate by renaming the new …
    const transform = new TransformStream<Uint8Array, Uint8Array>({
      transform: (chunk, controller) => {
        if (chunk === null) {
          controller.terminate()
          return
        }
        if (detector === undefined) {
          controller.enqueue(chunk)
          return
        }
        const data = detector.push(chunk)
        if (data.length > 0) {
          controller.enqueue(data)
        }
      },
      flush: (controller) => {
        if (detector === undefined) {
          return
        }
        const { data, error } = detector.flush()
        if (error !== undefined) {
          controller.error(error)
          return
        }
        if (data.length > 0) {
          controller.enqueue(data)
        }
      },
    })
Addressed in 0c7c650: the exception-aware query byte stream is now …
Summary

`ResultSet.stream()` splits the response body on newline bytes (`0x0a`) and detects mid-stream exceptions via a bare `\r\n` heuristic. On binary formats like `Parquet`, incidental `\r\n` byte sequences in the payload are misread as the exception marker, throwing a spurious `there was an error in the stream; failed to parse the message length` (issue #607).

This now adds `ResultSet.binaryStream()` as the exception-aware API for binary query results, while `ResultSet.rawStream()` returns the consumed underlying response stream as-is for low-level use cases.

- `RawStreamExceptionDetector` (`client-common/utils/stream.ts`): stateful detector matching the full `\r\n__exception__\r\n<tag>` start marker (`<tag>` from the `x-clickhouse-exception-tag` header) instead of a bare `\r\n`. Withholds a trailing `markerLen-1` byte window so markers split across chunks are still caught; data before the marker passes through untouched, bytes after are buffered and parsed via `extractErrorAtTheEndOfChunk` on flush.
- `BaseResultSet.binaryStream()`: new exception-aware binary streaming method for `query()` results; the result-set interface now distinguishes between the binary stream type and the raw underlying stream type.
- `BaseResultSet.rawStream()`: new low-level passthrough returning the consumed underlying transport stream/socket as-is, with JSDoc describing that trailing ClickHouse exception bytes are not parsed for the caller.
- `ResultSet`: implements `binaryStream()` via a passthrough transform that runs chunks through the detector, while `rawStream()` returns the original consumed stream unchanged.
- `Buffer` views are used for detected chunks to avoid unnecessary allocations on large binary payloads.

Notes for reviewers:

- `stream()` is intentionally left unchanged; `binaryStream()` is opt-in to avoid breaking existing row-based consumers.
- `rawStream()` is intended for callers that explicitly want the exact underlying response stream and are prepared to handle any trailing exception bytes themselves.
- … `stream()`, confirmed `binaryStream()` returns intact Parquet (valid `PAR1` framing) despite `0x0d0a` bytes in the payload.
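For readers who want to repeat the `PAR1` framing check on bytes collected from a stream, a small sketch follows. It relies on the fact that a Parquet file begins and ends with the four ASCII bytes `PAR1`. The function names `hasParquetFraming` and `concatChunks` are hypothetical, and the check is necessary but not sufficient for a valid file.

```typescript
// Hypothetical helpers; a cheap sanity check, not a Parquet parser.
const PARQUET_MAGIC = [0x50, 0x41, 0x52, 0x31] // ASCII "PAR1"

/** Joins the chunks read from a byte stream into one buffer. */
function concatChunks(chunks: Uint8Array[]): Uint8Array {
  const total = chunks.reduce((n, c) => n + c.length, 0)
  const out = new Uint8Array(total)
  let offset = 0
  for (const chunk of chunks) {
    out.set(chunk, offset)
    offset += chunk.length
  }
  return out
}

/** True if the buffer both starts and ends with the "PAR1" magic bytes. */
function hasParquetFraming(bytes: Uint8Array): boolean {
  const n = PARQUET_MAGIC.length
  if (bytes.length < 2 * n) return false
  return PARQUET_MAGIC.every(
    (b, i) => bytes[i] === b && bytes[bytes.length - n + i] === b,
  )
}

// Made-up chunks: the magic is split across chunks and the body contains 0x0d 0x0a.
const collected = concatChunks([
  Uint8Array.from([0x50, 0x41]),
  Uint8Array.from([0x52, 0x31, 0x0d, 0x0a, 0x50, 0x41, 0x52, 0x31]),
])
const framed = hasParquetFraming(collected)
const truncated = hasParquetFraming(collected.subarray(0, collected.length - 1))
// framed is true; truncated is false
```

A check like this catches a body that was split or cut short, but it says nothing about the footer or column data, so a real test would still decode the file with a Parquet reader.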