-
Notifications
You must be signed in to change notification settings - Fork 180
Support enumerable TopK #4993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Support enumerable TopK #4993
Conversation
Signed-off-by: Lantao Jin <ltjin@amazon.com>
📝 WalkthroughSummary by CodeRabbitRelease Notes
✏️ Tip: You can customize this high-level summary in your review settings. WalkthroughThis PR introduces a TopK optimization that fuses Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java (1)
142-145: Address the TODO comment and add JavaDoc.The method logic is correct, but:
- The TODO comment "check on adding" is vague and unclear about what needs to be checked and when.
- Per coding guidelines, public methods should have proper JavaDoc documentation.
🔎 Proposed fix
- // TODO check on adding - public boolean containsDigestOnTop(Object digest) { + /** + * Check if the most recently added operation has a digest equal to the provided digest. + * + * @param digest The digest object to compare against the top operation + * @return true if the top operation's digest equals the provided digest, false otherwise + */ + public boolean containsDigestOnTop(Object digest) { return this.queue.peekLast() != null && this.queue.peekLast().digest().equals(digest); }opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.java (1)
20-21: Add class-level JavaDoc.Per coding guidelines, public classes should have proper JavaDoc. Consider expanding the single-line comment into a full JavaDoc explaining the TopK optimization purpose and when this node is used.
Suggested JavaDoc
-/** The different between this and {@link EnumerableLimitSort} is the cost. */ +/** + * A physical relational node representing a TopK operation with optimized cost computation. + * + * <p>This node extends {@link EnumerableLimitSort} but computes a lower cost to encourage + * the planner to prefer fusing separate Limit and Sort operators into this single TopK node, + * thereby reducing memory usage during execution. + */ public class CalciteEnumerableTopK extends EnumerableLimitSort {opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableTopKRule.java (1)
28-42: Add acopy()override toCalciteEnumerableTopKto preserve the custom type.When Calcite's planning engine clones relational nodes, the parent
EnumerableLimitSort.copy()method will be invoked. Without an override, this returns anEnumerableLimitSortinstance instead ofCalciteEnumerableTopK, losing the customcomputeSelfCost()implementation. Following the pattern used inLogicalSystemLimitand other custom Calcite nodes in this codebase, overridecopy()to return a newCalciteEnumerableTopKinstance with the updated parameters.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
integ-test/src/test/resources/expectedOutput/calcite/clickbench/q28.yamlinteg-test/src/test/resources/expectedOutput/calcite/clickbench/q29.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown5.jsoninteg-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yamlopensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableTopKRule.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.javaopensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.javaopensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java
🧰 Additional context used
📓 Path-based instructions (2)
**/*.java
📄 CodeRabbit inference engine (.rules/REVIEW_GUIDELINES.md)
**/*.java: UsePascalCasefor class names (e.g.,QueryExecutor)
UsecamelCasefor method and variable names (e.g.,executeQuery)
UseUPPER_SNAKE_CASEfor constants (e.g.,MAX_RETRY_COUNT)
Keep methods under 20 lines with single responsibility
All public classes and methods must have proper JavaDoc
Use specific exception types with meaningful messages for error handling
PreferOptional<T>for nullable returns in Java
Avoid unnecessary object creation in loops
UseStringBuilderfor string concatenation in loops
Validate all user inputs, especially queries
Sanitize data before logging to prevent injection attacks
Use try-with-resources for proper resource cleanup in Java
Maintain Java 11 compatibility when possible for OpenSearch 2.x
Document Calcite-specific workarounds in code
Files:
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.javaopensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableTopKRule.java
⚙️ CodeRabbit configuration file
**/*.java: - Flag methods >50 lines as potentially too complex - suggest refactoring
- Flag classes >500 lines as needing organization review
- Check for dead code, unused imports, and unused variables
- Identify code reuse opportunities across similar implementations
- Assess holistic maintainability - is code easy to understand and modify?
- Flag code that appears AI-generated without sufficient human review
- Verify Java naming conventions (PascalCase for classes, camelCase for methods/variables)
- Check for proper JavaDoc on public classes and methods
- Flag redundant comments that restate obvious code
- Ensure proper error handling with specific exception types
- Check for Optional usage instead of null returns
- Validate proper use of try-with-resources for resource management
Files:
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.javaopensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.javaopensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableTopKRule.java
integ-test/src/test/resources/**/*
⚙️ CodeRabbit configuration file
integ-test/src/test/resources/**/*: - Verify test data is realistic and representative
- Check data format matches expected schema
- Ensure test data covers edge cases and boundary conditions
Files:
integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/clickbench/q29.yamlinteg-test/src/test/resources/expectedOutput/calcite/clickbench/q28.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown5.jsoninteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: opensearch-project/sql PR: 0
File: .rules/REVIEW_GUIDELINES.md:0-0
Timestamp: 2025-12-02T17:27:55.938Z
Learning: Test SQL generation and optimization paths for Calcite integration changes
📚 Learning: 2025-12-02T17:27:55.938Z
Learnt from: CR
Repo: opensearch-project/sql PR: 0
File: .rules/REVIEW_GUIDELINES.md:0-0
Timestamp: 2025-12-02T17:27:55.938Z
Learning: Test SQL generation and optimization paths for Calcite integration changes
Applied to files:
integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yamlopensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.javainteg-test/src/test/resources/expectedOutput/calcite/clickbench/q29.yamlinteg-test/src/test/resources/expectedOutput/calcite/clickbench/q28.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown5.jsoninteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yamlopensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java
📚 Learning: 2025-12-11T05:27:39.856Z
Learnt from: LantaoJin
Repo: opensearch-project/sql PR: 0
File: :0-0
Timestamp: 2025-12-11T05:27:39.856Z
Learning: In opensearch-project/sql, for SEMI and ANTI join types in CalciteRelNodeVisitor.java, the `max` option has no effect because these join types only use the left side to filter records based on the existence of matches in the right side. The join results are identical regardless of max value (max=1, max=2, or max=∞). The early return for SEMI/ANTI joins before processing the `max` option is intentional and correct behavior.
Applied to files:
integ-test/src/test/resources/expectedOutput/calcite/clickbench/q29.yamlinteg-test/src/test/resources/expectedOutput/calcite/clickbench/q28.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown5.jsoninteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yamlinteg-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yaml
📚 Learning: 2025-12-02T17:27:55.938Z
Learnt from: CR
Repo: opensearch-project/sql PR: 0
File: .rules/REVIEW_GUIDELINES.md:0-0
Timestamp: 2025-12-02T17:27:55.938Z
Learning: Follow existing patterns in `CalciteRelNodeVisitor` and `CalciteRexNodeVisitor` for Calcite integration
Applied to files:
integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown5.json
🧬 Code graph analysis (1)
opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableTopKRule.java (2)
legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/Sort.java (1)
Sort(16-48)opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.java (1)
CalciteEnumerableTopK(21-54)
🔇 Additional comments (16)
integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml (1)
11-14: LGTM!The test expectation correctly reflects the Limit + Sort fusion into
EnumerableLimitSort. The sort key (sort0=[$5]), direction (dir0=[ASC]), and fetch limit (fetch=[10000]) are properly combined into a single operator, which aligns with the PR objective of reducing memory overhead by avoiding full in-memory sorts.opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java (1)
420-429: LGTM!The guard logic correctly prevents stack overflow by checking if an identical
LimitDigestalready exists at the top of the pushdown context. This is a sound fix for the recursive pushdown issue, and the precomputeddigestvariable is properly reused when adding to the context.integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml (1)
18-40: LGTM!The test expectation correctly reflects the plan restructuring:
EnumerableLimitSortfuses the limit and sort operationsEnumerableMergeJoinwith 3-key condition requires the corresponding 3-key sorts on both inputs- The inner branch projections (
[gender, age]) are appropriately minimized for the join/aggregation logicThe plan structure is consistent with the TopK optimization objective.
integ-test/src/test/resources/expectedOutput/calcite/explain_multisearch_timestamp.yaml (1)
16-27: LGTM!The test expectation correctly reflects the multisearch physical plan with
EnumerableMergeUnion. TheLIMIT->5appears twice in eachPushDownContext(before and afterPROJECT), which is consistent with limit pushdown occurring at multiple stages. The search filter terms (A,Bfor the first source;E,Ffor the second) are correctly represented in the query builders.integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml (1)
16-31: LGTM!The test expectation correctly reflects the TopK optimization with
EnumerableLimitSortfusing the limit and sort. TheEnumerableMergeJoinwith its required sorted inputs and the inner branch's optimized projections ([gender]and[gender, age]) are appropriately structured.integ-test/src/test/resources/expectedOutput/calcite/explain_limit_agg_pushdown5.json (1)
4-4: LGTM!The test expectation correctly shows
EnumerableLimitSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100])replacing the previousEnumerableSort(fetch=[100]). This fusion is consistent with the PR objective of combining limit and sort into a single TopK operator for improved memory efficiency.integ-test/src/test/resources/expectedOutput/calcite/clickbench/q28.yaml (1)
13-15: LGTM!This is the key test expectation for the PR's main objective (issue #4982). The
EnumerableLimitSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[25])directly addresses the OOM issue by maintaining only the top 25 results in memory during sorting, rather than sorting all data first and then limiting. This pattern is correct and will prevent the memory exhaustion observed in Clickbench Q28.integ-test/src/test/resources/expectedOutput/calcite/clickbench/q29.yaml (1)
14-17: LGTM! Physical plan correctly reflects TopK fusion.The expected output properly shows the optimization: the inner
EnumerableLimit(fetch=[25])+EnumerableSortpattern is fused into a singleEnumerableLimitSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[25]), while preserving the outer system limit (EnumerableLimit(fetch=[10000])). This aligns with the PR objective to reduce memory usage by avoiding full in-memory sorts.opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java (2)
44-46: LGTM! Rule registration follows existing patterns.The new
ENUMERABLE_TOP_K_RULEfollows the established pattern for rule instantiation and naming conventions (UPPER_SNAKE_CASE for constants).
66-68: Verify rule ordering is intentional.The
ENUMERABLE_TOP_K_RULEis placed beforeEXPAND_COLLATION_ON_PROJECT_EXPR. Please confirm this ordering is intentional, as the TopK transformation should occur before collation expansion to ensure proper optimization.integ-test/src/test/resources/expectedOutput/calcite/explain_exists_correlated_subquery.yaml (1)
15-17: Verify semantic equivalence of operator reordering.The physical plan now has
EnumerableLimit(fetch=[10000])aboveEnumerableCalc, whereas the previous structure had them in reverse order. While this may be a valid optimization, please verify that limiting before calculating (projection) is semantically equivalent in this correlated subquery context—especially since the projection changes the column ordering (id, name, salaryfromname, id, salary).integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml (1)
11-14: LGTM! Correctly demonstrates Limit+Sort fusion.The expected output properly reflects the TopK optimization:
EnumerableLimit(fetch=[10000])+EnumerableSort(sort0=[$5], dir0=[ASC])are fused intoEnumerableLimitSort(sort0=[$5], dir0=[ASC], fetch=[10000]). The downstream window operators and index scan remain unchanged.integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml (1)
11-14: LGTM! Consistent with TopK fusion pattern.The expected output correctly shows the Limit+Sort operators fused into a single
EnumerableLimitSortwhile preserving downstream window operators and the index scan context.opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableTopKRule.java (2)
17-26: LGTM! Well-documented rule implementation.The class-level JavaDoc clearly explains the transformation purpose. The rule correctly extends
InterruptibleRelRuleand follows the established pattern in this codebase.
51-58: Rule predicate correctly identifies TopK candidates.The predicate ensures the rule only matches
LogicalSortnodes that have:
- A fetch or offset (making it a "top" operation)
- Non-empty field collations (actual ordering is specified)
This correctly filters for genuine TopK scenarios.
integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml (1)
11-15: LGTM! Consistent TopK fusion pattern.The expected output correctly demonstrates the Limit+Sort fusion into
EnumerableLimitSortwhile preserving the windowing and index scan operators downstream.
...arch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableTopK.java
Show resolved
Hide resolved
yuancu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Let's suspend on merging until #4992 merged |
| * CalciteEnumerableTopK}. | ||
| */ | ||
| @Value.Enclosing | ||
| public class EnumerableTopKRule extends InterruptibleRelRule<EnumerableTopKRule.Config> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need copy the license from Calcite?
Description
Support enumerable TopK, check #4982 for details.
The
CalciteEnumerableTopKis derived fromEnumerableLimitSortwhich has the corrected cost computation.Related Issues
Resolves #4982
Check List
--signoffor-s.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.