[SPARK-56395][SQL] Add NEAREST BY top-K ranking join (catalyst-side) #55681

Open

zhidongqu-db wants to merge 2 commits into apache:branch-4.2 from
Conversation
### What changes were proposed in this pull request?

This is the first of two PRs implementing https://issues.apache.org/jira/browse/SPARK-56395. It introduces the SQL grammar, logical plan, analyzer checks, and optimizer rewrite. The DataFrame / PySpark / Spark Connect API surface is split into a follow-up PR.

**SQL syntax**

```
left_relation [ INNER | LEFT [ OUTER ] ] JOIN right_relation nearest_by_clause

nearest_by_clause:
  { APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE | SIMILARITY } ranking_expression
```

Only INNER (the default) and LEFT OUTER join types are supported. `num_results` is a positive integer in [1, 100000], defaulting to 1. DISTANCE ranks smallest first; SIMILARITY ranks largest first.

**Example:**

```
CREATE TEMP VIEW users(user_id, score) AS
VALUES (1, 10.0), (2, 20.0), (3, 30.0);

CREATE TEMP VIEW products(product, pscore) AS
VALUES ('A', 11.0), ('B', 22.0), ('C', 5.0);

SELECT u.user_id, p.product
FROM users u JOIN products p
APPROX NEAREST 2 BY DISTANCE abs(u.score - p.pscore);
```

**Parsed Plan**

```
'Project ['u.user_id, 'p.product]
+- 'NearestByJoin Inner, approx=true, k=2, direction=NearestByDistance, rank='abs('u.score - 'p.pscore)
   :- 'SubqueryAlias u
   :  +- 'UnresolvedRelation [users]
   +- 'SubqueryAlias p
      +- 'UnresolvedRelation [products]
```

**Optimized Plan**

```
Project [user_id#1, product#3]
+- Generate inline(__nearest_matches__#7), [product#3, pscore#4], outer=false
   +- Aggregate [__qid#5], [first(user_id#1) AS user_id#1, first(score#2) AS score#2, min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2) AS __nearest_matches__#7]
      +- Join LeftOuter
         :- Project [user_id#1, score#2, monotonically_increasing_id() AS __qid#5]
         :  +- LocalRelation [user_id#1, score#2]
         +- LocalRelation [product#3, pscore#4]
```

**Physical Plan**

```
*(3) Project [user_id#1, product#3]
+- *(3) Generate inline(__nearest_matches__#7), [user_id#1, score#2], false, [product#3, pscore#4]
   +- ObjectHashAggregate(keys=[__qid#5], functions=[first(user_id#1), first(score#2), min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2)])
      +- Exchange hashpartitioning(__qid#5, 200)
         +- ObjectHashAggregate(keys=[__qid#5], functions=[partial_first(user_id#1), partial_first(score#2), partial_min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2)])
            +- BroadcastNestedLoopJoin BuildRight, LeftOuter
               :- *(1) Project [user_id#1, score#2, monotonically_increasing_id() AS __qid#5]
               :  +- LocalTableScan [user_id#1, score#2]
               +- BroadcastExchange IdentityBroadcastMode
                  +- LocalTableScan [product#3, pscore#4]
```

### Why are the changes needed?

Design and rationale: see the SPIP linked from https://issues.apache.org/jira/browse/SPARK-56395.

### Does this PR introduce _any_ user-facing change?

Yes — new SQL syntax (the NEAREST BY clause). Five new non-reserved keywords (APPROX, EXACT, NEAREST, DISTANCE, SIMILARITY) are added to the grammar; existing queries are unaffected because the keywords are non-reserved. A new error class, NEAREST_BY_JOIN, is introduced.

### How was this patch tested?

PlanParserSuite, RewriteNearestByJoinSuite, SQLQueryTestSuite, SparkConnectDatabaseMetaDataSuite, ThriftServerWithSparkContextSuite

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7), human-reviewed and tested

Closes apache#55629 from dilipbiswal/SPARK-56395-CATALYST.

Lead-authored-by: Dilip Biswal <dkbiswal@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
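The top-K semantics implemented by the `min_by(struct(...), rank, k)` rewrite above can be sketched in plain Python. This is an illustrative model only, not Spark code; `nearest_by_join` is a hypothetical helper that mirrors what the rewrite computes: for each left row, keep the k right rows with the smallest (DISTANCE) or largest (SIMILARITY) ranking value.

```python
def nearest_by_join(left, right, rank, k=1, largest=False):
    """Model of NEAREST k BY {DISTANCE|SIMILARITY}: for each left row,
    emit the k right rows that minimize rank(l, r) (DISTANCE), or
    maximize it when largest=True (SIMILARITY)."""
    out = []
    for l in left:
        # Conceptually the bounded cross-product + per-query top-k aggregate.
        ranked = sorted(right, key=lambda r: rank(l, r), reverse=largest)
        out.extend((l, r) for r in ranked[:k])
    return out

# Same data as the SQL example above.
users = [(1, 10.0), (2, 20.0), (3, 30.0)]
products = [("A", 11.0), ("B", 22.0), ("C", 5.0)]

pairs = nearest_by_join(users, products,
                        rank=lambda u, p: abs(u[1] - p[1]), k=2)
result = [(u[0], p[0]) for u, p in pairs]
# → [(1, 'A'), (1, 'C'), (2, 'B'), (2, 'A'), (3, 'B'), (3, 'A')]
```

Note how each left row contributes exactly k output rows (assuming at least k right rows exist), matching the result of the optimized `Aggregate` + `Generate inline(...)` plan for the example query.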
…sJoin.enabled is false

### What changes were proposed in this pull request?

Followup of apache#55629, addressing the review comment at apache#55629 (comment). Adds a dedicated `CheckAnalysis` case that fails with `NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED` when a nearest-by join is attempted while `spark.sql.crossJoin.enabled = false`. Previously the query fell through to the generic cross-join check and produced a confusing, unrelated error.

### Why are the changes needed?

The nearest-by join is internally implemented as a bounded cross-product. Without this guard a user gets the misleading `_LEGACY_ERROR_TEMP_1211` cross-join error rather than a clear message explaining the relationship to `spark.sql.crossJoin.enabled`.

### Does this PR introduce _any_ user-facing change?

Yes. Users who run a nearest-by join with `spark.sql.crossJoin.enabled = false` now receive the structured error `NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED` with an actionable message instead of a generic cross-join error.

### How was this patch tested?

- New unit test in `AnalysisErrorSuite` (`NearestByJoin is rejected when spark.sql.crossJoin.enabled is false`)
- Updated SQL golden files (`join-nearest-by.sql.out`) regenerated to reflect the new error

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-6)

Closes apache#55688 from sezruby/56395_followup_error_msg.

Lead-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com>
Co-authored-by: Eunjin Song <esong@adobe.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
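The guard this follow-up describes can be sketched as follows. This is a simplified Python model, not the actual Scala `CheckAnalysis` code; the node-name strings and `NearestByJoinError` class are illustrative. The point is the ordering: the dedicated check fires before the generic cross-join check would, so the user sees the structured error class.

```python
class NearestByJoinError(Exception):
    """Stands in for the NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED error class."""


def check_plan(plan_nodes, cross_join_enabled):
    """Model of the dedicated analyzer check: reject a nearest-by join
    up front when the cross-join flag is off, instead of letting it fall
    through to the generic cross-join error."""
    for node in plan_nodes:
        if node == "NearestByJoin" and not cross_join_enabled:
            raise NearestByJoinError(
                "NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED: the nearest-by join "
                "is implemented as a bounded cross-product; set "
                "spark.sql.crossJoin.enabled=true to allow it.")
```

With the flag enabled the check passes silently; with it disabled the user gets the actionable message rather than `_LEGACY_ERROR_TEMP_1211`.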
What changes were proposed in this pull request?
Merging #55629 and #55688 to Spark branch-4.2.
Why are the changes needed?
To support VECTOR SEARCH in Spark 4.2.
Does this PR introduce any user-facing change?
Yes — new SQL syntax (NEAREST BY clause). Five new non-reserved keywords (APPROX, EXACT, NEAREST, DISTANCE, SIMILARITY) added to the grammar; existing queries are unaffected because they're non-reserved. New error class NEAREST_BY_JOIN.
How was this patch tested?
PlanParserSuite, RewriteNearestByJoinSuite, SQLQueryTestSuite, SparkConnectDatabaseMetaDataSuite, ThriftServerWithSparkContextSuite
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7), human-reviewed and tested