feat: add fair_unified_task_shared memory pool to fix 2x memory allocation #3924
Open
andygrove wants to merge 5 commits into apache:main
Conversation
…ation

When Comet executes a shuffle, it creates two separate native plans (the child plan and the shuffle writer plan) that run concurrently in a pipelined fashion. Previously, each plan got its own memory pool at the full per-task limit, effectively allowing 2x the intended memory to be consumed.

The new `fair_unified_task_shared` pool type shares a single CometFairMemoryPool across all native plans within the same Spark task. This ensures the total memory stays within the per-task limit while dynamically distributing memory among operators based on how many register as memory consumers (e.g. if the child plan is a simple scan+filter, the shuffle writer gets 100% of the pool). This is now the default for off-heap mode.

Closes apache#3921

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
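The fair distribution described above can be illustrated with a minimal sketch. This is not the actual CometFairMemoryPool implementation; `FairPool` and `per_consumer_limit` are hypothetical names, assuming a fair pool caps each registered consumer at an even share of the budget, so a plan that registers no consumers leaves the whole pool to the others:

```rust
// Hypothetical sketch of fair per-consumer limits (not the real
// CometFairMemoryPool API): the pool budget is split evenly among
// however many operators registered as memory consumers.
struct FairPool {
    pool_size: usize,
    num_consumers: usize,
}

impl FairPool {
    fn per_consumer_limit(&self) -> usize {
        if self.num_consumers == 0 {
            self.pool_size
        } else {
            self.pool_size / self.num_consumers
        }
    }
}

fn main() {
    // Only the shuffle writer registered (child plan is a simple
    // scan+filter): the writer may use 100% of the pool.
    let only_writer = FairPool { pool_size: 1 << 30, num_consumers: 1 };
    assert_eq!(only_writer.per_consumer_limit(), 1 << 30);

    // Two registered consumers split the budget evenly.
    let two = FairPool { pool_size: 1 << 30, num_consumers: 2 };
    assert_eq!(two.per_consumer_limit(), 1 << 29);
    println!("per-consumer limit with 2 consumers: {}", two.per_consumer_limit());
}
```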
When using fair_unified_task_shared, multiple execution contexts on the same thread share a single Arc<dyn MemoryPool>. The tracing code was summing pool.reserved() for each registered context, double-counting the shared pool and reporting 2x the actual memory reservation. Deduplicate pools by Arc data pointer before summing so each underlying pool is only counted once.
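The deduplication described in this commit can be sketched as follows. The trait and types here are stand-ins (the real code uses DataFusion's `MemoryPool` trait); the key idea is that contexts sharing one pool hold clones of the same `Arc`, so summing per-context `reserved()` double-counts unless we deduplicate by the `Arc`'s data pointer first:

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Stand-in for DataFusion's MemoryPool trait; only `reserved` matters here.
trait MemoryPool {
    fn reserved(&self) -> usize;
}

struct FixedPool(usize);
impl MemoryPool for FixedPool {
    fn reserved(&self) -> usize {
        self.0
    }
}

/// Sum `reserved()` across registered pools, counting each underlying
/// pool only once. Clones of the same `Arc` share a data pointer, so we
/// cast the fat `*const dyn MemoryPool` to a thin pointer and use it as
/// a deduplication key.
fn total_reserved(pools: &[Arc<dyn MemoryPool>]) -> usize {
    let mut seen: HashSet<*const ()> = HashSet::new();
    pools
        .iter()
        .filter(|p| seen.insert(Arc::as_ptr(p) as *const ()))
        .map(|p| p.reserved())
        .sum()
}

fn main() {
    let shared: Arc<dyn MemoryPool> = Arc::new(FixedPool(100));
    // Two execution contexts registered with the same shared pool.
    let pools = vec![Arc::clone(&shared), Arc::clone(&shared)];
    // Naive summing would report 200; the deduplicated total is 100.
    assert_eq!(total_reserved(&pools), 100);

    // Distinct pools are still both counted.
    let other: Arc<dyn MemoryPool> = Arc::new(FixedPool(50));
    assert_eq!(total_reserved(&[Arc::clone(&shared), other]), 150);
}
```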
Make fair_unified_task_shared opt-in rather than the default to simplify review. Update docs to reflect the new default.
Add context about how Comet creates two concurrent native plans per Spark task during shuffle and why this matters for pool selection.
Which issue does this PR close?
Closes #3921
Rationale for this change
When Comet executes a shuffle, it creates two separate native plans (the child plan and the shuffle writer plan) that run concurrently in a pipelined fashion. Previously, each plan got its own memory pool at the full per-task limit, effectively allowing 2x the intended memory to be consumed.
The new `fair_unified_task_shared` pool type shares a single `CometFairMemoryPool` across all native plans within the same Spark task. This ensures the total memory stays within the per-task limit while dynamically distributing memory among operators based on how many register as memory consumers. This pool type is opt-in (not the default) to allow further testing and evaluation.
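The per-task sharing can be sketched as a global registry keyed by task attempt id with reference counting. Names here (`Pool`, `PerTaskPool`, `acquire`, `release`) are hypothetical stand-ins for the PR's actual types; the shape follows the description above, assuming the entry is dropped when the last native plan in the task finishes:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Stand-in for the shared pool; the real type is CometFairMemoryPool.
struct Pool {
    limit: usize,
}

// Per-task entry: the shared pool plus a count of native plans using it.
struct PerTaskPool {
    pool: Arc<Pool>,
    ref_count: usize,
}

fn registry() -> &'static Mutex<HashMap<i64, PerTaskPool>> {
    static REG: OnceLock<Mutex<HashMap<i64, PerTaskPool>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Called when a native plan starts: reuse the task's pool if one exists,
/// otherwise create it at the per-task limit.
fn acquire(task_attempt_id: i64, limit: usize) -> Arc<Pool> {
    let mut reg = registry().lock().unwrap();
    let entry = reg.entry(task_attempt_id).or_insert_with(|| PerTaskPool {
        pool: Arc::new(Pool { limit }),
        ref_count: 0,
    });
    entry.ref_count += 1;
    Arc::clone(&entry.pool)
}

/// Called when a native plan finishes: remove the entry once unused.
fn release(task_attempt_id: i64) {
    let mut reg = registry().lock().unwrap();
    if let Some(entry) = reg.get_mut(&task_attempt_id) {
        entry.ref_count -= 1;
        if entry.ref_count == 0 {
            reg.remove(&task_attempt_id);
        }
    }
}

fn main() {
    let a = acquire(42, 512 * 1024 * 1024); // child plan
    let b = acquire(42, 512 * 1024 * 1024); // shuffle writer plan, same task
    // Both plans share one underlying pool, at the single per-task limit.
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(a.limit, 512 * 1024 * 1024);
    release(42);
    release(42);
    assert!(registry().lock().unwrap().is_empty());
}
```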
What changes are included in this PR?
- Adds a `fair_unified_task_shared` memory pool type that shares a single pool across all native plans within the same Spark task, using a global `HashMap<task_attempt_id, PerTaskMemoryPool>` with reference counting
- Fixes a tracing bug where `total_reserved_for_thread()` and `unregister_and_total()` double-counted memory when multiple execution contexts shared the same pool `Arc`; deduplicates by `Arc` data pointer before summing `reserved()`
- Keeps `fair_unified` as the default; `fair_unified_task_shared` is opt-in via `spark.comet.exec.memoryPool`

How are these changes tested?
Tested with TPC-H queries using tracing to compare jemalloc and pool-tracked memory between `fair_unified` and `fair_unified_task_shared`. Confirmed that the tracing fix eliminates the false 2x reporting and that both pool types show equivalent actual memory usage.