Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,25 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.exception.PersistenceException;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
Expand All @@ -48,6 +56,12 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain {
protected final Logger logger = LoggerFactory.getLogger(getClass());

private static final long START_DELAY = 10 * 1000L;
private static final String CLEANUP_EXECUTION_ENGINE = "AMORO";
private static final String CLEANUP_PROCESS_STAGE = "CLEANUP";
private static final String EXTERNAL_PROCESS_IDENTIFIER = "";
private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator();

private final PersistenceHelper persistenceHelper = new PersistenceHelper();

protected final Set<ServerTableIdentifier> scheduledTables =
Collections.synchronizedSet(new HashSet<>());
Expand Down Expand Up @@ -123,16 +137,31 @@ private void scheduleTableExecution(TableRuntime tableRuntime, long delay) {
}

private void executeTask(TableRuntime tableRuntime) {
TableProcessMeta cleanupProcessMeta = null;
CleanupOperation cleanupOperation = null;
Exception executionError = null;
long cleanupEndTime = 0L;

try {
if (isExecutable(tableRuntime)) {
cleanupOperation = getCleanupOperation();
// create and persist cleanup process info
cleanupProcessMeta = createCleanupProcessInfo(tableRuntime, cleanupOperation);

execute(tableRuntime);

// Each table finishes execute() at a different time, so the cleanup time is
// recorded per table, right after that table's own execution ends.
persistUpdatingCleanupTime(tableRuntime);
cleanupEndTime = System.currentTimeMillis();
persistUpdatingCleanupTime(tableRuntime, cleanupEndTime);
}
} catch (Exception e) {
logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e);
executionError = e;
} finally {
// persist cleanup result info.
persistCleanupResult(
tableRuntime, cleanupOperation, cleanupProcessMeta, cleanupEndTime, executionError);
scheduledTables.remove(tableRuntime.getTableIdentifier());
scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
}
Expand All @@ -156,14 +185,13 @@ protected boolean shouldExecute(Long lastCleanupEndTime) {
return true;
}

private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
private void persistUpdatingCleanupTime(TableRuntime tableRuntime, long currentTime) {
CleanupOperation cleanupOperation = getCleanupOperation();
if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
return;
}

try {
long currentTime = System.currentTimeMillis();
((DefaultTableRuntime) tableRuntime).updateLastCleanTime(cleanupOperation, currentTime);

logger.debug(
Expand All @@ -178,6 +206,125 @@ private void persistUpdatingCleanupTime(TableRuntime tableRuntime) {
}
}

/**
 * Builds the process record for an upcoming cleanup run and stores it through the persistence
 * helper.
 *
 * @param tableRuntime runtime of the table the cleanup is about to run on
 * @param cleanupOperation cleanup operation about to be executed
 * @return the stored process meta, or {@code null} when {@code shouldSkipOperation} reports that
 *     the operation is skipped for this table
 */
@VisibleForTesting
public TableProcessMeta createCleanupProcessInfo(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
  // Skipped operations get no process record at all.
  if (shouldSkipOperation(tableRuntime, cleanupOperation)) {
    return null;
  }

  final TableProcessMeta meta = buildCleanupProcessMeta(tableRuntime, cleanupOperation);
  persistenceHelper.beginAndPersistCleanupProcess(meta);

  logger.debug(
      "Successfully persist cleanup process [processId={}, tableId={}, processType={}]",
      meta.getProcessId(),
      meta.getTableId(),
      meta.getProcessType());
  return meta;
}

/**
 * Assembles the initial, not-yet-persisted process record for one cleanup run.
 *
 * <p>The record starts in {@link ProcessStatus#RUNNING} with a newly generated process id, the
 * operation name as its process type, zero retries, a finish time of 0, an empty fail message
 * and empty parameter/summary maps.
 *
 * @param tableRuntime runtime of the table being cleaned up; supplies the table id
 * @param cleanupOperation cleanup operation whose name becomes the process type
 * @return the populated process meta
 */
private TableProcessMeta buildCleanupProcessMeta(
    TableRuntime tableRuntime, CleanupOperation cleanupOperation) {
  TableProcessMeta meta = new TableProcessMeta();

  // Identity of the run.
  meta.setTableId(tableRuntime.getTableIdentifier().getId());
  meta.setProcessId(ID_GENERATOR.generateId());
  meta.setExternalProcessIdentifier(EXTERNAL_PROCESS_IDENTIFIER);

  // What kind of process this is.
  meta.setProcessType(cleanupOperation.name());
  meta.setProcessStage(CLEANUP_PROCESS_STAGE);
  meta.setExecutionEngine(CLEANUP_EXECUTION_ENGINE);

  // Initial lifecycle state.
  meta.setStatus(ProcessStatus.RUNNING);
  meta.setRetryNumber(0);
  meta.setFinishTime(0);
  meta.setFailMessage("");
  meta.setCreateTime(System.currentTimeMillis());

  // Nothing to report yet.
  meta.setProcessParameters(new HashMap<>());
  meta.setSummary(new HashMap<>());
  return meta;
}

@VisibleForTesting
public void persistCleanupResult(
TableRuntime tableRuntime,
CleanupOperation cleanupOperation,
TableProcessMeta cleanupProcessMeta,
long cleanupEndTime,
Exception executionError) {

Comment thread
zhangwl9 marked this conversation as resolved.
if (cleanupOperation == null
|| cleanupProcessMeta == null
|| shouldSkipOperation(tableRuntime, cleanupOperation)) {
return;
}

cleanupProcessMeta.setFinishTime(cleanupEndTime);
if (executionError != null) {
cleanupProcessMeta.setStatus(ProcessStatus.FAILED);
cleanupProcessMeta.setFailMessage(executionError.getMessage());
} else {
Comment thread
zhangwl9 marked this conversation as resolved.
cleanupProcessMeta.setStatus(ProcessStatus.SUCCESS);
}

try {
persistenceHelper.updateAndPersistCleanupProcess(cleanupProcessMeta);
} catch (PersistenceException e) {
logger.error(
"Failed to persist cleanup process result [processId={}, tableId={}, processType={}]",
cleanupProcessMeta.getProcessId(),
cleanupProcessMeta.getTableId(),
cleanupProcessMeta.getProcessType(),
e);
}

logger.debug(
"Successfully updated lastCleanTime and cleanupProcess for table {} with processId={}, cleanup operation {}",
tableRuntime.getTableIdentifier().getTableName(),
cleanupProcessMeta.getProcessId(),
cleanupOperation);
}

/**
 * {@link PersistentBase} subclass through which the scheduler writes cleanup process records
 * using {@link TableProcessMapper}.
 */
private static class PersistenceHelper extends PersistentBase {

  public PersistenceHelper() {}

  /**
   * Calls {@code insertProcess} on the mapper with the fields of the given process record.
   *
   * @param processMeta process record describing a cleanup run that is about to start
   */
  private void beginAndPersistCleanupProcess(TableProcessMeta processMeta) {
    doAs(
        TableProcessMapper.class,
        processMapper ->
            processMapper.insertProcess(
                processMeta.getTableId(),
                processMeta.getProcessId(),
                processMeta.getExternalProcessIdentifier(),
                processMeta.getStatus(),
                processMeta.getProcessType(),
                processMeta.getProcessStage(),
                processMeta.getExecutionEngine(),
                processMeta.getRetryNumber(),
                processMeta.getCreateTime(),
                processMeta.getProcessParameters(),
                processMeta.getSummary()));
  }

  /**
   * Calls {@code updateProcess} on the mapper with the current fields of the given process
   * record, including its status, finish time and fail message.
   *
   * @param processMeta process record carrying the result of a cleanup run
   */
  private void updateAndPersistCleanupProcess(TableProcessMeta processMeta) {
    doAs(
        TableProcessMapper.class,
        processMapper ->
            processMapper.updateProcess(
                processMeta.getTableId(),
                processMeta.getProcessId(),
                processMeta.getExternalProcessIdentifier(),
                processMeta.getStatus(),
                processMeta.getProcessStage(),
                processMeta.getRetryNumber(),
                processMeta.getFinishTime(),
                processMeta.getFailMessage(),
                processMeta.getProcessParameters(),
                processMeta.getSummary()));
  }
}

/**
* Get cleanup operation. Default is NONE, subclasses should override this method to provide
* specific operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.DefaultTableRuntimeStore;
import org.apache.amoro.server.table.TableRuntimeHandler;
import org.apache.amoro.server.table.cleanup.CleanupOperation;
import org.apache.amoro.table.TableRuntimeStore;
import org.apache.amoro.table.TableSummary;
Expand All @@ -48,6 +48,12 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
private static final String TEST_CATALOG = "test_catalog";
private static final String TEST_DB = "test_db";
private static final String TEST_TABLE = "test_table";
private static final List<CleanupOperation> CLEANUP_OPERATIONS =
Arrays.asList(
CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);

static {
try {
Expand All @@ -57,18 +63,6 @@ public class TestPeriodicTableSchedulerCleanup extends PersistentBase {
}
}

// TableRuntimeHandler whose two handleTableChanged callbacks both have empty bodies.
private static final TableRuntimeHandler TEST_HANDLER =
new TableRuntimeHandler() {
// Overload taking the original optimizing status; empty body.
@Override
public void handleTableChanged(
TableRuntime tableRuntime,
org.apache.amoro.server.optimizing.OptimizingStatus originalStatus) {}

// Overload taking the original table configuration; empty body.
@Override
public void handleTableChanged(
TableRuntime tableRuntime, TableConfiguration originalConfig) {}
};

/**
* Create a test server table identifier with the given ID
*
Expand Down Expand Up @@ -103,7 +97,7 @@ private DefaultTableRuntime createDefaultTableRuntime(ServerTableIdentifier iden
return new DefaultTableRuntime(store, () -> null);
}

private void cleanUpTableRuntimeData(List<Long> tableIds) {
private void cleanupTableRuntimeData(List<Long> tableIds) {
doAs(
TableRuntimeMapper.class,
mapper -> {
Expand Down Expand Up @@ -135,7 +129,7 @@ private void cleanUpTableRuntimeData(List<Long> tableIds) {
* @param testTableIds list of table IDs to clean up
*/
private void prepareTestEnvironment(List<Long> testTableIds) {
  // Delegate once, via the renamed helper; the extra call through the old
  // cleanUpTableRuntimeData name was a leftover of the rename.
  cleanupTableRuntimeData(testTableIds);
}

/**
Expand Down Expand Up @@ -165,16 +159,8 @@ private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation clean
*/
@Test
public void testShouldExecuteTaskWithNoPreviousCleanup() {
List<CleanupOperation> operations =
Arrays.asList(
CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);

for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
prepareTestEnvironment(testTableIds);
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
prepareTestEnvironment(Collections.singletonList(1L));

PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
ServerTableIdentifier identifier = createTableIdentifier(1L);
Expand All @@ -190,16 +176,8 @@ public void testShouldExecuteTaskWithNoPreviousCleanup() {
/** Test should not execute task with recent cleanup */
@Test
public void testShouldNotExecuteTaskWithRecentCleanup() {
List<CleanupOperation> operations =
Arrays.asList(
CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);

for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
cleanUpTableRuntimeData(testTableIds);
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
cleanupTableRuntimeData(Collections.singletonList(1L));

PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);

Expand All @@ -220,16 +198,8 @@ public void testShouldNotExecuteTaskWithRecentCleanup() {
/** Test should execute task with old cleanup */
@Test
public void testShouldExecuteTaskWithOldCleanup() {
List<CleanupOperation> operations =
Arrays.asList(
CleanupOperation.ORPHAN_FILES_CLEANING,
CleanupOperation.DANGLING_DELETE_FILES_CLEANING,
CleanupOperation.DATA_EXPIRING,
CleanupOperation.SNAPSHOTS_EXPIRING);

for (CleanupOperation operation : operations) {
List<Long> testTableIds = Collections.singletonList(1L);
cleanUpTableRuntimeData(testTableIds);
for (CleanupOperation operation : CLEANUP_OPERATIONS) {
cleanupTableRuntimeData(Collections.singletonList(1L));

PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);

Expand Down Expand Up @@ -262,4 +232,51 @@ public void testShouldExecuteTaskWithNoneOperation() {
boolean shouldExecute = executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.NONE);
Assert.assertTrue("Should always execute with NONE operation", shouldExecute);
}

/**
 * Verifies that, for every operation in {@code CLEANUP_OPERATIONS}, a run without an execution
 * error is persisted with {@link ProcessStatus#SUCCESS}.
 */
@Test
public void testCleanupProcessPersistedOnSuccess() {
  CLEANUP_OPERATIONS.forEach(
      operation -> assertCleanupProcessPersisted(operation, null, ProcessStatus.SUCCESS, null));
}

/**
 * Verifies that, for every operation in {@code CLEANUP_OPERATIONS}, a run that ended with an
 * exception is persisted with {@link ProcessStatus#FAILED} and the exception's message.
 */
@Test
public void testCleanupProcessPersistedOnFailure() {
  CLEANUP_OPERATIONS.forEach(
      operation -> {
        String failMessage = "Simulated cleanup failure for " + operation;
        assertCleanupProcessPersisted(
            operation, new RuntimeException(failMessage), ProcessStatus.FAILED, failMessage);
      });
}

/**
 * Creates a cleanup process record for table id 1, persists a result for it, then reloads the
 * record and checks its status, process type, finish time and (optionally) fail message.
 *
 * @param operation cleanup operation under test
 * @param executionError error to report as the run's outcome, or {@code null} for success
 * @param expectedStatus status the reloaded record must have
 * @param expectedFailMessage fail message the reloaded record must have; not checked when
 *     {@code null}
 */
private void assertCleanupProcessPersisted(
    CleanupOperation operation,
    Exception executionError,
    ProcessStatus expectedStatus,
    String expectedFailMessage) {
  prepareTestEnvironment(Collections.singletonList(1L));

  PeriodicTableSchedulerTestBase executor = createTestExecutor(operation);
  DefaultTableRuntime tableRuntime = createDefaultTableRuntime(createTableIdentifier(1L));

  // 1. Create the cleanup process record.
  TableProcessMeta meta = executor.createCleanupProcessInfo(tableRuntime, operation);
  // createCleanupProcessInfo returns null on its skip path; fail with a clear message
  // instead of an NPE further down.
  Assert.assertNotNull("Cleanup process should be created for " + operation, meta);

  // 2. Update the record with the execution outcome.
  long cleanupEndTime = System.currentTimeMillis();
  executor.persistCleanupResult(tableRuntime, operation, meta, cleanupEndTime, executionError);

  // 3. Reload the record and verify what was stored.
  TableProcessMeta persisted =
      getAs(TableProcessMapper.class, mapper -> mapper.getProcessMeta(meta.getProcessId()));
  Assert.assertNotNull("Cleanup process should be stored for " + operation, persisted);

  Assert.assertEquals(expectedStatus, persisted.getStatus());
  Assert.assertEquals(operation.name(), persisted.getProcessType());
  Assert.assertEquals(cleanupEndTime, persisted.getFinishTime());
  if (expectedFailMessage != null) {
    Assert.assertEquals(expectedFailMessage, persisted.getFailMessage());
  }
}
}
Loading