
feat: page-stream → RecordBatch decoder (PR-6a) #6407

Draft
g-talbot wants to merge 2 commits into gtt/column-page-stream-trait from gtt/parquet-page-decoder

Conversation

@g-talbot
Contributor

@g-talbot g-talbot commented May 8, 2026

Summary

  • Bridges PR-4's ColumnPageStream (raw compressed pages) to arrow's standard ParquetRecordBatchReaderBuilder (decoded arrays). PR-6b's merge engine drains each input row-group through this.
  • Adds header_bytes: Bytes to Page and a new StreamDecoder whose next_rg() yields one RecordBatch per input row group, idempotent at EOF.
  • Per-call memory cost: one row group's reconstructed column-chunk bytes (~tens of MB); the next call drops the previous buffer.
  • Pure plumbing — no merge logic.
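The `next_rg()` contract described above — one batch per input row group, then a stable `None` at EOF — can be sketched with a minimal stdlib-only stand-in. `MockDecoder` and the `String` payload are illustrative stand-ins, not the crate's actual `StreamDecoder`/`RecordBatch` types:

```rust
// Minimal stand-in for the decoder's draining contract: one item per
// input row group, then `None` at EOF — and calling past EOF stays
// `None` (idempotent). Names here are hypothetical.
struct MockDecoder {
    row_groups: std::vec::IntoIter<String>,
}

impl MockDecoder {
    fn new(row_groups: Vec<String>) -> Self {
        Self { row_groups: row_groups.into_iter() }
    }

    /// Yields one "batch" per input row group; idempotent at EOF.
    fn next_rg(&mut self) -> Option<String> {
        self.row_groups.next()
    }
}

fn main() {
    let mut dec = MockDecoder::new(vec!["rg0".into(), "rg1".into()]);
    assert_eq!(dec.next_rg().as_deref(), Some("rg0"));
    assert_eq!(dec.next_rg().as_deref(), Some("rg1"));
    assert_eq!(dec.next_rg(), None);
    // A second call after EOF is still None — the idempotence the PR tests.
    assert_eq!(dec.next_rg(), None);
    println!("ok");
}
```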

How byte-exact reconstruction works

Each Page now carries the original Thrift-compact bytes for its header (header_bytes) alongside the parsed PageHeader and the raw compressed body (bytes). Concatenating header_bytes ++ bytes for every page in a column chunk reproduces the on-disk byte layout. The decoder allocates one buffer per RG sized to max(col_byte_range_end), places each column chunk at its original offset, and asks ParquetRecordBatchReaderBuilder::new_with_metadata (built on the original ParquetMetaData) to read just that RG via with_row_groups([rg_idx]).
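The reconstruction step above can be sketched in isolation with plain byte vectors. `ChunkBytes` is a hypothetical stand-in: `data` models the `header_bytes ++ bytes` concatenation of a chunk's pages, and `file_offset` its original on-disk offset; the buffer is sized to the largest chunk end offset, with gaps left zero-padded:

```rust
// Hypothetical stand-in for one column chunk's reconstructed bytes.
struct ChunkBytes {
    file_offset: usize, // original on-disk offset of the column chunk
    data: Vec<u8>,      // header_bytes ++ bytes for every page, in order
}

/// Rebuild one row group's on-disk byte layout from its column chunks:
/// buffer sized to max(chunk end offset), each chunk copied at its
/// original offset, gaps between chunks left zeroed.
fn reconstruct_rg(chunks: &[ChunkBytes]) -> Vec<u8> {
    let end = chunks
        .iter()
        .map(|c| c.file_offset + c.data.len())
        .max()
        .unwrap_or(0);
    let mut buf = vec![0u8; end];
    for c in chunks {
        buf[c.file_offset..c.file_offset + c.data.len()].copy_from_slice(&c.data);
    }
    buf
}

fn main() {
    let chunks = vec![
        ChunkBytes { file_offset: 4, data: vec![1, 2, 3] },
        ChunkBytes { file_offset: 10, data: vec![9] },
    ];
    let buf = reconstruct_rg(&chunks);
    assert_eq!(buf.len(), 11);
    assert_eq!(&buf[4..7], &[1, 2, 3]);
    assert_eq!(buf[0], 0); // gap before the first chunk stays zeroed
    println!("ok");
}
```

In the real decoder this buffer is wrapped in `Bytes` and handed to `ParquetRecordBatchReaderBuilder::new_with_metadata`, which only dereferences the byte ranges the original `ParquetMetaData` points at, so the zero padding is never read.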

We could in principle re-encode parsed PageHeader structs (Thrift-compact is deterministic for a given struct), but "deterministic in practice" is not a contract — encoder-version drift inside the compactor would silently corrupt outputs. Carrying the original header bytes sidesteps the problem.

Stack

main
└── gtt/parquet-streaming-base (= main ∪ PR-2 #6384 ∪ PR-4 #6386)
    └── gtt/column-page-stream-trait (PR-5a #6406)
        ├── gtt/legacy-input-adapter (PR-5)
        └── gtt/parquet-page-decoder ← PR-6a (this PR)
            └── gtt/streaming-merge-engine-merger ← PR-6b (single-RG merge — next)
                └── gtt/streaming-merge-engine-multi-rg ← PR-6c (multi-RG-at-metric_name)

Test plan

  • 9 new tests in page_decoder.rs: single/multi-RG drains, multi-page columns, dict columns, null preservation, codec roundtrip (uncompressed/snappy/zstd), idempotent EOF, byte-exact reconstruction proof, I/O failure → PageDecodeError::PageStream
  • All 14 existing streaming_reader.rs tests still pass with the header_bytes field addition (401/401 crate-wide)
  • cargo clippy --workspace --all-features --tests with -Dwarnings
  • cargo +nightly fmt --all -- --check
  • cargo doc --no-deps -p quickwit-parquet-engine with -Dwarnings
  • cargo machete
  • bash quickwit/scripts/check_license_headers.sh
  • typos clean on changed files

🤖 Generated with Claude Code

@g-talbot g-talbot force-pushed the gtt/column-page-stream-trait branch from af78c8f to 2714921 on May 8, 2026 20:49
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 4bc7122 to 5d5c4b1 on May 8, 2026 20:49
@g-talbot g-talbot force-pushed the gtt/column-page-stream-trait branch from 2714921 to 61b6310 on May 8, 2026 21:27
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from 5d5c4b1 to e660f78 on May 8, 2026 21:28
@g-talbot g-talbot force-pushed the gtt/column-page-stream-trait branch from 61b6310 to 4ae07e7 on May 8, 2026 21:46
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from e660f78 to fcfb854 on May 8, 2026 21:46
@g-talbot g-talbot force-pushed the gtt/column-page-stream-trait branch from 4ae07e7 to d43186d on May 9, 2026 00:07
g-talbot and others added 2 commits May 8, 2026 20:08
Bridges PR-4's ColumnPageStream (raw compressed pages in storage order)
to arrow's standard ParquetRecordBatchReaderBuilder (decoded arrays).
PR-6's streaming merge engine drains each input row-group through this
to keep per-RG memory bounded — only one input RG worth of bytes is
materialised at a time, rather than the whole file.

Approach: reconstruct one row group's column-chunk byte layout in a
buffer (column chunks placed at their original offsets, gaps zero-
padded), wrap the buffer in `Bytes`, and feed it to
`ParquetRecordBatchReaderBuilder::new_with_metadata` with
`with_row_groups([rg_idx])`. Byte-exact reconstruction comes from carrying
each page's original Thrift-compact `header_bytes` through PR-4's
streaming reader — no re-encoding, so encoder-version drift inside
the compactor cannot silently corrupt outputs.

Adds `header_bytes: Bytes` to `Page` and captures the drained
header bytes inside `parse_page_header_streaming`. New
`StreamDecoder` borrows the stream and exposes `next_rg()` returning
one `RecordBatch` per input row group, idempotent at EOF.

Tests (9, all passing): single-RG and multi-RG drains, multi-page
columns, dict columns, null preservation, compression codec roundtrip
(uncompressed/snappy/zstd — LZ4 not enabled in our parquet feature
set), idempotent EOF, byte-exact reconstruction proof, and I/O failure
surfacing as PageDecodeError::PageStream rather than masked as decode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CI nightly rustfmt (newer than my local at the time of the original
push) wraps `write_parquet(...)` onto multiple lines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/parquet-page-decoder branch from fcfb854 to 736ce0e on May 9, 2026 00:08