fix(parquet): bound data page byte size for large variable-width values#9972
fix(parquet): bound data page byte size for large variable-width values#9972adriangb wants to merge 6 commits into
Conversation
|
run benchmark arrow_writer |
393ead0 to
4823429
Compare
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (4823429) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
run benchmark arrow_writer |
0fd6dcb to
24b83c7
Compare
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (24b83c7) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (24b83c7) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (70dc497) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (bbe2b7e) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (bbe2b7e) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
Have you considered making the batch size configurable per column? |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
Yes, that may be a simpler approach. But I'm hoping we can get to a place where users don't have to think about / configure this. Given they gave us a page size limit it'd be nice if we can always adhere to that... |
| /// push a page far past the configured page byte limit before the | ||
| /// post-write size check fires. | ||
| #[inline] | ||
| fn byte_size(&self) -> usize { |
There was a problem hiding this comment.
This seems to duplicate dict_encoding_size. Also, #9700 wants to rename dict_encoding_size and instead implement it pretty much the same way as here.
|
Another thought...maybe add another chunker like the CDC work added ( ). If we compute batches up front when we know the shape of the data that might be faster 🤷 |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (145ea5d) to 48fa8a7 (merge-base) diff File an issue against this benchmark runner |
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (11c9d51) to e28fd0d (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
@etseidl sorry to bug you again. I've re-stacked the commits to make the diff more reviewable, a good chunk of the diff is regression tests and benchmarks which I've tried to split out into their own commits to make things more reviewable. Let me know if there's anything else I can do to help make this more palatable. |
|
Sorry @adriangb, I've been too slammed recently to follow along on this one. I'll try to carve out some time to do a deep dive. Thanks for your patience 🙏 |
|
No worries, thanks for taking the time. I've seen all of the amazing work you've been doing for Parquet itself! |
| /// for the case where individual values are small enough that the byte-budget | ||
| /// based sub-batch sizing in `write_batch_internal` should always resolve to | ||
| /// the full chunk (no granular splitting, no regression vs. current behavior). | ||
| fn create_short_string_bench_batch(size: usize) -> Result<RecordBatch> { |
There was a problem hiding this comment.
Let's break the bench update into a separate PR so we can see the difference in the large string case. I'm seeing a 13% slowdown vs main, but that may just be the price for getting smaller batches.
…10021) # Which issue does this PR close? Split out of #9972 per [this review comment](#9972 (comment)). # Rationale for this change #9972 makes the parquet writer's mini-batch sizing byte-budget aware so large variable-width values don't produce oversized data pages. To measure that change against a stable baseline — and in particular to see the difference in the large-string case — these benchmarks belong on `main` first. # What changes are included in this PR? Adds two BYTE_ARRAY write cases to the `arrow_writer` criterion bench: - **`short_string_non_null`** — 1M fixed-width 8-byte strings. The small-value hot path, where byte-budget-based sub-batch sizing should always resolve to the full chunk (no granular splitting, no regression). - **`large_string_non_null`** — 1024 × 256 KiB strings (256 MiB total). The large-value case: with the default 1 MiB page byte limit each value needs its own page, and a `write_batch_size` of 1024 would otherwise buffer all 256 MiB before the post-write size check runs. No library code changes — benchmarks only. # Are there any user-facing changes? No. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
run benchmark arrow_writer env:
BENCH_FILTER: large|short |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (11c9d51) to e28fd0d (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
run benchmark arrow_writer env:
BENCH_FILTER: large|short |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (79507a2) to bbbe8a6 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
Interesting, on my WS the large string benches were consistently slower (probably due to earlier abandonment of the dictionary encoder). On ARM it seems to tilt the other way 🤷 Anyway, I did a first pass and I think this looks nice. I want to test it locally some with some icky files I have. Then I'll do a final pass through. Thanks @adriangb! |
|
Yeah I had to spend a lot of time messing with code structure because small layout differences cascade into measurable differences. Thanks for digging deep into this. |
| let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead; | ||
| cum = cum.saturating_add(len); | ||
| if cum > byte_budget { | ||
| return i.max(1); |
There was a problem hiding this comment.
I did some testing on a file consisting of 128b strings. If I set max page size to 64000, then I wind up with a file with a pattern of pages of size 968/540/540 values. This is because this line will return a size of 484 (floor of 64000/132). So what happens is the first mini batch of 484 is just under the 64k threshold, so we add the next 484 from the batch to get 968. That leaves 56 rows left. The next iter appends the next 484 to the 56 to get 540, and then we have 484+56 left in that batch of 1024, so we wind up writing that 540. And then repeat. If we instead just return i + 1 here, that eliminates the need for the .max(1), and also gives us a mini-batch size just over the requested threshold. Now I see a pattern of 485/539 repeating.
I wonder if there's a way to smooth this some. We can't really change the batch size being passed in, but given a batch, maybe we can add some kind of heuristic here that can figure out first that multiple mini-batches are needed then divides the batch size by the number of batches to smooth this out some. Naively something like
if cum > byte_budget {
//return (i+1).max(1);
let num_batches = 1.max(n/(i+1));
return 1.max(n / num_batches)
}This overshoots some by producing mini-batches of 512. Dividing by 3 would undershoot, but then we need two mini-batches to fill a page so that winds up overshooting even more.
There was a problem hiding this comment.
Another option given the above is do 3 batches of around 341, but change write_mini_batch to take a flag to immediately flush a page after writing. Then we wind up with more pages, but they're all under the budget.
There was a problem hiding this comment.
Good catch.
I applied your return i + 1 suggestion in f8f2a52.
I'd rather keep something simple and obviously cheap, some overshoot is okay as long as it's not unbounded as it was before, I don't think we need to be exact, best effort is still okay.
There was a problem hiding this comment.
I'd rather keep something simple and obviously cheap, some overshoot is okay as long as it's not unbounded as it was before, I don't think we need to be exact, best effort is still okay.
Sounds reasonable. I like simple :D
Add the regression tests first, before any fix (TDD). With unmodified `main` the page-bounding assertions fail: the column writer only checks the data/dictionary page byte limit *after* each `write_batch_size` mini-batch, so large variable-width values pile into a single oversized page (we've observed 2 GiB data pages and ~256x dictionary-page overshoot at default settings). Column-writer tests (`ColumnValueEncoderImpl` path): - large BYTE_ARRAY values cap data pages near one value each - large values inside a repeated/list column (record-boundary stepping) - nullable column (value vs level counting) - dictionary spill then plain-encode large values - large distinct values bound the dictionary page - FIXED_LEN_BYTE_ARRAY byte budget Arrow-writer tests (`ByteArrayEncoder` path, what real users hit): - large `Utf8` strings via `ArrowWriter` - mixed small/large strings round-trip bit-identically - large `Utf8View` strings - all-null string column stays correct The subsequent commits make each of these pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Make the mini-batch size byte-budget aware so the post-write page check
fires before a data page grows unbounded:
- New `ColumnValueEncoder::count_values_within_byte_budget{,_gather}`
trait methods (default `None` = "no estimate; stay batched"), with a
concrete impl on `ColumnValueEncoderImpl` driven by
`plain_encoded_byte_size`. Fixed-width physical types answer in one
division; only variable-width BYTE_ARRAY/FLBA walk values, exiting at
the first that overruns the budget.
- New `LevelDataRef::value_count` converts a chunk's level span into a
leaf-value count (O(1) for flat columns, def-level scan for
nullable/nested), with a unit test.
- New `ByteBudgetChunker` picks the largest sub-batch that fits one page
budget. For the common case (small or fixed-width values) it returns
the full chunk with no value inspection, so the hot path is unchanged.
- `write_batch_internal` consults the chunker per chunk and, only when a
chunk would overflow, routes through the new `write_granular_chunk`,
which sub-batches so the post-write check fires in time.
Repeated/nested columns step on record (rep == 0) boundaries so a
record never spans pages.
This makes the column-writer data-page, list, nullable and FLBA
regression tests pass. Dictionary-encoding columns are still left on the
batched path (the data page holds only small RLE indices) — bounding the
dictionary page is a separate commit, so the two dictionary tests and
the arrow `ByteArrayEncoder` tests do not pass yet.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Implement `ColumnValueEncoder::count_values_within_byte_budget_gather` for `ByteArrayEncoder`, the encoder real `ArrowWriter` users hit. This makes the page-size bound fire for arrow string/binary columns; the previous commit only wired the generic column-writer path. Makes the arrow-writer regression tests pass. The implementation stays off the hot path for small values via cheap O(1) upper bounds before any per-value walk: - Offset-backed arrays (`Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`): the span `offsets[last+1] - offsets[first]` bounds the chunk's payload in O(1); if it fits, every value fits. The span is exact even for nullable columns (skipped positions are nulls with zero offset delta), so sparse `indices` skip the per-value walk too. - View arrays (`Utf8View`/`BinaryView`): lengths live in the low 32 bits of each view word, so an O(1) `n * (max_value_len + 4)` bound skips the scan in the common case; otherwise scan lengths with no data-buffer deref. - Dictionary input: treated as always-fitting — dict-encoded arrow input implies values small enough to dedup, the opposite of the blob case this targets, and a per-key walk measurably regressed the bench. - FixedSizeBinary: falls through to the generic accessor walk. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
While a column dictionary-encodes, the data page holds only small RLE indices but the *dictionary* page accumulates the distinct values themselves, and its spill check runs only once per mini-batch. A mini-batch of large distinct values therefore interns `write_batch_size * value_size` bytes into the dictionary page before the check fires — ~256x the limit in the worst case. Extend `ByteBudgetChunker` to bound the dictionary-encoding phase too: when `has_dictionary()`, size the mini-batch against the dictionary page's *remaining* budget (`dict_page_byte_limit - estimated_dict_page_size`) rather than the data page. Fixed-width columns short-circuit via a precomputed `static_dict_always_fits`, so only large variable-width distinct values pay the per-value walk. Makes the two dictionary regression tests pass. `arrow_writer_layout`'s `test_string` updates accordingly: the dictionary page is now bounded at its 1000-byte limit and spills one mini-batch earlier (125 rows rather than 130) instead of overshooting to 1040. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The variable-width byte-budget walks returned the largest count whose cumulative encoded size was *under* the budget, so each mini-batch ended just short of the page threshold. When the input row batch did not divide evenly into mini-batches, the remainder rolled into the next page and produced a bimodal page-size pattern (e.g. 128B values, 64KB budget, 1024-row batches: 968 / 540 / 540 ... values per page). Return the boundary value's index + 1 instead, so the mini-batch crosses the threshold by exactly one value and the caller's page-flush check trips immediately, with no leftover sliver carried into the next page. The worst-case overshoot per page is one value's encoded size, which already matched the previous behavior whenever a single value alone exceeded the budget (the dropped .max(1) floor). Reported by Ed Seidel in apache#9972 review. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
79507a2 to
f8f2a52
Compare
|
run benchmark arrow_writer |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing parquet-page-size-mid-batch (f8f2a52) to e470187 (merge-base) diff File an issue against this benchmark runner |
The mini-batch byte-budget walk now includes the value that crosses the budget, so the dictionary in the spill sub-test fills at 126 rows (1008 bytes) instead of 125 rows (1000 bytes), and the downstream plain page picks up 1254 rows / 10032 bytes instead of 1255 / 10040. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
etseidl
left a comment
There was a problem hiding this comment.
Thanks @adriangb, I think this is a nice addition. It's a lot to go through, but it is well documented and pretty easy to follow. I'd like to get more eyes on this, however. Perhaps @alamb or @HippoBaro can take a look.
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
We write large values into our parquet files (e.g. a 5MB LLM prompt). A naive write will cause massive pages (we've seen up to 2GB) at default write settings. The main knob to control this is
write_batch_sizewhich defaults to 1024. But if each row is 5MB that's 5GB. On the other hand setting this to something small like 32 kills write performance and is completely unnecessary for other fixed width columns.The writer even documents this (
parquet/src/column/writer/mod.rs):This PR makes the mini-batch size byte-budget aware:
bytes_per_valuefrom the values about to be written and picksub_batch_size = page_byte_limit / bytes_per_value(clamped ≥ 1).sub_batch_size≥ chunk size, so we stay on the existing batched fast path with zero behavior change.Implementation notes
Skip the byte-size check while parquet dictionary encoding is active:
estimated_value_bytesreturns plain-encoded size but a dict-encoded data page only stores small RLE indices, so the estimate would spuriously shrink pages. Dict fallback bounds dict-encoded pages independently.For repeated/nested columns the sub-batch steps record-by-record (rep == 0 boundaries) so a record never spans data pages, matching the parquet format rule.
Regression test
test_column_writer_caps_page_size_for_large_byte_array_valueswrites 64 × 64 KiB BYTE_ARRAY values with a 16 KiB page byte limit. Before this fix that produced a single ~4 MiB page; after, it's one page per value (~64 pages, all within ~2× the value size).Bench results
5-run medians, criterion
arrow_writerbench, default writer properties, on a noisy laptop (run-to-run variance ~±1.6%):primitive/default(i32 25% null)primitive_non_null/defaultbool_non_null/defaultstring/defaultshort_string_non_null/default(new, 1M × 8 B)large_string_non_null/default(new, 1024 × 256 KiB)string_non_null/defaultstring_dictionary/defaultlist_primitive/defaultlist_primitive_non_null/default🤖 Generated with Claude Code