Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,10 +671,17 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
int pageNumber = (offset / limit) + 1;
List<TableProcessMeta> processMetaList = Collections.emptyList();
try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
org.apache.amoro.Action action = null;
if (type != null && !type.isEmpty()) {
action =
org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName(
type);
}
final org.apache.amoro.Action finalAction = action;
processMetaList =
getAs(
TableProcessMapper.class,
mapper -> mapper.listProcessMeta(identifier.getId(), type, status));
mapper -> mapper.listProcessMeta(identifier.getId(), finalAction, status));
PageInfo<TableProcessMeta> pageInfo = new PageInfo<>(processMetaList);
total = (int) pageInfo.getTotal();
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public TableOptimizingProcess(
OptimizingProcessState processState) {
this.tableRuntime = tableRuntime;
processId = tableRuntime.getProcessId();
optimizingType = OptimizingType.valueOf(processMeta.getProcessType());
optimizingType = getOptimizingTypeFromAction(processMeta.getAction());
targetSnapshotId = processState.getTargetSnapshotId();
targetChangeSnapshotId = processState.getTargetChangeSnapshotId();
planTime = processMeta.getCreateTime();
Expand Down Expand Up @@ -528,6 +528,53 @@ public OptimizingType getOptimizingType() {
return optimizingType;
}

/**
* Convert OptimizingType to corresponding Action.
*
* @param optimizingType optimizing type
* @return corresponding Action
*/
private org.apache.amoro.Action getOptimizingAction(OptimizingType optimizingType) {
switch (optimizingType) {
case MINOR:
return org.apache.amoro.IcebergActions.OPTIMIZING_MINOR;
case MAJOR:
return org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR;
case FULL:
return org.apache.amoro.IcebergActions.OPTIMIZING_FULL;
default:
throw new IllegalArgumentException("Unknown optimizing type: " + optimizingType);
}
}

/**
* Convert Action to corresponding OptimizingType.
*
* @param action action
* @return corresponding OptimizingType
*/
private OptimizingType getOptimizingTypeFromAction(org.apache.amoro.Action action) {
if (action == null) {
throw new IllegalArgumentException("Action cannot be null");
}
String actionName = action.getName();
if (org.apache.amoro.IcebergActions.OPTIMIZING_MINOR.getName().equals(actionName)) {
return OptimizingType.MINOR;
} else if (org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR.getName().equals(actionName)) {
return OptimizingType.MAJOR;
} else if (org.apache.amoro.IcebergActions.OPTIMIZING_FULL.getName().equals(actionName)) {
return OptimizingType.FULL;
} else {
// Fallback to old behavior for backward compatibility
try {
return OptimizingType.valueOf(actionName.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Cannot convert action " + actionName + " to OptimizingType", e);
}
}
}

@Override
public ProcessStatus getStatus() {
return status;
Expand Down Expand Up @@ -838,7 +885,7 @@ private void beginAndPersistProcess() {
processId,
"",
status,
optimizingType.name().toUpperCase(),
getOptimizingAction(optimizingType),
tableRuntime.getOptimizingStatus().name().toLowerCase(),
"AMORO",
0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.persistence.converter;

import org.apache.amoro.Action;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.PaimonActions;
import org.apache.amoro.TableFormat;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedJdbcTypes;
import org.apache.ibatis.type.MappedTypes;
import org.apache.ibatis.type.TypeHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* MyBatis TypeHandler for converting Action to/from String in database. This converter maintains a
* registry of known actions and can dynamically create temporary actions for unknown names to
* support backward compatibility.
*/
@MappedTypes(Action.class)
@MappedJdbcTypes(JdbcType.VARCHAR)
public class Action2StringConverter implements TypeHandler<Action> {

private static final Logger LOG = LoggerFactory.getLogger(Action2StringConverter.class);

/** Registry of all registered actions, keyed by action name. */
private static final Map<String, Action> ACTION_REGISTRY = new ConcurrentHashMap<>();

/** Default formats for dynamically created actions. */
private static final TableFormat[] DEFAULT_FORMATS =
new TableFormat[] {
TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.PAIMON
};

/** Static initialization block to register all built-in Iceberg and Paimon actions. */
static {
// Register Iceberg actions
registerAction(IcebergActions.SYSTEM);
registerAction(IcebergActions.REWRITE);
registerAction(IcebergActions.DELETE_ORPHANS);
registerAction(IcebergActions.SYNC_HIVE);
registerAction(IcebergActions.EXPIRE_DATA);
registerAction(IcebergActions.OPTIMIZING_MINOR);
registerAction(IcebergActions.OPTIMIZING_MAJOR);
registerAction(IcebergActions.OPTIMIZING_FULL);

// Register Paimon actions
registerAction(PaimonActions.COMPACT);
registerAction(PaimonActions.FULL_COMPACT);
registerAction(PaimonActions.CLEAN_METADATA);
registerAction(PaimonActions.DELETE_SNAPSHOTS);
}

/**
* Register an action in the registry.
*
* @param action the action to register
*/
public static void registerAction(Action action) {
if (action != null && action.getName() != null) {
ACTION_REGISTRY.put(action.getName(), action);
}
}

/**
* Register a custom action. This is a convenience method that delegates to {@link
* #registerAction(Action)}.
*
* @param action the custom action to register
*/
public static void registerCustomAction(Action action) {
registerAction(action);
}

/**
* Get an action by its name from the registry.
*
* @param name the action name to look up
* @return the registered action, or null if not found and name is null/empty
*/
public static Action getActionByName(String name) {
if (name == null || name.isEmpty()) {
return null;
}
return ACTION_REGISTRY.get(name);
}

/**
* Get all registered actions.
*
* @return array of all registered actions
*/
public static Action[] getRegisteredActions() {
return ACTION_REGISTRY.values().toArray(new Action[0]);
}

@Override
public void setParameter(PreparedStatement ps, int i, Action parameter, JdbcType jdbcType)
throws SQLException {
if (parameter == null) {
ps.setString(i, "");
} else {
ps.setString(i, parameter.getName());
}
}

@Override
public Action getResult(ResultSet rs, String columnName) throws SQLException {
String actionName = rs.getString(columnName);
return convertToAction(actionName);
}

@Override
public Action getResult(ResultSet rs, int columnIndex) throws SQLException {
String actionName = rs.getString(columnIndex);
return convertToAction(actionName);
}

@Override
public Action getResult(CallableStatement cs, int columnIndex) throws SQLException {
String actionName = cs.getString(columnIndex);
return convertToAction(actionName);
}

/**
* Convert a string action name to an Action object. First attempts to find the action in the
* registry. If not found, creates a temporary action with the given name for backward
* compatibility.
*
* @param actionName the action name to convert
* @return the corresponding Action object, or null if actionName is null/empty
*/
private Action convertToAction(String actionName) {
if (actionName == null || actionName.isEmpty()) {
return null;
}

Action action = ACTION_REGISTRY.get(actionName);
if (action != null) {
return action;
}

LOG.warn(
"Unknown action name '{}', creating temporary action for backward compatibility",
actionName);
Action tempAction = new Action(DEFAULT_FORMATS, 0, actionName);
registerAction(tempAction);
return tempAction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,21 @@

import java.util.Map;

/**
* Mapper for table_process_state table.
*
* @deprecated This mapper is deprecated as of AMORO-3951. Use {@link TableProcessMapper} instead.
* The table_process_state table has been merged into table_process.
*/
@Deprecated
public interface ProcessStateMapper {

/**
* Create a new process state.
*
* @deprecated Use {@link TableProcessMapper#insertProcess} instead.
*/
@Deprecated
@Insert(
"INSERT INTO table_process_state "
+ "(process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary) "
Expand All @@ -40,24 +53,48 @@ public interface ProcessStateMapper {
@Options(useGeneratedKeys = true, keyProperty = "id")
void createProcessState(TableProcessState state);

/**
* Update process state to running.
*
* @deprecated Use {@link TableProcessMapper#updateProcess} instead.
*/
@Deprecated
@Update(
"UPDATE table_process_state "
+ "SET status = #{status}, start_time = #{startTime} "
+ "WHERE process_id = #{id} and retry_num = #{retryNumber}")
void updateProcessRunning(TableProcessState state);

/**
* Update process state to completed.
*
* @deprecated Use {@link TableProcessMapper#updateProcess} instead.
*/
@Deprecated
@Update(
"UPDATE table_process_state "
+ "SET status = #{status}, end_time = #{endTime} "
+ "WHERE process_id = #{id} and retry_num = #{retryNumber}")
void updateProcessCompleted(TableProcessState state);

/**
* Update process state to failed.
*
* @deprecated Use {@link TableProcessMapper#updateProcess} instead.
*/
@Deprecated
@Update(
"UPDATE table_process_state "
+ "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} "
+ "WHERE process_id = #{id} and retry_num = #{retryNumber}")
void updateProcessFailed(TableProcessState state);

/**
* Query TableProcessState by process_id.
*
* @deprecated Use {@link TableProcessMapper#getProcessMeta} instead.
*/
@Deprecated
@Select(
"SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary "
+ "FROM table_process_state "
Expand All @@ -77,7 +114,12 @@ public interface ProcessStateMapper {
})
TableProcessState getProcessStateById(@Param("processId") long processId);

/** Query TableProcessState by table_id */
/**
* Query TableProcessState by table_id.
*
* @deprecated Use {@link TableProcessMapper#listProcessMeta} instead.
*/
@Deprecated
@Select(
"SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary "
+ "FROM table_process_state "
Expand Down
Loading
Loading