Skip to content

Add ResultSet.binaryStream() for binary formats and rawStream() passthrough#778

Open
Copilot wants to merge 3 commits into
mainfrom
copilot/fix-parquet-message-length-parsing
Open

Add ResultSet.binaryStream() for binary formats and rawStream() passthrough#778
Copilot wants to merge 3 commits into
mainfrom
copilot/fix-parquet-message-length-parsing

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented May 29, 2026

Summary

ResultSet.stream() splits the response body on newline bytes (0x0a) and detects mid-stream exceptions via a bare \r\n heuristic. On binary formats like Parquet, incidental \r\n byte sequences in the payload are misread as the exception marker, throwing a spurious there was an error in the stream; failed to parse the message length (issue #607).

This now adds ResultSet.binaryStream() as the exception-aware API for binary query results, while ResultSet.rawStream() returns the consumed underlying response stream as-is for low-level use cases.

const rs = await client.query({ query, query_params, format: 'Parquet' })
for await (const chunk of rs.binaryStream()) {
  // chunk is a Buffer (Node) / Uint8Array (Web) of raw bytes; binary payload is intact
}
  • RawStreamExceptionDetector (client-common/utils/stream.ts): stateful detector matching the full \r\n__exception__\r\n<tag> start marker (<tag> from the x-clickhouse-exception-tag header) instead of a bare \r\n. Withholds a trailing markerLen-1 byte window so markers split across chunks are still caught; data before the marker passes through untouched, bytes after are buffered and parsed via extractErrorAtTheEndOfChunk on flush.
  • BaseResultSet.binaryStream(): new exception-aware binary streaming method for query() results; the result-set interface now distinguishes between the binary stream type and the raw underlying stream type.
  • BaseResultSet.rawStream(): new low-level passthrough returning the consumed underlying transport stream/socket as-is, with JSDoc describing that trailing ClickHouse exception bytes are not parsed for the caller.
  • Node & Web ResultSet: implement binaryStream() via a passthrough transform that runs chunks through the detector, while rawStream() returns the original consumed stream unchanged.
  • In Node.js, the binary transform now uses zero-copy Buffer views for detected chunks to avoid unnecessary allocations on large binary payloads.

Notes for reviewers:

  • stream() is intentionally left unchanged; binaryStream() is opt-in to avoid breaking existing row-based consumers.
  • rawStream() is intended for callers that explicitly want the exact underlying response stream and are prepared to handle any trailing exception bytes themselves.
  • Verified end-to-end against ClickHouse 26.6: reproduced the original error with stream(), confirmed binaryStream() returns intact Parquet (valid PAR1 framing) despite 0x0d0a bytes in the payload.

Checklist

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG

@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Comment thread packages/client-web/src/result_set.ts Fixed
Copilot AI changed the title [WIP] Fix error while parsing message length in parquet streaming Add ResultSet.rawStream() for binary formats (Parquet/ORC/Arrow/Native) May 29, 2026
Copilot AI requested a review from peter-leonov-ch May 29, 2026 23:25
@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

❌ Patch coverage is 88.76404% with 10 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
packages/client-common/src/utils/stream.ts 88.88% 3 Missing and 2 partials ⚠️
packages/client-web/src/result_set.ts 81.81% 2 Missing and 2 partials ⚠️
packages/client-node/src/result_set.ts 95.45% 0 Missing and 1 partial ⚠️

📢 Thoughts on this report? Let us know!

@peter-leonov-ch peter-leonov-ch marked this pull request as ready for review May 31, 2026 18:08
Copilot AI review requested due to automatic review settings May 31, 2026 18:08
@peter-leonov-ch peter-leonov-ch requested a review from mshustov as a code owner May 31, 2026 18:08
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new ResultSet.rawStream() API to stream binary response formats (Parquet/ORC/Arrow/Native) byte-for-byte without newline row-splitting, while still detecting genuine mid-stream server exceptions using a stronger exception-marker detector.

Changes:

  • Introduces RawStreamExceptionDetector in client-common to detect full \r\n__exception__\r\n<tag> markers (instead of the existing bare-CRLF heuristic).
  • Extends BaseResultSet with rawStream() and implements it in both Node and Web ResultSet.
  • Adds unit/integration coverage for regression case (#607) and documents the change in CHANGELOG.md.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
packages/client-web/src/result_set.ts Implements ResultSet.rawStream() for Web streams using RawStreamExceptionDetector.
packages/client-web/tests/unit/web_result_set.test.ts Adds unit tests for rawStream() behavior and exception propagation (Web).
packages/client-node/src/result_set.ts Implements ResultSet.rawStream() for Node streams via a transform + detector.
packages/client-node/tests/unit/node_result_set.test.ts Adds unit tests for rawStream() behavior and exception propagation (Node).
packages/client-node/tests/integration/node_streaming_e2e.test.ts Adds an e2e Parquet regression test consuming Parquet via rawStream().
packages/client-common/src/utils/stream.ts Adds RawStreamExceptionDetector and helper utilities for marker detection.
packages/client-common/src/result.ts Extends BaseResultSet interface with rawStream() and documents correct usage for binary formats.
packages/client-common/src/index.ts Re-exports RawStreamExceptionDetector (+ type export).
packages/client-common/tests/unit/stream_utils.test.ts Adds unit tests for the new detector (including chunk-boundary cases).
CHANGELOG.md Adds a bug-fix entry describing the new rawStream() API and regression fix.

Comment on lines +243 to +245
const data = detector.push(chunk)
callback(null, data.length > 0 ? Buffer.from(data) : undefined)
},
Comment on lines +251 to +255
const { data, error } = detector.flush()
if (error !== undefined) {
callback(error)
return
}
'Schema<{ 0: id: Uint64, 1: name: Utf8, 2: sku: List<Uint8> }>',
)
const actualParquetData: unknown[] = []
table.toArray().map((v) => {
Comment thread CHANGELOG.md Outdated

## Bug Fixes

- Added `ResultSet.rawStream()` for streaming raw response bytes without splitting the body into rows. This is the correct way to consume binary formats such as `Parquet`, `ORC`, `Arrow`, `ArrowStream`, and `Native` via `query().stream()`. Previously, calling `ResultSet.stream()` on a binary format would split the payload on newline (`0x0a`) bytes and could misinterpret random `\r\n` byte sequences inside the binary data as the mid-stream exception marker, producing a spurious `there was an error in the stream; failed to parse the message length` error. Unlike consuming the stream via `exec()`, `rawStream()` still detects and propagates a genuine mid-stream exception appended by the server (matching the full `\r\n__exception__\r\n<tag>` marker instead of a bare `\r\n` heuristic). ([#607])
@peter-leonov-ch
Copy link
Copy Markdown
Collaborator

@copilot This is dope, you've got the root case correctly and prototyped a working parser. Let's iterate by renaming the new rawStream() method into a binaryStream() and also adding a new method rawStream() which just return the raw .consume()'ed socket (add descriptive JSDoc explaining the pitfalls.

Comment on lines +201 to +229
const transform = new TransformStream<Uint8Array, Uint8Array>({
transform: (chunk, controller) => {
if (chunk === null) {
controller.terminate()
return
}
if (detector === undefined) {
controller.enqueue(chunk)
return
}
const data = detector.push(chunk)
if (data.length > 0) {
controller.enqueue(data)
}
},
flush: (controller) => {
if (detector === undefined) {
return
}
const { data, error } = detector.flush()
if (error !== undefined) {
controller.error(error)
return
}
if (data.length > 0) {
controller.enqueue(data)
}
},
})
Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 31, 2026

@copilot This is dope, you've got the root case correctly and prototyped a working parser. Let's iterate by renaming the new rawStream() method into a binaryStream() and also adding a new method `...

Addressed in 0c7c650: the exception-aware query byte stream is now binaryStream(), and rawStream() now returns the consumed underlying response stream as-is with JSDoc calling out that trailing server exception bytes are not parsed for you.

Copilot AI changed the title Add ResultSet.rawStream() for binary formats (Parquet/ORC/Arrow/Native) Add ResultSet.binaryStream() for binary formats and rawStream() passthrough May 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[parquet]: failed to parse the message length

4 participants