Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,42 +156,12 @@ public class AmoroManagementConf {
.defaultValue(1000000)
.withDescription("The queue size of the executors of the external catalog explorer.");

public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enable synchronizing Hive tables.");

public static final ConfigOption<Integer> SYNC_HIVE_TABLES_THREAD_COUNT =
ConfigOptions.key("sync-hive-tables.thread-count")
.intType()
.defaultValue(10)
.withDescription("The number of threads used for synchronizing Hive tables.");

public static final ConfigOption<Integer> REFRESH_TABLES_THREAD_COUNT =
ConfigOptions.key("refresh-tables.thread-count")
.intType()
.defaultValue(10)
.withDescription("The number of threads used for refreshing tables.");

public static final ConfigOption<Boolean> AUTO_CREATE_TAGS_ENABLED =
ConfigOptions.key("auto-create-tags.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Enable creating tags.");

public static final ConfigOption<Integer> AUTO_CREATE_TAGS_THREAD_COUNT =
ConfigOptions.key("auto-create-tags.thread-count")
.intType()
.defaultValue(3)
.withDescription("The number of threads used for creating tags.");

public static final ConfigOption<Duration> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription("Interval for creating tags.");

public static final ConfigOption<Duration> REFRESH_TABLES_INTERVAL =
ConfigOptions.key("refresh-tables.interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ public static void validateConfig(Configurations configurations) {

validateThreadCount(configurations, AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT);
validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT);

if (configurations.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
validateThreadCount(configurations, AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT);
}
}

private static void validateThreadCount(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,7 @@ public void startOptimizingService() throws Exception {
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getProcessDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
tableManager.setTableService(tableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,75 @@
* limitations under the License.
*/

package org.apache.amoro.server.scheduler.inline;
package org.apache.amoro.server.process.iceberg;

import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.hive.utils.HiveMetaSynchronizer;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.LocalProcess;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadLocalRandom;
import java.util.Map;

public class HiveCommitSyncExecutor extends PeriodicTableScheduler {
private static final Logger LOG = LoggerFactory.getLogger(HiveCommitSyncExecutor.class);
/** Local table process for syncing Iceberg metadata to Hive. */
public class HiveCommitSyncProcess extends TableProcess implements LocalProcess {

// 10 minutes
private static final long INTERVAL = 10 * 60 * 1000L;
private static final Logger LOG = LoggerFactory.getLogger(HiveCommitSyncProcess.class);

public HiveCommitSyncExecutor(TableService tableService, int poolSize) {
super(tableService, poolSize);
}

@Override
protected long getNextExecutingTime(TableRuntime tableRuntime) {
return INTERVAL;
public static void syncIcebergToHive(MixedTable mixedTable) {
HiveMetaSynchronizer.syncMixedTableDataToHive((SupportHive) mixedTable);
}

@Override
protected boolean enabled(TableRuntime tableRuntime) {
return true;
public HiveCommitSyncProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
super(tableRuntime, engine);
}

@Override
protected long getExecutorDelay() {
return ThreadLocalRandom.current().nextLong(INTERVAL);
public String tag() {
return getAction().getName().toLowerCase();
}

@Override
protected void execute(TableRuntime tableRuntime) {
long startTime = System.currentTimeMillis();
public void run() {
ServerTableIdentifier tableIdentifier = tableRuntime.getTableIdentifier();
try {
MixedTable mixedTable = (MixedTable) loadTable(tableRuntime).originalTable();
AmoroTable<?> amoroTable = tableRuntime.loadTable();
MixedTable mixedTable = (MixedTable) amoroTable.originalTable();
if (!TableTypeUtil.isHive(mixedTable)) {
LOG.debug("{} is not a support hive table", tableIdentifier);
return;
}

LOG.info("{} start hive sync", tableIdentifier);
syncIcebergToHive(mixedTable);
} catch (Exception e) {
LOG.error("{} hive sync failed", tableIdentifier, e);
} finally {
LOG.info(
"{} hive sync finished, cost {}ms",
tableIdentifier,
System.currentTimeMillis() - startTime);
throw new RuntimeException(e);
}
}

public static void syncIcebergToHive(MixedTable mixedTable) {
HiveMetaSynchronizer.syncMixedTableDataToHive((SupportHive) mixedTable);
@Override
public Action getAction() {
return IcebergActions.SYNC_HIVE_TABLES;
}

@Override
public Map<String, String> getProcessParameters() {
return Maps.newHashMap();
}

@Override
public Map<String, String> getSummary() {
return Maps.newHashMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ public class IcebergProcessFactory implements ProcessFactory {
public static final ConfigOption<Duration> DATA_EXPIRE_INTERVAL =
ConfigOptions.key("expire-data.interval").durationType().defaultValue(Duration.ofDays(1));

public static final ConfigOption<Boolean> AUTO_CREATE_TAGS_ENABLED =
ConfigOptions.key("auto-create-tags.enabled").booleanType().defaultValue(true);

public static final ConfigOption<Duration> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
.durationType()
.defaultValue(Duration.ofMinutes(1));

public static final ConfigOption<Boolean> SYNC_HIVE_TABLES_ENABLED =
ConfigOptions.key("sync-hive-tables.enabled").booleanType().defaultValue(false);

public static final ConfigOption<Duration> SYNC_HIVE_TABLES_INTERVAL =
ConfigOptions.key("sync-hive-tables.interval")
.durationType()
.defaultValue(Duration.ofMinutes(10));

private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap();
private final List<TableFormat> formats =
Expand Down Expand Up @@ -119,6 +135,10 @@ public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action)
return triggerCleanDanglingDelete(tableRuntime);
} else if (IcebergActions.EXPIRE_DATA.equals(action)) {
return triggerDataExpiring(tableRuntime);
} else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) {
return triggerAutoCreateTag(tableRuntime);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return triggerHiveCommitSync(tableRuntime);
}

return Optional.empty();
Expand All @@ -135,8 +155,7 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
+ action);
}

// SnapshotsExpiringProcess, OrphanFilesCleaningProcess, DanglingDeleteFilesCleaningProcess
// and DataExpiringProcess are stateless, idempotent one-shot local maintenance tasks
// The following processes are stateless, idempotent one-shot local maintenance tasks
// (no checkpoint), so recovery simply rebuilds the process so it can run again.
// The store/processId/tracking is owned by ProcessService.
if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
Expand All @@ -147,6 +166,10 @@ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
return new DanglingDeleteFilesCleaningProcess(tableRuntime, localEngine);
} else if (IcebergActions.EXPIRE_DATA.equals(action)) {
return new DataExpiringProcess(tableRuntime, localEngine);
} else if (IcebergActions.AUTO_CREATE_TAGS.equals(action)) {
return new TagsAutoCreatingProcess(tableRuntime, localEngine);
} else if (IcebergActions.SYNC_HIVE_TABLES.equals(action)) {
return new HiveCommitSyncProcess(tableRuntime, localEngine);
}

throw new RecoverProcessFailedException(
Expand Down Expand Up @@ -182,6 +205,18 @@ public void open(Map<String, String> properties) {
this.actions.put(
IcebergActions.EXPIRE_DATA, ProcessTriggerStrategy.triggerAtFixRate(interval));
}

if (configs.getBoolean(AUTO_CREATE_TAGS_ENABLED)) {
Duration interval = configs.getDuration(AUTO_CREATE_TAGS_INTERVAL);
this.actions.put(
IcebergActions.AUTO_CREATE_TAGS, ProcessTriggerStrategy.triggerAtFixRate(interval));
}

if (configs.getBoolean(SYNC_HIVE_TABLES_ENABLED)) {
Duration interval = configs.getDuration(SYNC_HIVE_TABLES_INTERVAL);
this.actions.put(
IcebergActions.SYNC_HIVE_TABLES, ProcessTriggerStrategy.triggerAtFixRate(interval));
}
}

private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) {
Expand Down Expand Up @@ -248,6 +283,24 @@ private Optional<TableProcess> triggerDataExpiring(TableRuntime tableRuntime) {
return Optional.of(new DataExpiringProcess(tableRuntime, localEngine));
}

private Optional<TableProcess> triggerAutoCreateTag(TableRuntime tableRuntime) {
if (localEngine == null
|| tableRuntime.getFormat() != TableFormat.ICEBERG
|| !tableRuntime.getTableConfiguration().getTagConfiguration().isAutoCreateTag()) {
return Optional.empty();
}

return Optional.of(new TagsAutoCreatingProcess(tableRuntime, localEngine));
}

private Optional<TableProcess> triggerHiveCommitSync(TableRuntime tableRuntime) {
if (localEngine == null) {
return Optional.empty();
}

return Optional.of(new HiveCommitSyncProcess(tableRuntime, localEngine));
}

@Override
public void close() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.process.iceberg;

import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.LocalProcess;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainerFactory;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/** Local table process for auto-creating Iceberg tags. */
public class TagsAutoCreatingProcess extends TableProcess implements LocalProcess {

private static final Logger LOG = LoggerFactory.getLogger(TagsAutoCreatingProcess.class);

public TagsAutoCreatingProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
super(tableRuntime, engine);
}

@Override
public String tag() {
return getAction().getName().toLowerCase();
}

@Override
public void run() {
try {
AmoroTable<?> amoroTable = tableRuntime.loadTable();
TableMaintainer tableMaintainer = TableMaintainerFactory.create(amoroTable, tableRuntime);
tableMaintainer.autoCreateTags();
} catch (Throwable t) {
LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t);
throw new RuntimeException(t);
}
}

@Override
public Action getAction() {
return IcebergActions.AUTO_CREATE_TAGS;
}

@Override
public Map<String, String> getProcessParameters() {
return Maps.newHashMap();
}

@Override
public Map<String, String> getSummary() {
return Maps.newHashMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ public class InlineTableExecutors {
private BlockerExpiringExecutor blockerExpiringExecutor;
private OptimizingCommitExecutor optimizingCommitExecutor;
private ProcessDataExpiringExecutor processDataExpiringExecutor;
private HiveCommitSyncExecutor hiveCommitSyncExecutor;
private TagsAutoCreatingExecutor tagsAutoCreatingExecutor;

public static InlineTableExecutors getInstance() {
return instance;
Expand Down Expand Up @@ -60,24 +58,12 @@ public void setup(TableService tableService, Configurations conf) {
new ProcessDataExpiringExecutor(
tableService, optimizingKeepTime, expireInterval, processKeepTime);
this.blockerExpiringExecutor = new BlockerExpiringExecutor(tableService);
if (conf.getBoolean(AmoroManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
this.hiveCommitSyncExecutor =
new HiveCommitSyncExecutor(
tableService, conf.getInteger(AmoroManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT));
}
this.tableRefreshingExecutor =
new TableRuntimeRefreshExecutor(
tableService,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
tableService,
conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT),
conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis());
}
}

public TableRuntimeRefreshExecutor getTableRefreshingExecutor() {
Expand All @@ -95,12 +81,4 @@ public OptimizingCommitExecutor getOptimizingCommitExecutor() {
public ProcessDataExpiringExecutor getProcessDataExpiringExecutor() {
return processDataExpiringExecutor;
}

public HiveCommitSyncExecutor getHiveCommitSyncExecutor() {
return hiveCommitSyncExecutor;
}

public TagsAutoCreatingExecutor getTagsAutoCreatingExecutor() {
return tagsAutoCreatingExecutor;
}
}
Loading
Loading