Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pinot-common/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ data: {
"PROPERTIES"
"TABLES"
"TABLE_TYPE"
# Pinot DDL (CREATE / SHOW / DROP MATERIALIZED VIEW(S)) keywords.
# MATERIALIZED is shared by every MV verb; VIEWS is the plural-form lexeme used by the
# Catalog-listing verb `SHOW MATERIALIZED VIEWS [FROM db]`. VIEW (singular) is a core
# Calcite keyword and is not redeclared here.
"MATERIALIZED"
"REFRESH"
"VIEWS"
# IF is not a SQL reserved word but Calcite core does not declare it as a token; we need it
# for DDL "IF [NOT] EXISTS" clauses.
"IF"
Expand Down Expand Up @@ -92,6 +99,9 @@ data: {
"TABLES"
"TABLE_TYPE"
"IF"
"MATERIALIZED"
"REFRESH"
"VIEWS"

# The following keywords are reserved in core Calcite,
# are reserved in some version of SQL,
Expand Down Expand Up @@ -590,8 +600,9 @@ data: {
statementParserMethods: [
"SqlInsertFromFile()"
"SqlPhysicalExplain()"
"SqlPinotCreateMaterializedView()"
"SqlPinotCreateTable()"
"SqlPinotDropTable()"
"SqlPinotDrop()"
"SqlPinotShow()"
]

Expand Down
193 changes: 178 additions & 15 deletions pinot-common/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,115 @@ SqlNode SqlPhysicalExplain() :
}
}

/// CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]name ( ... )
/// [ REFRESH [INTERVAL] EVERY <period> ]
/// [ PROPERTIES ( 'k' = 'v', ... ) ]
/// AS <query>
///
/// The optional REFRESH clause registers a per-table cron schedule for the MV
/// minion task. When omitted, the task scheduler picks up the MV under the
/// cluster-wide default cron (`controller.task.frequencyInSeconds` /
/// `controller.task.taskTypeFrequenciesInSeconds.MaterializedViewTask`).
///
/// The `AS <query>` clause accepts only SELECT/VALUES/WITH/UNION/etc. queries
/// — NOT DML (INSERT/UPDATE/DELETE/MERGE). We invoke `OrderedQueryOrExpr` with
/// `ACCEPT_QUERY` rather than `SqlQueryOrDml` (the default Calcite production
/// that also accepts DML) because a materialized view's body is always a
/// projection-style query whose output rows are persisted; routing a DML
/// statement here would either be silently ignored by the downstream MV
/// analyzer or fail with a non-actionable error well below the grammar.
SqlNode SqlPinotCreateMaterializedView() :
{
SqlParserPos pos;
SqlIdentifier name;
boolean ifNotExists = false;
// Column list is optional. When absent, the DDL compiler infers the columns from
// the AS <query> projection via SingleStageMaterializedViewSchemaInferer. We model
// the absent case as an empty SqlNodeList so downstream consumers can branch on
// emptiness rather than nullability — keeps the consumer's public API stable.
SqlNodeList columns = SqlNodeList.EMPTY;
SqlPinotRefreshClause refresh = null;
SqlNodeList properties = null;
SqlNode query;
}
{
<CREATE> { pos = getPos(); }
<MATERIALIZED> <VIEW>
[ LOOKAHEAD(3) <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
name = CompoundIdentifier()
[ LOOKAHEAD(2) columns = PinotColumnList() ]
[ refresh = PinotRefreshClause() ]
[ <PROPERTIES> properties = PinotPropertyList() ]
<AS> query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
return new SqlPinotCreateMaterializedView(pos, name, ifNotExists, columns, refresh,
properties, query);
}
}

SqlPinotRefreshClause PinotRefreshClause() :
{
SqlParserPos pos;
SqlLiteral refreshPeriod;
}
{
<REFRESH> { pos = getPos(); }
[ <INTERVAL> ]
<EVERY> refreshPeriod = PinotRefreshEveryPeriod()
{
return new SqlPinotRefreshClause(pos, refreshPeriod);
}
}

/// EVERY period. Two forms are accepted:
/// 1. Quoted Pinot period literal, e.g. `'1d'`, `'6h'`, `'15m'` — passes through verbatim.
/// 2. Sugared `N <UNIT>`, where `UNIT` is one of `MINUTE / MINUTES / HOUR / HOURS / DAY /
/// DAYS`, normalized to `Nm`, `Nh`, `Nd` respectively.
///
/// Common-but-unsupported units (`SECOND[S]`, `WEEK[S]`, `MONTH[S]`, `YEAR[S]`) are
/// explicitly matched and rejected here with a message listing the supported units, rather
/// than left to fall off the parser's default "Encountered ..." path which prints an empty
/// expected-tokens list and offers no hint about the supported set.
SqlLiteral PinotRefreshEveryPeriod() :
{
SqlLiteral n;
SqlParserPos pos;
String unitSuffix = null;
String unsupportedUnit = null;
}
{
(
n = NumericLiteral()
(
<DAY> { unitSuffix = "d"; pos = getPos(); }
| <DAYS> { unitSuffix = "d"; pos = getPos(); }
| <HOUR> { unitSuffix = "h"; pos = getPos(); }
| <HOURS> { unitSuffix = "h"; pos = getPos(); }
| <MINUTE> { unitSuffix = "m"; pos = getPos(); }
| <MINUTES> { unitSuffix = "m"; pos = getPos(); }
| <SECOND> { unsupportedUnit = "SECOND"; pos = getPos(); }
| <SECONDS> { unsupportedUnit = "SECONDS"; pos = getPos(); }
| <WEEK> { unsupportedUnit = "WEEK"; pos = getPos(); }
| <WEEKS> { unsupportedUnit = "WEEKS"; pos = getPos(); }
| <MONTH> { unsupportedUnit = "MONTH"; pos = getPos(); }
| <MONTHS> { unsupportedUnit = "MONTHS"; pos = getPos(); }
| <YEAR> { unsupportedUnit = "YEAR"; pos = getPos(); }
| <YEARS> { unsupportedUnit = "YEARS"; pos = getPos(); }
)
{
if (unsupportedUnit != null) {
throw new RuntimeException(
"REFRESH EVERY: unit '" + unsupportedUnit + "' is not supported. "
+ "Supported units: MINUTE / MINUTES, HOUR / HOURS, DAY / DAYS. "
+ "Alternatively use the quoted Pinot period form, e.g. '15m', '6h', '1d'.");
}
return SqlLiteral.createCharString(n.toValue() + unitSuffix, pos);
}
|
{ return (SqlLiteral) StringLiteral(); }
)
}

/// CREATE TABLE [IF NOT EXISTS] [db.]name (
/// col TYPE [NULL | NOT NULL] [DEFAULT literal]
/// [ DIMENSION | METRIC | DATETIME FORMAT 'fmt' GRANULARITY 'gran' ],
Expand Down Expand Up @@ -260,7 +369,18 @@ SqlPinotProperty PinotProperty() :
}

/// DROP TABLE [IF EXISTS] [db.]name [TYPE OFFLINE | REALTIME]
SqlNode SqlPinotDropTable() :
/// | DROP MATERIALIZED VIEW [IF EXISTS] [db.]name
///
/// Both branches share the leading `DROP` token; combining them into a single entry point keeps
/// the JavaCC choice unambiguous (no need for LOOKAHEAD across multiple statementParser methods
/// that all start with DROP).
///
/// `DROP MATERIALIZED VIEW` does NOT take a `TYPE` clause: an MV is always realized as an
/// OFFLINE physical table, so accepting `TYPE REALTIME` would be confusing and `TYPE OFFLINE`
/// would be redundant. The controller refuses the bare `DROP TABLE` form on an MV (see
/// `PinotDdlRestletResource#executeDrop`) and refuses the MV form on a plain table — the two
/// statements are strictly partitioned by underlying TableConfig shape (Q2=B contract).
SqlNode SqlPinotDrop() :
{
SqlParserPos pos;
SqlIdentifier name;
Expand All @@ -269,21 +389,49 @@ SqlNode SqlPinotDropTable() :
}
{
<DROP> { pos = getPos(); }
<TABLE>
[ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
name = CompoundIdentifier()
[ <TYPE> tableType = PinotTableTypeLiteral() ]
{
return new SqlPinotDropTable(pos, name, ifExists, tableType);
}
(
<TABLE>
[ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
name = CompoundIdentifier()
[ <TYPE> tableType = PinotTableTypeLiteral() ]
{
return new SqlPinotDropTable(pos, name, ifExists, tableType);
}
|
<MATERIALIZED> <VIEW>
[ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
name = CompoundIdentifier()
{
return new SqlPinotDropMaterializedView(pos, name, ifExists);
}
)
}

/// SHOW TABLES [FROM db]
/// | SHOW MATERIALIZED VIEWS [FROM db]
/// | SHOW CREATE TABLE [db.]name [TYPE OFFLINE | REALTIME]
/// | SHOW CREATE MATERIALIZED VIEW [db.]name
///
/// All four grammar branches share a leading `SHOW` token; combining them into a single entry
/// point keeps the JavaCC choice unambiguous (no need for LOOKAHEAD across multiple
/// statementParser methods that all start with SHOW).
///
/// Both grammar branches share a leading `SHOW` token; combining them into a single entry point
/// keeps the JavaCC choice unambiguous (no need for LOOKAHEAD across multiple statementParser
/// methods that all start with SHOW).
/// `SHOW CREATE MATERIALIZED VIEW` does NOT take a `TYPE` clause: an MV is always backed by an
/// OFFLINE table, so accepting `TYPE REALTIME` would be confusing and accepting `TYPE OFFLINE`
/// would be redundant. The controller refuses the bare `SHOW CREATE TABLE` form on an MV (see
/// `PinotDdlRestletResource#executeShowCreate`) — callers always use this dedicated form so the
/// reverse DDL output is unambiguous (`CREATE MATERIALIZED VIEW`, not `CREATE TABLE`).
///
/// `SHOW MATERIALIZED VIEWS` is the MV-listing peer of `SHOW TABLES`: it returns the raw names
/// (no `_OFFLINE` suffix) of every TableConfig in the resolved database whose
/// `isMaterializedView` flag is true (PR #18564 is the canonical MV identity contract).
/// Choosing the plural lexeme `VIEWS` keeps the form aligned with `SHOW TABLES` and matches
/// the established Snowflake-style catalog-listing convention; the resulting names are
/// directly reusable as input to `SHOW CREATE MATERIALIZED VIEW` / `DROP MATERIALIZED VIEW`.
/// `SHOW TABLES` and `SHOW MATERIALIZED VIEWS` are intentionally NOT mutually exclusive at the
/// listing level — an MV physically is an OFFLINE table, and a user who runs `SHOW TABLES`
/// today sees it. Separating the two views at the DDL surface lets callers pick the verb that
/// matches their intent without renaming what `SHOW TABLES` has always meant.
SqlNode SqlPinotShow() :
{
SqlParserPos pos;
Expand All @@ -300,11 +448,26 @@ SqlNode SqlPinotShow() :
return new SqlPinotShowTables(pos, database);
}
|
<CREATE> <TABLE>
name = CompoundIdentifier()
[ <TYPE> tableType = PinotTableTypeLiteral() ]
<MATERIALIZED> <VIEWS>
[ <FROM> database = SimpleIdentifier() ]
{
return new SqlPinotShowCreateTable(pos, name, tableType);
return new SqlPinotShowMaterializedViews(pos, database);
}
|
<CREATE>
(
<TABLE>
name = CompoundIdentifier()
[ <TYPE> tableType = PinotTableTypeLiteral() ]
{
return new SqlPinotShowCreateTable(pos, name, tableType);
}
|
<MATERIALIZED> <VIEW>
name = CompoundIdentifier()
{
return new SqlPinotShowCreateMaterializedView(pos, name);
}
)
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.materializedview;

import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.data.FieldSpec.DataType;


/// Single source of truth for the aggregation functions an MV definedSQL is allowed to use,
/// and the {@link DataType} the MV's storage column for that aggregation MUST be declared as.
///
/// This catalog is consumed by:
///
/// - the MV analyzer (allow-list check at create / update time), and
/// - the MV schema inferer (picks the column DataType when the user omits explicit column
/// definitions in {@code CREATE MATERIALIZED VIEW}).
///
/// Both consumers must agree on the supported set, otherwise an MV that the inferer happily
/// builds could later be rejected by the analyzer (or, worse, accepted and then never picked
/// up by the rewrite engine because the MV column type doesn't match what the rewrite expects).
///
/// <h3>Why it lives in {@code pinot-common}</h3>
///
/// The schema inferer lives in {@code pinot-sql-ddl}, which must not depend on
/// {@code pinot-materialized-view} in production code. {@code pinot-common} is the
/// lowest-level module both consumers already depend on.
///
/// <h3>Sketch types: deliberate divergence from both engines' surface types</h3>
///
/// Both Pinot engines surface {@code STRING} for raw-sketch aggregations:
///
/// - The v1 (single-stage) engine's {@code DistinctCountRaw{HLL,HLLPlus,ThetaSketch}AggregationFunction}
/// return {@code STRING} from {@code getFinalResultColumnType()}.
/// - The v2 (multi-stage) engine's Calcite return-type inference (via
/// {@code PinotReturnTypeInferenceUtils} / {@code SqlReturnTypeInference}) likewise reports
/// {@code STRING} for these aggregations.
///
/// In both cases the {@code STRING} is the *display* type — the hex-encoded scalar response a
/// SQL client receives. It is not the type of the underlying sketch the MV needs to store.
///
/// For an MV column the right type is {@code BYTES}: that is the only shape the rewrite engine
/// can re-aggregate (each row's stored value is a serialized sketch that gets merged with the
/// running accumulator). Storing {@code STRING} would silently route the rewrite through the
/// "hash the literal" branch of {@code DistinctCountHLLAggregationFunction}, producing wrong
/// answers without any error.
///
/// This is the source of truth: the
/// {@code MaterializedViewSchemaInferer} consults this catalog rather than reading
/// the engine's surface type for each aggregation, so the inferer is correct even though the
/// MSE validator would naively report {@code STRING} for these three operators.
///
/// {@code MaterializedViewTaskExecutor#decodeBytesValue} already converts the incoming
/// hex-encoded string from the source query response into raw bytes when the destination
/// schema column is {@code BYTES}, so this divergence is end-to-end consistent — but it is
/// load-bearing and must not be "fixed" to match either engine's surface type.
///
/// The static regression test on this class pins both the supported-set and the type mapping
/// so that the divergence cannot be silently undone by a future refactor.
public final class MaterializedViewAggregationCatalog {

private MaterializedViewAggregationCatalog() {
}

/// Maps canonical (uppercase) aggregation operator name to the {@link DataType} an MV
/// schema column produced by that aggregation MUST be declared as, for MV ingestion AND
/// rewrite to remain semantically correct.
///
/// Iteration order is insertion order (see {@link Map#of} on Java 11+: no guarantee, but
/// callers should not rely on it). Use {@link #getSupportedOperators()} for the allow-list
/// check, {@link #getInferredDataType(String)} for the type lookup.
///
/// See the class javadoc for the BYTES-vs-STRING explanation on the sketch entries.
public static final Map<String, DataType> SUPPORTED_AGGREGATIONS = Map.of(
"SUM", DataType.DOUBLE,
"MIN", DataType.DOUBLE,
"MAX", DataType.DOUBLE,
"COUNT", DataType.LONG,
"DISTINCTCOUNTRAWHLL", DataType.BYTES,
"DISTINCTCOUNTRAWHLLPLUS", DataType.BYTES,
"DISTINCTCOUNTRAWTHETASKETCH", DataType.BYTES
);

/// Returns the canonicalised set of aggregation operator names this catalog supports.
/// Suitable as the allow-list for an MV analyzer's "is this aggregation re-aggregatable?"
/// check.
public static Set<String> getSupportedOperators() {
return SUPPORTED_AGGREGATIONS.keySet();
}

/// Returns true iff {@code operator} (case-insensitive) is one of the catalog's supported
/// aggregation function names.
public static boolean isSupported(String operator) {
if (operator == null) {
return false;
}
return SUPPORTED_AGGREGATIONS.containsKey(operator.toUpperCase(Locale.ROOT));
}

/// Returns the {@link DataType} an MV schema column produced by {@code operator} must be
/// declared as, or {@code null} if {@code operator} is not in the catalog.
///
/// Callers should typically pre-check membership with {@link #isSupported(String)} so the
/// "unsupported aggregation" error path can carry a clear, single-shot message rather than
/// a downstream {@link NullPointerException}.
public static DataType getInferredDataType(String operator) {
if (operator == null) {
return null;
}
return SUPPORTED_AGGREGATIONS.get(operator.toUpperCase(Locale.ROOT));
}
}
Loading
Loading