[SPARK-55092][SQL] Disable partition grouping in KeyGroupedPartitioning when not needed #53859
JIRA Issue Information: Improvement SPARK-55092 (this comment was automatically generated by GitHub Actions)
cc @szehon-ho, @sunchao, @viirya, @dongjoon-hyun
Thank you for pinging me, @peter-toth.
dongjoon-hyun left a comment:
Although we need to handle copyFromTag independently, the proposal itself sounds reasonable to me. Do you think you can share some supporting performance numbers based on the existing benchmark or from your production environment?
Why are the changes needed?
Improve performance.
Numbers depend heavily on the use case. In our case a customer would like to use SPJ, between table The optimization in this PR is similar to what |
dongjoon-hyun left a comment:
+1, LGTM. Thank you, @peter-toth .
BTW, this PR has not been rebased onto master since #53884 was merged. Did I understand correctly?
You are right. I've just rebased it. The diff should be ok now.
Thank you!
@szehon-ho, @sunchao, @viirya, do you have any concerns or comments?
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  val requiredDistribution = if (subquery) {
Not sure I get this: if it's not a subquery, we pass in any requiredDistribution?
Yeah, let me change this tomorrow and pass in subquery directly into EnsureRequirements, that way this will be much cleaner.
    val newChild = disableKeyGroupingIfNotNeeded(c)
    ShuffleExchangeExec(newPartitioning, newChild, so, ps)
  case _ =>
    val newChild = disableKeyGroupingIfNotNeeded(child)
could we make a method createShuffleExchangeExec(..., disableGrouping: Boolean) to reduce duplication?
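As an illustration of the suggestion above, a helper that takes a `disableGrouping` flag could look roughly like the sketch below. This is only a toy model: `Plan`, `Shuffle` and the parameter list are placeholders invented for the example (the thread does not show what sits behind the `...`), and the body of `disableKeyGroupingIfNotNeeded` here is a stand-in, not the logic from this PR.

```scala
// Toy stand-ins, NOT Spark classes: they only model "a plan that may be grouped"
// and "a shuffle placed on top of a plan".
object ShuffleHelperSketch {
  final case class Plan(name: String, grouped: Boolean)
  final case class Shuffle(partitioning: String, child: Plan)

  // Stand-in for the PR's method of the same name; here it simply clears the flag.
  def disableKeyGroupingIfNotNeeded(child: Plan): Plan = child.copy(grouped = false)

  // Hypothetical helper: the single place where a shuffle is created, so callers
  // no longer repeat the "rewrite child, then wrap in a shuffle" pattern.
  def createShuffleExchange(
      partitioning: String,
      child: Plan,
      disableGrouping: Boolean): Shuffle = {
    val newChild = if (disableGrouping) disableKeyGroupingIfNotNeeded(child) else child
    Shuffle(partitioning, newChild)
  }
}
```

With a helper of this shape, each call site reduces to a single call that states whether grouping may be dropped.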
@chirag-s-db if you also want to take a look?
    }
  }

  private def populateNoGroupingPartitionInfo(plan: SparkPlan): SparkPlan = plan match {
This looks like it can be done with the transform API?
Yes, I can change this to use transform() APIs.
Wanted to make it similar to the other 2 populate...() methods. Shall I change those as well?
I modified all 3 populate...()s in b04bb61.
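For readers unfamiliar with the transform-style rewrite discussed in this thread, the sketch below shows the idea on a toy plan tree. Catalyst's `TreeNode` does provide `transform`, `transformUp` and `transformDown`, but everything in this block (`Plan`, `Scan`, `Filter`, `Join`, the `grouped` flag and `disableGrouping`) is a simplified stand-in written for illustration, not the code changed in b04bb61.

```scala
// Toy plan tree, NOT Spark's SparkPlan: it only mimics the shape of a
// transformUp-style API so the rewrite can be written as one partial function.
object TransformSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def withNewChildren(newChildren: Seq[Plan]): Plan

    // Bottom-up rewrite: rewrite the children first, then apply the rule to the
    // rebuilt node; nodes the rule does not match are returned unchanged.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rebuilt = withNewChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rebuilt, (p: Plan) => p)
    }
  }

  final case class Scan(table: String, grouped: Boolean = true) extends Plan {
    def children: Seq[Plan] = Nil
    def withNewChildren(newChildren: Seq[Plan]): Plan = this
  }

  final case class Filter(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withNewChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }

  final case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    def withNewChildren(newChildren: Seq[Plan]): Plan =
      copy(left = newChildren(0), right = newChildren(1))
  }

  // Only the interesting case is spelled out; the traversal is handled by transformUp.
  def disableGrouping(plan: Plan): Plan = plan.transformUp {
    case s: Scan => s.copy(grouped = false)
  }
}
```

Compared with a hand-written recursive `plan match { ... }`, the rule lists only the node types it changes, which is the reduction in boilerplate the review comment is after.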
      child, values, joinKeyPositions, reducers, applyPartialClustering, replicatePartitions))
  }

  private def disableKeyGroupingIfNotNeeded(child: SparkPlan) = {
More detailed comments on this method would be good, e.g., the conditions under which grouping can be safely disabled, etc.
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  val requiredDistribution = if (subquery) {
Can we add more detailed comments here? It looks confusing without any context when looking at the code here.
I changed this to pass in subquery and added comments in c28fc3f.
Could we also support the case where the KeyGroupedPartitioning is the output of the plan with the following approach?
One advantage of this approach is that it allows us to avoid grouping for the (presumably not uncommon) case of a simple scan from a partitioned table, and it should still be safe for checkpointed scans (as the checkpointed scans would have a FYI @szehon-ho |
My concern with this approach is that we can introduce an extra shuffle above the checkpointed (ungrouped) data.
I really like @chirag-s-db's idea; it is quite clean. Otherwise we have to guard everywhere we create a shuffle to disable it. But I also see the point about missing the opportunity to avoid a shuffle for a checkpointed KeyGrouped RDD, although I guess it's not a very common case. So in short, no strong opinion either way. @sunchao @viirya, any opinion?
Please note that not only checkpointed RDDs, but cached RDDs would also need an extra shuffle. Actually, I wonder if partition grouping by key is at the right place in I’m happy to put together a POC PR; just let me know. |
Yeah, that sounds like it would be cleaner and would cover the checkpoint/cache case. I'm not sure whether the detail about disableGrouping by default will work, but I'm curious to see. Thanks, @peter-toth.
Just a quick update: I have opened a draft PR, #54330, to implement the above idea and extract the partition grouping logic from |
What changes were proposed in this pull request?
Currently KeyGroupedPartitioning always groups partitions by key, regardless of whether grouping is actually needed. This behaviour decreases parallelism and can lead to slower performance.
This PR disables partition grouping of a scan with KeyGroupedPartitioning output partitioning if:
We can't disable partition grouping of a scan in a main query if it contributes to the output partitioning of the query result, because we don't know whether the query is cached/checkpointed or how the output of the query will be used later. The output must keep KeyGroupedPartitioning semantics in this case.
But we can disable partition grouping in subqueries when grouping is not needed for anything in the subquery plan. This is actually necessary to make sure broadcast exchange reuse happens correctly during dynamic partition pruning.
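To make the parallelism argument in the description concrete, here is a toy Scala model. It is only a sketch under simplifying assumptions: `InputPartition`, `groupedTasks` and `ungroupedTasks` are made-up names for illustration, not Spark classes or the code in this PR, and one "task" is modelled as a plain list of input partitions.

```scala
// Toy model, NOT Spark code: shows how merging input partitions by key
// reduces the number of tasks that can run in parallel.
object GroupingSketch {
  // One input partition reported by a (hypothetical) source, tagged with its partition key.
  final case class InputPartition(key: String, rows: Int)

  // Grouped mode: all input partitions that share a key are merged into one task.
  def groupedTasks(parts: Seq[InputPartition]): Seq[Seq[InputPartition]] =
    parts.groupBy(_.key).toSeq.sortBy(_._1).map(_._2)

  // Ungrouped mode: every input partition becomes its own task.
  def ungroupedTasks(parts: Seq[InputPartition]): Seq[Seq[InputPartition]] =
    parts.map(p => Seq(p))

  // Skewed example: three partitions for key "a", one for key "b".
  val parts: Seq[InputPartition] = Seq(
    InputPartition("a", 10), InputPartition("a", 20),
    InputPartition("a", 30), InputPartition("b", 5))

  def main(args: Array[String]): Unit = {
    println(groupedTasks(parts).size)   // 2 tasks, one of them reading 60 rows
    println(ungroupedTasks(parts).size) // 4 tasks, the largest reading 30 rows
  }
}
```

In this model, grouping is only worth its cost when something downstream relies on one task per key (for example a storage-partitioned join); otherwise the ungrouped layout gives more, smaller tasks.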
Why are the changes needed?
Improve performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT added.
Was this patch authored or co-authored using generative AI tooling?
No.