[SPARK-56395][SQL] Add NEAREST BY top-K ranking join (catalyst-side)#55681

Open
zhidongqu-db wants to merge 2 commits into apache:branch-4.2 from zhidongqu-db:nearest-join-14.2



@zhidongqu-db zhidongqu-db commented May 5, 2026

What changes were proposed in this pull request?

Merges #55629 and #55688 into the Spark branch-4.2 release branch.

Why are the changes needed?

Supports VECTOR SEARCH in Spark 4.2.

Does this PR introduce any user-facing change?

Yes — new SQL syntax (NEAREST BY clause). Five new non-reserved keywords (APPROX, EXACT, NEAREST, DISTANCE, SIMILARITY) added to the grammar; existing queries are unaffected because they're non-reserved. New error class NEAREST_BY_JOIN.

How was this patch tested?

PlanParserSuite, RewriteNearestByJoinSuite, SQLQueryTestSuite, SparkConnectDatabaseMetaDataSuite, ThriftServerWithSparkContextSuite

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7), human-reviewed and tested

dilipbiswal and others added 2 commits May 5, 2026 05:39
### What changes were proposed in this pull request?
This is the first of two PRs implementing https://issues.apache.org/jira/browse/SPARK-56395. It introduces the SQL grammar, logical plan, analyzer checks, and optimizer rewrite. The DataFrame / PySpark / Spark Connect API surface is split into a follow-up PR.

**SQL syntax**
```
  left_relation [ INNER | LEFT [ OUTER ] ] JOIN right_relation nearest_by_clause
  nearest_by_clause:
    { APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE | SIMILARITY } ranking_expression
```
Only INNER (the default) and LEFT OUTER join types are supported. `num_results` is a positive integer in [1, 100000], defaulting to 1. DISTANCE ranks the smallest values first; SIMILARITY ranks the largest values first.

**Example:**
```
CREATE TEMP VIEW users(user_id, score)
  AS VALUES (1, 10.0), (2, 20.0), (3, 30.0);
CREATE TEMP VIEW products(product, pscore)
  AS VALUES ('A', 11.0), ('B', 22.0), ('C', 5.0);

SELECT u.user_id, p.product
FROM users u JOIN products p
  APPROX NEAREST 2 BY DISTANCE abs(u.score - p.pscore);
```
**Parsed Plan**
```
'Project ['u.user_id, 'p.product]
  +- 'NearestByJoin Inner, approx=true, k=2, direction=NearestByDistance, rank='abs('u.score - 'p.pscore)
     :- 'SubqueryAlias u
     :  +- 'UnresolvedRelation [users]
     +- 'SubqueryAlias p
        +- 'UnresolvedRelation [products]
```
**Optimized Plan**
```
 Project [user_id#1, product#3]
  +- Generate inline(__nearest_matches__#7), [product#3, pscore#4], outer=false
     +- Aggregate [__qid#5], [first(user_id#1) AS user_id#1, first(score#2) AS score#2, min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2) AS __nearest_matches__#7]
        +- Join LeftOuter
           :- Project [user_id#1, score#2,
           :           monotonically_increasing_id() AS __qid#5]
           :  +- LocalRelation [user_id#1, score#2]
           +- LocalRelation [product#3, pscore#4]
```
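The rewrite above preserves per-query top-k semantics: each left row is tagged with a query id, cross-joined with the right side, and then the k best-ranked right rows are kept per query id via `min_by`. A minimal Python sketch of those semantics on the example data (`nearest_by_distance` is a hypothetical helper for illustration, not Spark code):

```python
import heapq

def nearest_by_distance(left, right, rank, k):
    # For each left row, keep the k right rows with the smallest rank value,
    # mirroring NEAREST k BY DISTANCE (SIMILARITY would use nlargest instead).
    out = []
    for l in left:
        for r in heapq.nsmallest(k, right, key=lambda r: rank(l, r)):
            out.append((l, r))
    return out

users = [(1, 10.0), (2, 20.0), (3, 30.0)]          # (user_id, score)
products = [("A", 11.0), ("B", 22.0), ("C", 5.0)]  # (product, pscore)

pairs = nearest_by_distance(users, products,
                            rank=lambda u, p: abs(u[1] - p[1]), k=2)
result = [(u[0], p[0]) for u, p in pairs]
# [(1, 'A'), (1, 'C'), (2, 'B'), (2, 'A'), (3, 'B'), (3, 'A')]
```

This matches the query above: user 1 (score 10.0) is closest to A (11.0) and C (5.0), while users 2 and 3 are closest to B and A.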
**Physical Plan**
```
*(3) Project [user_id#1, product#3]
  +- *(3) Generate inline(__nearest_matches__#7), [user_id#1, score#2], false, [product#3, pscore#4]
     +- ObjectHashAggregate(keys=[__qid#5], functions=[first(user_id#1), first(score#2),  min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2)])
        +- Exchange hashpartitioning(__qid#5, 200)
           +- ObjectHashAggregate(keys=[__qid#5],
                functions=[partial_first(user_id#1), partial_first(score#2), partial_min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2)])
              +- BroadcastNestedLoopJoin BuildRight, LeftOuter
                 :- *(1) Project [user_id#1, score#2,
                 :                monotonically_increasing_id() AS __qid#5]
                 :  +- LocalTableScan [user_id#1, score#2]
                 +- BroadcastExchange IdentityBroadcastMode
                    +- LocalTableScan [product#3, pscore#4]
```
### Why are the changes needed?
Design and rationale: see the SPIP linked from https://issues.apache.org/jira/browse/SPARK-56395.
### Does this PR introduce _any_ user-facing change?
Yes — new SQL syntax (NEAREST BY clause). Five new non-reserved keywords (APPROX, EXACT, NEAREST, DISTANCE, SIMILARITY) added to the grammar; existing queries are unaffected because they're non-reserved. New error class NEAREST_BY_JOIN.

### How was this patch tested?
PlanParserSuite, RewriteNearestByJoinSuite, SQLQueryTestSuite, SparkConnectDatabaseMetaDataSuite, ThriftServerWithSparkContextSuite

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7), human-reviewed and tested

Closes apache#55629 from dilipbiswal/SPARK-56395-CATALYST.

Lead-authored-by: Dilip Biswal <dkbiswal@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
…sJoin.enabled is false

### What changes were proposed in this pull request?

Followup of apache#55629, addressing the review comment at apache#55629 (comment).

Adds a dedicated `CheckAnalysis` case that fails with `NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED` when a nearest-by join is attempted while `spark.sql.crossJoin.enabled = false`. Previously the query fell through to the generic cross-join check and produced a confusing, unrelated error.

### Why are the changes needed?

The nearest-by join is internally implemented as a bounded cross-product. Without this guard a user gets the misleading `_LEGACY_ERROR_TEMP_1211` cross-join error rather than a clear message explaining the relationship to `spark.sql.crossJoin.enabled`.
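A sketch of the workaround the new error points users toward, assuming the message names the conf key (query syntax taken from this PR's example):

```
SET spark.sql.crossJoin.enabled = true;

SELECT u.user_id, p.product
FROM users u JOIN products p
  APPROX NEAREST 2 BY DISTANCE abs(u.score - p.pscore);
```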

### Does this PR introduce _any_ user-facing change?

Yes. Users who run a nearest-by join with `spark.sql.crossJoin.enabled = false` now receive the structured error `NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED` with an actionable message instead of a generic cross-join error.

### How was this patch tested?

- New unit test in `AnalysisErrorSuite` (`NearestByJoin is rejected when spark.sql.crossJoin.enabled is false`)
- Updated SQL golden files (`join-nearest-by.sql.out`) regenerated to reflect the new error

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-6)

Closes apache#55688 from sezruby/56395_followup_error_msg.

Lead-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com>
Co-authored-by: Eunjin Song <esong@adobe.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>