Skip to content

[SPARK-55551][SQL] Improve BroadcastHashJoinExec output partitioning#54335

Open
peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth:SPARK-55551-improve-broadcasthashjoinexec-output-partitioning
Open

[SPARK-55551][SQL] Improve BroadcastHashJoinExec output partitioning#54335
peter-toth wants to merge 1 commit intoapache:masterfrom
peter-toth:SPARK-55551-improve-broadcasthashjoinexec-output-partitioning

Conversation

@peter-toth
Copy link
Contributor

What changes were proposed in this pull request?

This is a minor refector of BroadcastHashJoinExec.outputPartitioning to:

  • simlify the logic and
  • make it future proof by using Partitioning with Expression instead of HashPartitioningLike.

Why are the changes needed?

Code cleanup and add support for future partitionings that implement Expression but not HashPartitioningLike. (Like KeyedPartitioning is in #54330.)

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

HashPartitioning(Seq(l3), 1)))),
right = DummySparkPlan())
expected = PartitioningCollection(Seq(
PartitioningCollection(Seq(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping a PartitioningCollection in a PartitioningCollection has no benefit.

@@ -96,28 +97,23 @@ case class BroadcastHashJoinExec private(
}

// Expands the given partitioning collection recursively.
Copy link
Member

@dongjoon-hyun dongjoon-hyun Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you revise this method description a little more according to your code change? The method is slightly changed to handle Partitioning instead of PartitionCollection.

// where the streamed keys are Seq("b", "c") and the build keys are Seq("x", "y"),
// the expanded partitioning will have the following expressions:
// Seq("a", "b", "c"), Seq("a", "b", "y"), Seq("a", "x", "c"), Seq("a", "x", "y").
// The expanded expressions are returned as PartitioningCollection.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- // The expanded expressions are returned as PartitioningCollection.
+ // The expanded expressions are returned as Seq[Partitioning].

case other => other
val expandedPartitioning = expandOutputPartitioning(streamedPlan.outputPartitioning)
expandedPartitioning match {
case Nil => UnknownPartitioning(streamedPlan.outputPartitioning.numPartitions)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic looks new to me. Is this an improvement?

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refactoring itself looks reasonable to me. I have a few comments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants