83 changes: 56 additions & 27 deletions api/README.md
@@ -4,12 +4,18 @@ This module provides a high-level integration layer for the Calcite-based query

## Overview

This module provides components organized into two main areas aligned with the [Unified Query API architecture](https://github.com/opensearch-project/sql/issues/4782):

### Unified Language Specification

- **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as an intermediate representation.
- **`UnifiedQueryTranspiler`**: Converts Calcite logical plans (`RelNode`) into SQL strings for various target databases using different SQL dialects.

### Unified Execution Runtime

- **`UnifiedQueryCompiler`**: Compiles Calcite logical plans (`RelNode`) into executable JDBC `PreparedStatement` objects, separating compilation from execution.

Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, or compile and execute queries directly for testing and conformance validation.

### Experimental API Design

@@ -59,40 +65,63 @@ UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
String sql = transpiler.toSql(plan);
```

Supported SQL dialects include:
- `SparkSqlDialect.DEFAULT` - Apache Spark SQL
- `PostgresqlSqlDialect.DEFAULT` - PostgreSQL
- `MysqlSqlDialect.DEFAULT` - MySQL
- And other Calcite-supported dialects (a dialect-switch sketch follows)
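
For example, retargeting the same logical plan only requires building a transpiler with a different dialect. A minimal sketch that re-emits the `plan` from the planner example above as PostgreSQL SQL (variable names are illustrative):

```java
// Sketch: reuse the logical plan, but emit PostgreSQL instead of Spark SQL.
UnifiedQueryTranspiler pgTranspiler = UnifiedQueryTranspiler.builder()
    .dialect(PostgresqlSqlDialect.DEFAULT)
    .build();
String postgresSql = pgTranspiler.toSql(plan);
```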

### UnifiedQueryCompiler

Use `UnifiedQueryCompiler` to compile Calcite logical plans into executable JDBC statements. This separates compilation from execution and returns standard JDBC types.

```java
UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);

try (PreparedStatement statement = compiler.compile(plan)) {
    ResultSet rs = statement.executeQuery();
    while (rs.next()) {
        // Standard JDBC ResultSet access
    }
}
```
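
Because `compile()` returns a plain JDBC `PreparedStatement`, standard JDBC facilities apply unchanged. A minimal sketch, assuming the `compiler` and `plan` from the snippet above, that inspects the result schema before executing (some drivers may report no metadata until execution):

```java
try (PreparedStatement statement = compiler.compile(plan)) {
    ResultSetMetaData metaData = statement.getMetaData(); // may be null before execution
    if (metaData != null) {
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            System.out.println(metaData.getColumnName(i) + " : " + metaData.getColumnTypeName(i));
        }
    }
}
```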

### Complete Workflow Examples

Combining all components for a complete PPL query workflow:

```java
// Step 1: Create reusable context (shared across all components)
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
        .language(QueryType.PPL)
        .catalog("catalog", schema)
        .defaultNamespace("catalog")
        .build()) {

    // Step 2: Create planner with context
    UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);

    // Step 3: Plan PPL query into logical plan
    RelNode plan = planner.plan("source = employees | where age > 30");

    // Option A: Transpile to target SQL
    UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
        .dialect(SparkSqlDialect.DEFAULT)
        .build();
    String sparkSql = transpiler.toSql(plan);
    // Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30

    // Option B: Compile and execute directly
    UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);
    try (PreparedStatement statement = compiler.compile(plan)) {
        ResultSet rs = statement.executeQuery();
        while (rs.next()) {
            // Process results with standard JDBC
        }
    }
Comment on lines +114 to +121

Collaborator: UnifiedQueryCompiler.compile() compiles a RelNode into an executable query plan by leveraging Calcite’s Enumerable physical operators?

Collaborator Author: Yes, it's the same as current PPL Calcite logic in core module. Thanks!

}
```
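
Since Option B hands back a standard `ResultSet`, rows can be consumed with the usual typed getters. A small sketch, assuming the `employees` columns (`id`, `name`, `age`, `department`) used throughout these examples:

```java
while (rs.next()) {
    int id = rs.getInt("id");
    String name = rs.getString("name");
    int age = rs.getInt("age");
    // map into domain objects, print, or assert in tests
}
```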

## Development & Testing

A set of unit tests is provided to validate planner behavior.
5 changes: 5 additions & 0 deletions api/build.gradle
@@ -5,6 +5,7 @@

plugins {
id 'java-library'
id 'java-test-fixtures'
id "io.freefair.lombok"
id 'jacoco'
id 'com.diffplug.spotless'
@@ -13,10 +14,14 @@ plugins {
dependencies {
api project(':ppl')

testImplementation testFixtures(project(':api'))
testImplementation group: 'junit', name: 'junit', version: '4.13.2'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}"
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.41.0'

testFixturesApi group: 'junit', name: 'junit', version: '4.13.2'
testFixturesApi group: 'org.hamcrest', name: 'hamcrest', version: "${hamcrest_version}"
}
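
The `java-test-fixtures` plugin and the `testFixtures(project(':api'))` dependency let shared scaffolding such as `UnifiedQueryTestBase` (see below) be reused by other test source sets. A hypothetical consumer test, assuming the base class is published through the test-fixtures source set; the class name and assertion are illustrative:

```java
package org.opensearch.sql.api;

import static org.junit.Assert.assertNotNull;

import org.apache.calcite.rel.RelNode;
import org.junit.Test;

// Hypothetical test that reuses the shared fixture from testFixtures(':api').
public class ExamplePlannerFixtureTest extends UnifiedQueryTestBase {

  @Test
  public void plansSimpleFilterOverEmployees() {
    // `planner` and the sample `employees` table are provided by UnifiedQueryTestBase.
    RelNode plan = planner.plan("source = employees | where age > 30");
    assertNotNull(plan);
  }
}
```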

spotless {

UnifiedQueryContext.java
@@ -34,14 +34,26 @@
* enabling consistent behavior across all unified query operations.
*/
@Value
public class UnifiedQueryContext implements AutoCloseable {

/** CalcitePlanContext containing Calcite framework configuration and query type. */
CalcitePlanContext planContext;

/** Settings containing execution limits and feature flags used by parsers and planners. */
Settings settings;

/**
* Closes the underlying resource managed by this context.
*
* @throws Exception if an error occurs while closing the connection
*/
@Override
public void close() throws Exception {
if (planContext != null && planContext.connection != null) {
planContext.connection.close();
}
}

/** Creates a new builder for UnifiedQueryContext. */
public static Builder builder() {
return new Builder();
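
With `UnifiedQueryContext` now implementing `AutoCloseable`, callers can scope the shared Calcite connection with try-with-resources. A minimal sketch mirroring the README example (schema setup omitted):

```java
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
        .language(QueryType.PPL)
        .catalog("catalog", schema)
        .defaultNamespace("catalog")
        .build()) {
    UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
    RelNode plan = planner.plan("source = employees | where age > 30");
    // ... transpile or compile `plan` while the connection is still open
} // close() releases the underlying connection
```
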
UnifiedQueryCompiler.java (new file)
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api.compiler;

import java.sql.Connection;
import java.sql.PreparedStatement;
import lombok.NonNull;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.tools.RelRunner;
import org.opensearch.sql.api.UnifiedQueryContext;

/**
* {@code UnifiedQueryCompiler} compiles Calcite logical plans ({@link RelNode}) into executable
* JDBC statements, separating query compilation from execution.
*/
public class UnifiedQueryCompiler {

/** Unified query context containing CalcitePlanContext with all configuration. */
private final UnifiedQueryContext context;

/**
* Constructs a UnifiedQueryCompiler with a unified query context.
*
* @param context the unified query context containing CalcitePlanContext
*/
public UnifiedQueryCompiler(UnifiedQueryContext context) {
this.context = context;
}

/**
* Compiles a Calcite logical plan into an executable {@link PreparedStatement}. Similar to {@code
* CalciteToolsHelper.OpenSearchRelRunners.run()} but does not close the connection, leaving
* resource management to {@link UnifiedQueryContext}.
*
* @param plan the logical plan to compile (must not be null)
* @return a compiled PreparedStatement ready for execution
* @throws IllegalStateException if compilation fails
*/
public PreparedStatement compile(@NonNull RelNode plan) {
try {
// Apply shuttle to convert LogicalTableScan to BindableTableScan
final RelHomogeneousShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan
&& Bindables.BindableTableScan.canHandle(table)) {
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
RelNode transformedPlan = plan.accept(shuttle);

Connection connection = context.getPlanContext().connection;
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(transformedPlan);
} catch (Exception e) {
throw new IllegalStateException("Failed to compile logical plan", e);
}
}
}
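
Because `compile()` deliberately leaves the shared connection open, one context can serve several compiled plans before it is closed. A rough sketch of that usage, assuming `context` and `planner` were created as in the README examples:

```java
UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);

RelNode adults = planner.plan("source = employees | where age > 30");
RelNode seniors = planner.plan("source = employees | where age > 40");

// Each statement is closed independently; the underlying connection stays open
// until UnifiedQueryContext.close() is called.
try (PreparedStatement first = compiler.compile(adults);
    PreparedStatement second = compiler.compile(seniors)) {
  ResultSet adultRows = first.executeQuery();
  ResultSet seniorRows = second.executeQuery();
  // consume both result sets with standard JDBC
}
```
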
@@ -6,7 +6,9 @@
package org.opensearch.sql.api;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.opensearch.sql.common.setting.Settings.Key.*;

import org.junit.Test;
@@ -79,4 +81,19 @@ public void testInvalidDefaultNamespace() {
.defaultNamespace("nonexistent")
.build();
}

@Test
public void testContextClose() throws Exception {
// Create a separate context for this test to avoid affecting other tests
UnifiedQueryContext testContext =
UnifiedQueryContext.builder()
.language(QueryType.PPL)
.catalog("opensearch", testSchema)
.defaultNamespace("opensearch")
.build();

assertFalse(testContext.getPlanContext().connection.isClosed());
testContext.close();
assertTrue(testContext.getPlanContext().connection.isClosed());
}
}

UnifiedQueryTestBase.java
@@ -5,20 +5,37 @@

package org.opensearch.sql.api;

import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;

import java.util.List;
import java.util.Map;
import lombok.Builder;
import lombok.Singular;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.After;
import org.junit.Before;
import org.opensearch.sql.executor.QueryType;

/** Base class for unified query tests providing common test schema and utilities. */
public abstract class UnifiedQueryTestBase {

/** Default catalog name */
protected static final String DEFAULT_CATALOG = "catalog";

/** Test schema containing sample tables for testing */
protected AbstractSchema testSchema;

@@ -41,23 +58,74 @@ protected Map<String, Table> getTableMap() {
context =
UnifiedQueryContext.builder()
.language(QueryType.PPL)
.catalog("catalog", testSchema)
.catalog(DEFAULT_CATALOG, testSchema)
.build();
planner = new UnifiedQueryPlanner(context);
}

@After
public void tearDown() throws Exception {
if (context != null) {
context.close();
}
}

/** Creates employees table with sample data for testing */
protected Table createEmployeesTable() {
return SimpleTable.builder()
.col("id", INTEGER)
.col("name", VARCHAR)
.col("age", INTEGER)
.col("department", VARCHAR)
.row(new Object[] {1, "Alice", 25, "Engineering"})
.row(new Object[] {2, "Bob", 35, "Sales"})
.row(new Object[] {3, "Charlie", 45, "Engineering"})
.row(new Object[] {4, "Diana", 28, "Marketing"})
.build();
}

/** Reusable scannable table with builder pattern for easy table creation */
@Builder
protected static class SimpleTable implements ScannableTable {
@Singular("col")
private final Map<String, SqlTypeName> schema;

@Singular private final List<Object[]> rows;

@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = typeFactory.builder();
schema.forEach(builder::add);
return builder.build();
}

@Override
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(rows);
}

@Override
public Statistic getStatistic() {
return Statistics.UNKNOWN;
}

@Override
public Schema.TableType getJdbcTableType() {
return Schema.TableType.TABLE;
}

@Override
public boolean isRolledUp(String column) {
return false;
}

@Override
public boolean rolledUpColumnValidInsideAgg(
String column,
SqlCall call,
SqlNode parent,
org.apache.calcite.config.CalciteConnectionConfig config) {
return false;
}
}
}
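
The `SimpleTable` builder above keeps new fixtures terse. A sketch of a hypothetical `departments` table that could sit alongside `createEmployeesTable()`; the columns and rows are illustrative only:

```java
/** Hypothetical second fixture built with the same SimpleTable builder. */
protected Table createDepartmentsTable() {
  return SimpleTable.builder()
      .col("id", INTEGER)
      .col("name", VARCHAR)
      .row(new Object[] {10, "Engineering"})
      .row(new Object[] {20, "Sales"})
      .build();
}
```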