Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions core/src/main/java/org/opensearch/sql/ast/statement/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand All @@ -19,13 +18,41 @@
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;
private final QueryType queryType;

/** Pagination offset (0-based). Only used when fetchSize > 0. */
protected final int paginationOffset;

/**
* Constructor for backward compatibility (no pagination offset).
*
* @param plan the unresolved plan
* @param fetchSize page size (0 = no pagination)
* @param queryType the query type
*/
public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType) {
this(plan, fetchSize, queryType, 0);
}

/**
* Constructor with pagination support.
*
* @param plan the unresolved plan
* @param fetchSize page size (0 = no pagination)
* @param queryType the query type
* @param paginationOffset offset for pagination (0-based)
*/
public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType, int paginationOffset) {
this.plan = plan;
this.fetchSize = fetchSize;
this.queryType = queryType;
this.paginationOffset = paginationOffset;
}

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
return visitor.visitQuery(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ public class CalcitePlanContext {
@Getter @Setter private boolean isResolvingSubquery = false;
@Getter @Setter private boolean inCoalesceFunction = false;

/** Pagination offset (0-based). */
@Getter @Setter private int paginationOffset = 0;

/** Pagination page size. 0 means pagination is disabled. */
@Getter @Setter private int paginationSize = 0;

/**
* Check if pagination is enabled.
*
* @return true if paginationSize > 0
*/
public boolean isPaginationEnabled() {
return paginationSize > 0;
}

/**
* The flag used to determine whether we do metadata field projection for user 1. If a project is
* never visited, we will do metadata field projection for user 2. Else not because user may
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public enum SystemLimitType {
JOIN_SUBSEARCH_MAXOUT,
/** Max output to return from a subsearch. */
SUBSEARCH_MAXOUT,
/**
* Pagination limit type for API pagination support.
*
* <p>This type is used to apply LIMIT/OFFSET for paginating query results. Unlike
* QUERY_SIZE_LIMIT, pagination is applied as post-processing after the user's query executes.
*/
PAGINATION,
}

@Getter private final SystemLimitType type;
Expand Down
136 changes: 128 additions & 8 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.sql.analysis.AnalysisContext;
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
Expand Down Expand Up @@ -71,10 +72,33 @@ public void execute(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
execute(plan, queryType, listener, 0, 0);
}

/**
* Execute the {@link UnresolvedPlan} with pagination support.
*
* @param plan the unresolved plan
* @param queryType the query type
* @param listener the response listener
* @param pageSize page size (0 = no pagination)
* @param offset pagination offset (0-based)
*/
public void execute(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener,
int pageSize,
int offset) {
if (shouldUseCalcite(queryType)) {
executeWithCalcite(plan, queryType, listener);
executeWithCalcite(plan, queryType, listener, pageSize, offset);
} else {
executeWithLegacy(plan, queryType, listener, Optional.empty());
// For legacy path (SQL), wrap with Paginate when pagination is enabled
if (pageSize > 0) {
executeWithLegacy(new Paginate(pageSize, plan), queryType, listener, Optional.empty());
} else {
executeWithLegacy(plan, queryType, listener, Optional.empty());
}
}
}

Expand All @@ -91,16 +115,71 @@ public void explain(
}
}

/**
* Explain with pagination support.
*
* @param plan the unresolved plan
* @param queryType the query type
* @param listener the response listener
* @param format the explain format
* @param pageSize page size for pagination
* @param offset pagination offset (0-based)
*/
public void explain(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
Explain.ExplainFormat format,
int pageSize,
int offset) {
if (shouldUseCalcite(queryType)) {
explainWithCalcite(plan, queryType, listener, format, pageSize, offset);
} else {
// For legacy path (SQL), wrap with Paginate when pagination is enabled
if (pageSize > 0) {
explainWithLegacy(
new Paginate(pageSize, plan), queryType, listener, format, Optional.empty());
} else {
explainWithLegacy(plan, queryType, listener, format, Optional.empty());
}
}
}

public void executeWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
executeWithCalcite(plan, queryType, listener, 0, 0);
}

/**
* Execute with Calcite engine and pagination support.
*
* @param plan the unresolved plan
* @param queryType the query type
* @param listener the response listener
* @param pageSize page size (0 = no pagination)
* @param offset pagination offset (0-based)
*/
public void executeWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener,
int pageSize,
int offset) {
CalcitePlanContext.run(
() -> {
try {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);

// Set pagination parameters if enabled
if (pageSize > 0) {
context.setPaginationSize(pageSize);
context.setPaginationOffset(offset);
}

RelNode relNode = analyze(plan, context);
relNode = mergeAdjacentFilters(relNode);
RelNode optimized = optimize(relNode, context);
Expand Down Expand Up @@ -134,12 +213,39 @@ public void explainWithCalcite(
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
Explain.ExplainFormat format) {
explainWithCalcite(plan, queryType, listener, format, 0, 0);
}

/**
* Explain with Calcite engine and pagination support.
*
* @param plan the unresolved plan
* @param queryType the query type
* @param listener the response listener
* @param format the explain format
* @param pageSize page size (0 = no pagination)
* @param offset pagination offset (0-based)
*/
public void explainWithCalcite(
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener,
Explain.ExplainFormat format,
int pageSize,
int offset) {
CalcitePlanContext.run(
() -> {
try {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);

// Set pagination parameters if enabled
if (pageSize > 0) {
context.setPaginationSize(pageSize);
context.setPaginationOffset(offset);
}

context.run(
() -> {
RelNode relNode = analyze(plan, context);
Expand Down Expand Up @@ -277,14 +383,28 @@ public PhysicalPlan plan(LogicalPlan plan) {
}

/**
* Try to optimize the plan by appending a limit operator for QUERY_SIZE_LIMIT Don't add for
* `EXPLAIN` to avoid changing its output plan.
* Try to optimize the plan by appending a limit operator for pagination or QUERY_SIZE_LIMIT. When
* pagination is enabled, use pagination LIMIT/OFFSET instead of system limit.
*
* <p>Pagination is applied AFTER the user's query executes completely. This ensures that
* user-specified commands like 'head X from Y' are respected first, and pagination only slices
* the final result.
*/
public RelNode optimize(RelNode plan, CalcitePlanContext context) {
return LogicalSystemLimit.create(
SystemLimitType.QUERY_SIZE_LIMIT,
plan,
context.relBuilder.literal(context.sysLimit.querySizeLimit()));
if (context.isPaginationEnabled()) {
// Apply pagination LIMIT/OFFSET on top of user's complete query result
return LogicalSystemLimit.create(
SystemLimitType.PAGINATION,
plan,
context.relBuilder.literal(context.getPaginationOffset()),
context.relBuilder.literal(context.getPaginationSize()));
} else {
// Apply system query size limit
return LogicalSystemLimit.create(
SystemLimitType.QUERY_SIZE_LIMIT,
plan,
context.relBuilder.literal(context.sysLimit.querySizeLimit()));
}
}

private boolean isCalciteFallbackAllowed(@Nullable Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
package org.opensearch.sql.executor.execution;

import java.util.Optional;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
Expand All @@ -29,6 +27,9 @@ public class QueryPlan extends AbstractPlan {

protected final Optional<Integer> pageSize;

/** Pagination offset (0-based). Only used when pageSize is present. */
protected final int paginationOffset;

/** Constructor. */
public QueryPlan(
QueryId queryId,
Expand All @@ -41,27 +42,42 @@ public QueryPlan(
this.queryService = queryService;
this.listener = listener;
this.pageSize = Optional.empty();
this.paginationOffset = 0;
}

/** Constructor with page size (backward compatible, offset defaults to 0). */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
int pageSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
this(queryId, queryType, plan, pageSize, 0, queryService, listener);
}

/** Constructor with page size. */
/** Constructor with page size and offset for pagination. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
int pageSize,
int paginationOffset,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId, queryType);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
this.pageSize = Optional.of(pageSize);
this.paginationOffset = paginationOffset;
}

@Override
public void execute() {
if (pageSize.isPresent()) {
queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), listener);
// Use new pagination with offset for Calcite path
queryService.execute(plan, getQueryType(), listener, pageSize.get(), paginationOffset);
} else {
queryService.execute(plan, getQueryType(), listener);
}
Expand All @@ -71,9 +87,8 @@ public void execute() {
public void explain(
ResponseListener<ExecutionEngine.ExplainResponse> listener, Explain.ExplainFormat format) {
if (pageSize.isPresent()) {
listener.onFailure(
new NotImplementedException(
"`explain` feature for paginated requests is not implemented yet."));
queryService.explain(
plan, getQueryType(), listener, format, pageSize.get(), paginationOffset);
} else {
queryService.explain(plan, getQueryType(), listener, format);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,30 @@ public AbstractPlan visitQuery(
context) {
requireNonNull(context.getLeft(), "[BUG] query listener must be not null");
if (node.getFetchSize() > 0) {
if (canConvertToCursor(node.getPlan())) {
if (node.getQueryType() == QueryType.PPL) {
// PPL pagination - use pageSize and offset (Calcite path)
return new QueryPlan(
QueryId.queryId(),
node.getQueryType(),
node.getPlan(),
node.getFetchSize(),
node.getPaginationOffset(),
queryService,
context.getLeft());
} else {
// This should be picked up by the legacy engine.
throw new UnsupportedCursorRequestException();
// SQL pagination - use legacy cursor-based pagination
if (canConvertToCursor(node.getPlan())) {
return new QueryPlan(
QueryId.queryId(),
node.getQueryType(),
node.getPlan(),
node.getFetchSize(),
queryService,
context.getLeft());
} else {
// This should be picked up by the legacy engine.
throw new UnsupportedCursorRequestException();
}
}
} else {
return new QueryPlan(
Expand Down
Loading
Loading