feat: column-major streaming Parquet reader primitive#6386
Open
feat: column-major streaming Parquet reader primitive#6386
Conversation
Issues exactly two streaming reads per input file in the common path: one footer GET (via RemoteByteSource::get_slice) and one body GET (via RemoteByteSource::get_slice_stream). Yields per-column-chunk raw bytes in storage order (RG-major, col-major-in-RG) so callers can decode pages with parquet's existing SerializedPageReader at their own pace. Symmetric with PR-2's StreamingParquetWriter. Together the two PRs form the I/O substrate for PR-6's streaming column-major merge engine and PR-7's wiring into ParquetMergeExecutor. quickwit-parquet-engine deliberately does not depend on quickwit-storage — that would invert the layering, since storage pulls in cloud-vendor SDKs this crate has no business linking against. Instead the reader takes a minimal RemoteByteSource trait (file_size + get_slice + get_slice_stream); PR-6 will provide a ~10-line adapter from quickwit_storage::Storage in quickwit-indexing. Page indexes (offset_index/column_index) are NOT loaded into the parsed ParquetMetaData. They live in the file body, not the footer; loading them either requires extra GETs or extends the body GET range to start before the first column chunk. PR-6's direct page copy reads column_chunk.offset_index_offset/length and decodes the index from the body bytes when it needs page boundaries — no need to materialize the full ParquetOffsetIndex structure up front. 12 tests cover the four contracts (PR-A two-GETs, PR-B metadata equivalence vs sync reader, PR-C bytes round-trip on single-RG and multi-RG files, PR-D storage-order advance), plus footer body range correctness, KV metadata round-trip, EOF idempotency, out-of-order error path, prefetch retry, and truncated-file rejection. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
PR-4 of the streaming-merge stack. Symmetric with PR-2 (#6384) on the
read side: lands a primitive in
quickwit-parquet-engine/src/storage/streaming_reader.rsthat takes a minimal
RemoteByteSourcetrait and produces per-column-chunk
Bytesin storage order via exactly one footer GET + onebody GET in the common path.
No production callers in this PR. PR-5 will wrap a legacy adapter for
multi-RG files with no
qh.rg_partition_prefix_lenclaim; PR-6'sstreaming merge engine consumes both shapes through the same trait;
PR-7 wires
ParquetMergeExecutor.Caller contract
ColumnChunkBytes::bytesis exactly the on-disk column chunk'scompressed_sizebytes, ref-counted and cheap to clone. The mergeengine in PR-6 will wrap whole-file bytes (received from this reader
as a sequence of
Bytes) forSerializedPageReader::new— thatconstructor uses file-absolute offsets, so the natural integration is
"reader yields bytes; merge engine assembles them and decodes pages
with existing parquet APIs."
Layering decision
quickwit-parquet-enginedeliberately does NOT take aquickwit-storagedependency. Storage pulls in cloud-vendor SDKs thatthis crate has no business linking against, and the layering would
invert (storage is a higher concern than the parquet data model).
Instead the reader takes a minimal
RemoteByteSourcetrait — threemethods:
file_size,get_slice,get_slice_stream. The mergeactor in
quickwit-indexingwill add a thin adapter toquickwit_storage::Storagein PR-6, ~10 lines.Page indexes
Not loaded into the parsed
ParquetMetaDatain PR-4. Rationale: pageindexes (offset_index / column_index) live in the file body, not the
footer; loading them either requires extra GETs or requires the body
GET range to extend earlier than the first column chunk. PR-6's
direct page copy reads
column_chunk.offset_index_offset()/offset_index_length()from the column metadata and decodes theoffset index from the body bytes lazily when it needs page boundaries
— no need to materialize the full
ParquetOffsetIndexstructure upfront.
This is purely a perf decision; the metadata structure is unchanged
from
ParquetRecordBatchReaderBuilder::try_new(...).metadata()modulo the absent page indexes.
Footer prefetch
Configurable via
StreamingReaderConfig::footer_prefetch_bytes,defaulting to 256 KiB. If the configured prefetch is smaller than
the actual footer (rare; would require thousands of columns or
extreme page counts), the reader transparently issues one retry GET
sized to the parser-reported needed length. The PR-A "two-GETs"
contract still holds in the common path; the rare retry is opt-in
via configuration.
Tests (12)
PR-A (two-GETs):
test_footer_get_only_at_construction— only one slice GET, zero stream GETstest_two_gets_for_full_consumption— exactly one slice + one stream after draintest_body_get_starts_at_first_column_chunk_offset— body range =[first_col.offset .. last_col.end]PR-B (metadata equivalence vs sync reader):
test_metadata_matches_sync_reader— schema, KV, sorting_columns, num_rowsqh.*round-trip folded into above)PR-C (bytes round-trip):
test_column_chunk_bytes_round_trip_single_rg— yielded bytes ≡ file slice atbyte_range()test_column_chunk_bytes_round_trip_multi_rg— same across multiple row groupstest_column_chunk_pages_decode_through_full_file— sanity-check that bytes feedSerializedPageReadercleanlyPR-D (order):
test_storage_order_advance— observed(rg, col)pairs lex-ordered, full coverageEdge cases:
test_eof_idempotenttest_small_prefetch_retries_with_correct_sizetest_truncated_file_returns_footer_too_largetest_out_of_order_yields_structured_error(defensive — public API can't normally hit it)Position in stack
qh.rg_partition_prefix_lenmarkerRemoteByteSourcetrait)ParquetMergeExecutor; delete downloaderPR-4 is independent of PR-1 / PR-2 / PR-3 review — it branches off
mainand shares no code with them.Test plan
cargo +nightly fmt(per files touched)RUSTFLAGS=\"-Dwarnings --cfg tokio_unstable\" cargo clippy -p quickwit-parquet-engine --testscargo doc -p quickwit-parquet-engine --no-depscargo machetebash quickwit/scripts/check_license_headers.shbash quickwit/scripts/check_log_format.shcargo nextest run -p quickwit-parquet-engine --all-features— 368 tests, all pass🤖 Generated with Claude Code