[FLINK-26051][table] Fix ROW_NUMBER with CASE WHEN and WHERE clause causing window ordering exception #27592
+129
−4
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This pull request fixes FLINK-26051: when combining a
ROW_NUMBER()window function with a subsequent query containingCASE WHENexpressions and aWHEREclause, the stream planner throws "The window can only be ordered in ASCENDING mode."The root cause is a rule ordering problem in the stream planner's optimization phases:
FlinkCalcMergeRulemerges two Calc nodes — the inner ROW_NUMBER filter (WHERE row_num <= 2) and the outerCASE WHEN/WHERE— into a single complex Calc.FlinkLogicalRankRulelooks for the patternFlinkLogicalCalconFlinkLogicalOverAggregate. By this point, the merged Calc is too complex for the rule to match because the remaining predicates access the rank field, somatches()returns false.StreamExecOverAggregatewhich enforces ascending-only ordering, causing the error.The batch planner already handles this correctly by placing rank rules alongside
FlinkCalcMergeRulein the same Volcano phase, so the optimizer explores both alternatives.The fix adds a new Volcano-safe rank rule variant (
FlinkLogicalRankRuleForConstantRangeAllFunctions) toLOGICAL_RULESin the stream planner. This rule handles allSqlRankFunctiontypes (ROW_NUMBER, RANK, DENSE_RANK) with both constant and variable rank ranges. UnlikeFlinkLogicalRankRuleForRangeEnd, it does not throw exceptions inmatches()— it silently rejectsConstantRankRangeWithoutEnd(rank end not specified) rather than throwing, deferring that error toFlinkLogicalRankRuleForRangeEndin the later HEP phase where exceptions are properly surfaced to the user.Placing the rule in the same Volcano phase as
FlinkCalcMergeRuleallows the optimizer to explore the Rank conversion path before or as an alternative to merging Calcs. The existingFlinkLogicalRankRule.INSTANCEis kept inLOGICAL_REWRITEas a safety net.Brief change log
FlinkLogicalRankRuleForConstantRangeAllFunctions: a new rule class that accepts allSqlRankFunctiontypes with constant and variable rank ranges, safe for the Volcano optimizer (no exceptions inmatches(), silently rejectsConstantRankRangeWithoutEnd)FlinkLogicalRankRule.CONSTANT_RANGE_ALL_FUNCTIONS_INSTANCEtoLOGICAL_RULESinFlinkStreamRuleSets, along withCalcRankTransposeRuleandConstantRankNumberColumnRemoveRule, placed before the calc rules block — mirroring the batch planner patterntestRowNumberWithCaseWhenAndWhereClausein streamRankTesttestRankFunctionInMiddle(streamRankTest) andtestMultiSameRankFunctionsWithSameGroup(FlinkLogicalRankRuleForRangeEndTest) to reflect cosmetic field naming changes from earlier rank conversionVerifying this change
This change added a test and can be verified as follows:
testRowNumberWithCaseWhenAndWhereClausethat reproduces the exact bug scenario:ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC)withWHERE row_num <= 2, wrapped in a query withCASE WHEN c > 10 THEN 'big' ELSE 'small' ENDandWHERE b <> 'z'. The optimized plan correctly shows aRanknode instead ofOverAggregate.RankTest(46 tests), batchRankTest(16 tests),FlinkLogicalRankRuleForRangeEndTest(14 tests)Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation