Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFamily;
Expand Down Expand Up @@ -675,25 +680,133 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

private static final String REVERSE_ROW_NUM = "__reverse_row_num__";
/**
* Backtrack through the RelNode tree to find the first Sort node with non-empty collation. Stops
* at blocking operators that break ordering:
*
* <ul>
* <li>Aggregate - aggregation destroys input ordering
* <li>BiRel - covers Join, Correlate, and other binary relations
* <li>SetOp - covers Union, Intersect, Except
* <li>Uncollect - unnesting operation that may change ordering
* <li>Project with window functions (RexOver) - ordering determined by window's ORDER BY
* </ul>
*
* @param node the starting RelNode to backtrack from
* @return the collation found, or null if no sort or blocking operator encountered
*/
private RelCollation backtrackForCollation(RelNode node) {
while (node != null) {
// Check for blocking operators that destroy collation
// BiRel covers Join, Correlate, and other binary relations
// SetOp covers Union, Intersect, Except
// Uncollect unnests arrays/multisets which may change ordering
if (node instanceof Aggregate
|| node instanceof BiRel
|| node instanceof SetOp
|| node instanceof Uncollect) {
return null;
}

// Project with window functions has ordering determined by the window's ORDER BY clause
// We should not destroy its output order by inserting a reversed sort
if (node instanceof LogicalProject && ((LogicalProject) node).containsOver()) {
return null;
}

// Check for Sort node with collation
if (node instanceof org.apache.calcite.rel.core.Sort) {
org.apache.calcite.rel.core.Sort sort = (org.apache.calcite.rel.core.Sort) node;
if (sort.getCollation() != null && !sort.getCollation().getFieldCollations().isEmpty()) {
return sort.getCollation();
}
}

// Continue to child node
if (node.getInputs().isEmpty()) {
break;
}
node = node.getInput(0);
}
return null;
}

/**
* Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree
* with the reversed sort inserted right after the original sort.
*
* @param root the root of the tree to rebuild
* @param reversedCollation the reversed collation to insert
* @param context the Calcite plan context
* @return the rebuilt tree with reversed sort inserted
*/
private RelNode insertReversedSortInTree(
RelNode root, RelCollation reversedCollation, CalcitePlanContext context) {
return root.accept(
new org.apache.calcite.rel.RelHomogeneousShuttle() {
boolean sortFound = false;

@Override
public RelNode visit(RelNode other) {
// Check if this is a Sort node and we haven't inserted the reversed sort yet
if (!sortFound && other instanceof org.apache.calcite.rel.core.Sort) {
org.apache.calcite.rel.core.Sort sort = (org.apache.calcite.rel.core.Sort) other;
if (sort.getCollation() != null
&& !sort.getCollation().getFieldCollations().isEmpty()) {
// Found the sort node - insert reversed sort after it
sortFound = true;
// First visit the sort's children
RelNode visitedSort = super.visit(other);
// Create a new reversed sort on top of the original sort
return org.apache.calcite.rel.logical.LogicalSort.create(
visitedSort, reversedCollation, null, null);
}
}
// For all other nodes, continue traversal
return super.visit(other);
}
});
}

@Override
public RelNode visitReverse(
org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
visitChildren(node, context);
// Add ROW_NUMBER() column
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
// Sort by row number descending
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
// Remove row number column
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));

// Check if there's an existing sort to reverse
List<RelCollation> collations =
context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null;

if (collation != null && !collation.getFieldCollations().isEmpty()) {
// If there's an existing sort, reverse its direction
RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
context.relBuilder.sort(reversedCollation);
} else {
// Collation not found on current node - try backtracking
RelNode currentNode = context.relBuilder.peek();
RelCollation backtrackCollation = backtrackForCollation(currentNode);

if (backtrackCollation != null && !backtrackCollation.getFieldCollations().isEmpty()) {
// Found collation through backtracking - rebuild tree with reversed sort
RelCollation reversedCollation = PlanUtils.reverseCollation(backtrackCollation);
RelNode rebuiltTree = insertReversedSortInTree(currentNode, reversedCollation, context);
// Replace the current node in the builder with the rebuilt tree
context.relBuilder.build(); // Pop the current node
context.relBuilder.push(rebuiltTree); // Push the rebuilt tree
} else {
// Check if @timestamp field exists in the row type
List<String> fieldNames = context.relBuilder.peek().getRowType().getFieldNames();
if (fieldNames.contains(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)) {
// If @timestamp exists, sort by it in descending order
context.relBuilder.sort(
context.relBuilder.desc(
context.relBuilder.field(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)));
}
// If neither collation nor @timestamp exists, ignore the reverse command (no-op)
}
}

return context.relBuilder.peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
Expand Down Expand Up @@ -596,6 +599,37 @@ public Void visitCorrelVariable(RexCorrelVariable correlVar) {
}
}

/**
* Reverses the direction of a RelCollation.
*
* @param original The original collation to reverse
* @return A new RelCollation with reversed directions
*/
public static RelCollation reverseCollation(RelCollation original) {
if (original == null || original.getFieldCollations().isEmpty()) {
return original;
}

List<RelFieldCollation> reversedFields = new ArrayList<>();
for (RelFieldCollation field : original.getFieldCollations()) {
RelFieldCollation.Direction reversedDirection = field.direction.reverse();

// Handle null direction properly - reverse it as well
RelFieldCollation.NullDirection reversedNullDirection =
field.nullDirection == RelFieldCollation.NullDirection.FIRST
? RelFieldCollation.NullDirection.LAST
: field.nullDirection == RelFieldCollation.NullDirection.LAST
? RelFieldCollation.NullDirection.FIRST
: field.nullDirection;

RelFieldCollation reversedField =
new RelFieldCollation(field.getFieldIndex(), reversedDirection, reversedNullDirection);
reversedFields.add(reversedField);
}

return RelCollations.of(reversedFields);
}

/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
try {
Expand Down
Loading
Loading