Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ private ResultFetcher callAlterMaterializedTableSuspend(
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private CatalogMaterializedTable suspendContinuousRefreshJob(
private ResolvedCatalogMaterializedTable suspendContinuousRefreshJob(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
CatalogMaterializedTable materializedTable) {
ResolvedCatalogMaterializedTable materializedTable) {
try {
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
Expand Down Expand Up @@ -371,7 +371,7 @@ private void suspendRefreshWorkflow(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
CatalogMaterializedTable materializedTable) {
ResolvedCatalogMaterializedTable materializedTable) {
if (RefreshStatus.SUSPENDED == materializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
Expand Down Expand Up @@ -489,7 +489,7 @@ private void resumeRefreshWorkflow(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
CatalogMaterializedTable catalogMaterializedTable,
ResolvedCatalogMaterializedTable catalogMaterializedTable,
Map<String, String> dynamicOptions) {
// Repeated resume refresh workflow is not supported
if (RefreshStatus.ACTIVATED == catalogMaterializedTable.getRefreshStatus()) {
Expand Down Expand Up @@ -577,7 +577,7 @@ private void executeContinuousRefreshJob(
operationExecutor,
handle,
materializedTableIdentifier,
catalogMaterializedTable,
resolveCatalogMaterializedTable(operationExecutor, catalogMaterializedTable),
RefreshStatus.ACTIVATED,
continuousRefreshHandler.asSummaryString(),
serializedBytes);
Expand Down Expand Up @@ -822,15 +822,15 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(

if (RefreshStatus.ACTIVATED == oldMaterializedTable.getRefreshStatus()) {
// 1. suspend the materialized table
CatalogMaterializedTable suspendMaterializedTable =
ResolvedCatalogMaterializedTable suspendMaterializedTable =
suspendContinuousRefreshJob(
operationExecutor, handle, tableIdentifier, oldMaterializedTable);

// 2. alter materialized table schema & query definition
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(),
op.getTableChanges(),
oldTable -> op.getTableChanges(),
suspendMaterializedTable);
operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
Expand All @@ -840,8 +840,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
executeContinuousRefreshJob(
operationExecutor,
handle,
alterMaterializedTableChangeOperation
.getMaterializedTableWithAppliedChanges(),
alterMaterializedTableChangeOperation.getNewTable(),
tableIdentifier,
Collections.emptyMap(),
Optional.empty());
Expand All @@ -851,7 +850,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
LOG.warn(
"Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.",
tableIdentifier,
op.getMaterializedTableWithAppliedChanges().getExpandedQuery(),
op.getNewTable().getExpandedQuery(),
suspendMaterializedTable.getExpandedQuery(),
e);

Expand All @@ -874,8 +873,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
throw new SqlExecutionException(
String.format(
"Failed to start the continuous refresh job using new query %s when altering materialized table %s select query.",
op.getMaterializedTableWithAppliedChanges().getExpandedQuery(),
tableIdentifier),
op.getNewTable().getExpandedQuery(), tableIdentifier),
e);
}
} else if (RefreshStatus.SUSPENDED == oldMaterializedTable.getRefreshStatus()) {
Expand All @@ -889,7 +887,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(

AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, tableChanges, oldMaterializedTable);
tableIdentifier, oldTable -> tableChanges, oldMaterializedTable);

operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
Expand All @@ -904,11 +902,11 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
}

private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation(
CatalogMaterializedTable oldMaterializedTable,
ResolvedCatalogMaterializedTable oldMaterializedTable,
AlterMaterializedTableChangeOperation op) {

return new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(), List.of(), oldMaterializedTable);
op.getTableIdentifier(), oldTable -> List.of(), oldMaterializedTable);
}

private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(
Expand Down Expand Up @@ -1158,11 +1156,20 @@ private ResolvedCatalogMaterializedTable getCatalogMaterializedTable(
return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
}

private CatalogMaterializedTable updateRefreshHandler(
private ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable(
OperationExecutor operationExecutor, CatalogMaterializedTable materializedTable) {
return operationExecutor
.getSessionContext()
.getSessionState()
.catalogManager
.resolveCatalogMaterializedTable(materializedTable);
}

private ResolvedCatalogMaterializedTable updateRefreshHandler(
OperationExecutor operationExecutor,
OperationHandle operationHandle,
ObjectIdentifier materializedTableIdentifier,
CatalogMaterializedTable catalogMaterializedTable,
ResolvedCatalogMaterializedTable catalogMaterializedTable,
RefreshStatus refreshStatus,
String refreshHandlerSummary,
byte[] serializedRefreshHandler) {
Expand All @@ -1172,12 +1179,15 @@ private CatalogMaterializedTable updateRefreshHandler(
TableChange.modifyRefreshHandler(refreshHandlerSummary, serializedRefreshHandler));
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
materializedTableIdentifier, tableChanges, catalogMaterializedTable);
materializedTableIdentifier,
oldTable -> tableChanges,
catalogMaterializedTable);
// update RefreshHandler to Catalog
operationExecutor.callExecutableOperation(
operationHandle, alterMaterializedTableChangeOperation);

return alterMaterializedTableChangeOperation.getMaterializedTableWithAppliedChanges();
return resolveCatalogMaterializedTable(
operationExecutor, alterMaterializedTableChangeOperation.getNewTable());
}

/** Generate insert statement for materialized table. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.TableChange;

import java.util.List;
import java.util.function.Function;

/**
* Operation to describe an ALTER MATERIALIZED TABLE AS query operation. The operation is not
Expand All @@ -38,9 +39,9 @@ public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTab

public AlterMaterializedTableAsQueryOperation(
ObjectIdentifier tableIdentifier,
List<TableChange> tableChanges,
CatalogMaterializedTable oldTable) {
super(tableIdentifier, tableChanges, oldTable);
Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangesForTable,
ResolvedCatalogMaterializedTable oldTable) {
super(tableIdentifier, tableChangesForTable, oldTable);
}

@Override
Expand All @@ -53,7 +54,6 @@ public TableResultInternal execute(Context ctx) {
public String asSummaryString() {
return String.format(
"ALTER MATERIALIZED TABLE %s AS %s",
tableIdentifier.asSummaryString(),
getMaterializedTableWithAppliedChanges().getExpandedQuery());
tableIdentifier.asSummaryString(), getNewTable().getExpandedQuery());
}
}
Loading