Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/analytics-engine-compat.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Analytics Engine Compatibility

on:
pull_request:
push:
branches-ignore:
- 'backport/**'
- 'dependabot/**'
paths:
- '**/*.java'
- '**gradle*'
- 'integ-test/**'
- '.github/workflows/analytics-engine-compat.yml'
merge_group:

jobs:
Get-CI-Image-Tag:
uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
with:
product: opensearch

analytics-engine-compat:
needs: Get-CI-Image-Tag
runs-on: ubuntu-latest
container:
image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}

steps:
- name: Run start commands
run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}

- uses: actions/checkout@v4

- name: Set up JDK 25
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 25

- name: Run analytics-engine compatibility smoke test
run: |
chown -R 1000:1000 `pwd`
su `id -un 1000` -c "./gradlew :integ-test:analyticsEngineCompatIT"
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.sql.api;

import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;

Expand Down Expand Up @@ -119,13 +121,31 @@ public static class Builder {
/**
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
* to avoid coupling with OpenSearchSettings.
*
* <p>{@link Settings.Key#CALCITE_ENGINE_ENABLED} defaults to {@code true} here because the
* unified query path is by definition Calcite-based — every query reaching this context flows
* through Calcite's planner, never the v2 engine. The PPL {@link
* org.opensearch.sql.api.parser.PPLQueryParser} reuses the v2 {@code AstBuilder}, which gates
* Calcite-only commands (e.g. {@code visitTableCommand}) on this setting; without the default,
* those commands fail at parse time even when the cluster setting is true.
*
* <p>{@link Settings.Key#PPL_REX_MAX_MATCH_LIMIT} defaults to {@code 10} here because {@code
* AstBuilder.visitRexCommand} reads it unconditionally and unboxes to {@code int} — a {@code
* null} return from {@code getSettingValue} NPEs the planner before any operator-level
* capability check runs. The value mirrors the cluster-side default of {@code 10} registered by
* {@code OpenSearchSettings.PPL_REX_MAX_MATCH_LIMIT_SETTING}. Cluster-side overrides reach this
* map via {@link #setting(String, Object)} — the REST handler reads the live value from {@code
* OpenSearchSettings} and routes it through that existing API, keeping {@link
* UnifiedQueryContext} decoupled from any specific {@link Settings} implementation.
*/
private final Map<Settings.Key, Object> settings =
new HashMap<Settings.Key, Object>(
Map.of(
QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit()));
PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit(),
CALCITE_ENGINE_ENABLED, true,
PPL_REX_MAX_MATCH_LIMIT, 10));

/**
* Sets the query language frontend to be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public void testContextCreationWithDefaults() {
"Settings should have default system limits",
SysLimit.DEFAULT,
SysLimit.fromSettings(context.getSettings()));
assertEquals(
"PPL_REX_MAX_MATCH_LIMIT default should be 10",
Integer.valueOf(10),
context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
}

@Test
Expand All @@ -43,10 +47,15 @@ public void testContextCreationWithCustomConfig() {
.catalog("opensearch", testSchema)
.cacheMetadata(true)
.setting("plugins.query.size_limit", 200)
.setting("plugins.ppl.rex.max_match.limit", 5)
.build();

Integer querySizeLimit = context.getSettings().getSettingValue(QUERY_SIZE_LIMIT);
assertEquals("Custom setting should be applied", Integer.valueOf(200), querySizeLimit);
assertEquals(
"Cluster-side override for PPL_REX_MAX_MATCH_LIMIT should reach the unified path",
Integer.valueOf(5),
context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
}

@Test(expected = IllegalArgumentException.class)
Expand Down
7 changes: 7 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ plugins {
}

repositories {
mavenLocal()
mavenCentral()
}

Expand Down Expand Up @@ -63,6 +64,12 @@ dependencies {
}
api 'org.apache.calcite:calcite-linq4j:1.41.0'
api project(':common')
compileOnly 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT'
// Needed because analytics-api's QueryPlanExecutor signature uses
// org.opensearch.core.action.ActionListener; AnalyticsExecutionEngine references that type.
compileOnly group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
testImplementation 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT'
testImplementation group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
implementation "com.github.seancfoley:ipaddress:5.4.2"
implementation "com.jayway.jsonpath:json-path:2.9.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Locale;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.sql.SqlExplainLevel;

@RequiredArgsConstructor
public enum ExplainMode {
Expand All @@ -26,4 +27,13 @@ public static ExplainMode of(String mode) {
return ExplainMode.STANDARD;
}
}

/** Convert to Calcite SqlExplainLevel for RelOptUtil.toString(). */
public SqlExplainLevel toExplainLevel() {
return switch (this) {
case SIMPLE -> SqlExplainLevel.NO_ATTRIBUTES;
case COST -> SqlExplainLevel.ALL_ATTRIBUTES;
default -> SqlExplainLevel.EXPPLAN_ATTRIBUTES;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1538,10 +1538,25 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
* count(a.b)] returns true.
*/
private boolean containsNestedAggregator(RelBuilder relBuilder, List<RexInputRef> aggCallRefs) {
// For each aggregator argument, take the part of its column name before the first dot
// (e.g. "city" from "city.location.latitude") and check whether that's a top-level
// ARRAY column — the marker for an OpenSearch `nested` field.
//
// The classic path always exposes a top-level column for object/nested parents. The
// analytics-engine path emits only the flat leaves ("city.name", "city.location.latitude")
// because parent placeholder types (MAP<VARCHAR, ANY>) can't round-trip through Substrait.
// RelDataType.getField returns null when the column doesn't exist — for analytics-engine,
// that null just means "not nested," which is the right answer.
RelDataType rowType = relBuilder.peek().getRowType();
return aggCallRefs.stream()
.map(r -> relBuilder.peek().getRowType().getFieldNames().get(r.getIndex()))
.map(r -> rowType.getFieldNames().get(r.getIndex()))
.map(name -> org.apache.commons.lang3.StringUtils.substringBefore(name, "."))
.anyMatch(root -> relBuilder.field(root).getType().getSqlTypeName() == SqlTypeName.ARRAY);
.anyMatch(
root -> {
RelDataTypeField field =
rowType.getField(root, /* caseSensitive= */ true, /* elideRecord= */ false);
return field != null && field.getType().getSqlTypeName() == SqlTypeName.ARRAY;
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.utils;

import java.util.concurrent.Callable;

/**
* Helper for setting the thread context classloader before Calcite operations. This is needed for
* patched Calcite (CALCITE-3745): when analytics-engine is the parent classloader, Janino uses the
* parent's classloader which can't see SQL plugin classes. The patched Calcite checks {@code
* Thread.currentThread().getContextClassLoader()} first. This helper sets it to the SQL plugin's
* classloader (child) which can see both parent and child classes.
*
* @see <a href="https://issues.apache.org/jira/browse/CALCITE-3745">CALCITE-3745</a>
* @see <a href="https://github.com/opensearch-project/sql/issues/5306">sql#5306</a>
*/
public final class CalciteClassLoaderHelper {

private CalciteClassLoaderHelper() {}

/**
* Run an action with the thread context classloader set to the caller's classloader.
*
* @param action the action to run
* @param callerClass the class whose classloader should be used (pass {@code MyClass.class})
* @param <T> the return type
* @return the result of the action
*/
public static <T> T withCalciteClassLoader(Callable<T> action, Class<?> callerClass) {
Thread currentThread = Thread.currentThread();
ClassLoader originalCl = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(callerClass.getClassLoader());
try {
return action.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
currentThread.setContextClassLoader(originalCl);
}
}

/**
* Run a void action with the thread context classloader set to the caller's classloader.
*
* @see #withCalciteClassLoader(Callable, Class)
*/
public static void withCalciteClassLoader(Runnable action, Class<?> callerClass) {
withCalciteClassLoader(
() -> {
action.run();
return null;
},
callerClass);
}
}
71 changes: 40 additions & 31 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper;
import org.opensearch.sql.common.error.ErrorReport;
import org.opensearch.sql.common.error.QueryProcessingStage;
import org.opensearch.sql.common.error.StageErrorHandler;
Expand Down Expand Up @@ -142,33 +143,37 @@ public void executeWithCalcite(
QueryProfiling.activate(QueryContext.isProfileEnabled());
ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
long analyzeStart = System.nanoTime();
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
CalciteClassLoaderHelper.withCalciteClassLoader(
() -> {
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);

context.setHighlightConfig(highlightConfig);
context.setHighlightConfig(highlightConfig);

// Wrap analyze with ANALYZING stage tracking
RelNode relNode =
StageErrorHandler.executeStage(
QueryProcessingStage.ANALYZING,
() -> analyze(plan, context),
"while preparing and validating the query plan");
// Wrap analyze with ANALYZING stage tracking
RelNode relNode =
StageErrorHandler.executeStage(
QueryProcessingStage.ANALYZING,
() -> analyze(plan, context),
"while preparing and validating the query plan");

// Wrap plan conversion with PLAN_CONVERSION stage tracking
RelNode calcitePlan =
StageErrorHandler.executeStage(
QueryProcessingStage.PLAN_CONVERSION,
() -> convertToCalcitePlan(relNode, context),
"while converting the query to an executable plan");
// Wrap plan conversion with PLAN_CONVERSION stage tracking
RelNode calcitePlan =
StageErrorHandler.executeStage(
QueryProcessingStage.PLAN_CONVERSION,
() -> convertToCalcitePlan(relNode, context),
"while converting the query to an executable plan");

analyzeMetric.set(System.nanoTime() - analyzeStart);
analyzeMetric.set(System.nanoTime() - analyzeStart);

// Wrap execution with EXECUTING stage tracking
StageErrorHandler.executeStageVoid(
QueryProcessingStage.EXECUTING,
() -> executionEngine.execute(calcitePlan, context, listener),
"while running the query");
// Wrap execution with EXECUTING stage tracking
StageErrorHandler.executeStageVoid(
QueryProcessingStage.EXECUTING,
() -> executionEngine.execute(calcitePlan, context, listener),
"while running the query");
},
QueryService.class);
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
log.warn("Fallback to V2 query engine since got exception", t);
Expand All @@ -191,17 +196,21 @@ public void explainWithCalcite(
() -> {
try {
QueryProfiling.noop();
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
context.setHighlightConfig(highlightConfig);
context.run(
CalciteClassLoaderHelper.withCalciteClassLoader(
() -> {
RelNode relNode = analyze(plan, context);
RelNode calcitePlan = convertToCalcitePlan(relNode, context);
executionEngine.explain(calcitePlan, mode, context, listener);
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
context.setHighlightConfig(highlightConfig);
context.run(
() -> {
RelNode relNode = analyze(plan, context);
RelNode calcitePlan = convertToCalcitePlan(relNode, context);
executionEngine.explain(calcitePlan, mode, context, listener);
},
settings);
},
settings);
QueryService.class);
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t)) {
log.warn("Fallback to V2 query engine since got exception", t);
Expand Down
Loading
Loading