Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ public class ConfigOptions {
"The interval for cleaning up expired producer offsets "
+ "and orphan files in remote storage. Default is 1 hour.");

/**
 * The interval at which the coordinator resumes in-progress table and partition deletions.
 *
 * <p>The coordinator event processor converts this duration to milliseconds and uses it as both
 * the initial delay and the fixed delay of the periodic task that enqueues a resume-deletion
 * event. Defaults to 6 hours.
 */
public static final ConfigOption<Duration> COORDINATOR_RESUME_DELETION_INTERVAL =
key("coordinator.resume-deletion.interval")
.durationType()
.defaultValue(Duration.ofHours(6))
.withDescription(
"The interval at which the coordinator resumes in-progress table and "
+ "partition deletions via a periodic event. Default is 6 hours.");

// ------------------------------------------------------------------------
// ConfigOptions for Tablet Server
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.ResumeDeletionEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher;
Expand Down Expand Up @@ -122,6 +123,8 @@
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
import org.apache.fluss.utils.AutoPartitionStrategy;
import org.apache.fluss.utils.ExecutorUtils;
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
import org.apache.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
Expand All @@ -142,6 +145,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
Expand Down Expand Up @@ -185,6 +191,8 @@ public class CoordinatorEventProcessor implements EventProcessor {
private final RebalanceManager rebalanceManager;
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
private final LakeTableHelper lakeTableHelper;
private final ScheduledExecutorService resumeDeletionScheduledExecutor;
private final long resumeDeletionIntervalMs;

public CoordinatorEventProcessor(
ZooKeeperClient zooKeeperClient,
Expand Down Expand Up @@ -253,6 +261,11 @@ public CoordinatorEventProcessor(
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
this.resumeDeletionScheduledExecutor =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("resume-deletion-scheduler"));
this.resumeDeletionIntervalMs =
conf.get(ConfigOptions.COORDINATOR_RESUME_DELETION_INTERVAL).toMillis();
}

public CoordinatorEventManager getCoordinatorEventManager() {
Expand Down Expand Up @@ -302,9 +315,16 @@ public void startup() {

// start rebalance manager.
rebalanceManager.startup();

// Periodically enqueue a ResumeDeletionEvent on the coordinator event manager. The configured
// interval is used both as the initial delay and as the delay between the end of one run and
// the start of the next, so the first event is only emitted one full interval after startup.
// NOTE(review): per the ScheduledExecutorService contract, an exception escaping this task
// suppresses all subsequent executions -- confirm put() cannot throw here, or guard the call.
resumeDeletionScheduledExecutor.scheduleWithFixedDelay(
() -> coordinatorEventManager.put(new ResumeDeletionEvent()),
resumeDeletionIntervalMs,
resumeDeletionIntervalMs,
TimeUnit.MILLISECONDS);
}

public void shutdown() {
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, resumeDeletionScheduledExecutor);
// close the event manager
coordinatorEventManager.close();
rebalanceManager.close();
Expand Down Expand Up @@ -574,6 +594,8 @@ public void process(CoordinatorEvent event) {
processDropTable((DropTableEvent) event);
} else if (event instanceof DropPartitionEvent) {
processDropPartition((DropPartitionEvent) event);
} else if (event instanceof ResumeDeletionEvent) {
tableManager.resumeDeletions();
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
processSchemaChange(schemaChangeEvent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.event;

import org.apache.fluss.annotation.Internal;

/**
 * An event to resume in-progress table and partition deletions.
 *
 * <p>The event carries no payload. The coordinator event processor enqueues it from a periodic
 * scheduled task and handles it by asking the table manager to resume deletions.
 */
@Internal
public class ResumeDeletionEvent implements CoordinatorEvent {}
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,38 @@ void testDropTableWithRetry() throws Exception {
() -> assertThat(zookeeperClient.getTableAssignment(t1Id)).isEmpty());
}

@Test
void testScheduledResumeDeletionResumesPendingDeletion() throws Exception {
    initCoordinatorChannel();

    // Create a table spread over three tablet servers and wait until the coordinator
    // context has registered it.
    TablePath pendingTablePath = TablePath.of(defaultDatabase, "resume_pending");
    TabletServerInfo[] tabletServers = {
        new TabletServerInfo(0, "rack0"),
        new TabletServerInfo(1, "rack1"),
        new TabletServerInfo(2, "rack2")
    };
    final long tableId = createTable(pendingTablePath, tabletServers);
    retryVerifyContext(ctx -> assertThat(ctx.getTablePathById(tableId)).isNotNull());

    // Drop the table while the coordinator is down, so that the restarted coordinator has a
    // pending deletion left to resume.
    eventProcessor.shutdown();
    metadataManager.dropTable(pendingTablePath, false);

    // Restart the coordinator with a short resume interval (100 ms) so the periodic
    // ResumeDeletionEvent fires quickly after startup() boots the internal scheduler.
    Configuration fastResumeConf = new Configuration();
    fastResumeConf.set(
            ConfigOptions.COORDINATOR_RESUME_DELETION_INTERVAL, Duration.ofMillis(100));
    eventProcessor = buildCoordinatorEventProcessor(fastResumeConf);
    initCoordinatorChannel();
    eventProcessor.startup();

    // The deletion is complete once the table assignment has disappeared from ZooKeeper.
    Duration deletionTimeout = Duration.ofMinutes(1);
    retry(
            deletionTimeout,
            () -> assertThat(zookeeperClient.getTableAssignment(tableId)).isEmpty());
}

@Test
void testServerBecomeOnlineAndOfflineLine() throws Exception {
// make sure all request to gateway should be successful
Expand Down Expand Up @@ -1169,6 +1201,10 @@ private void verifyIsr(TableBucket tb, int expectedLeader, List<Integer> expecte
}

/**
 * Builds a {@link CoordinatorEventProcessor} using a newly created {@link Configuration},
 * delegating to the overload that accepts an explicit configuration.
 */
private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
    Configuration defaultConf = new Configuration();
    return buildCoordinatorEventProcessor(defaultConf);
}

private CoordinatorEventProcessor buildCoordinatorEventProcessor(Configuration conf) {
return new CoordinatorEventProcessor(
zookeeperClient,
serverMetadataCache,
Expand All @@ -1177,7 +1213,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
autoPartitionManager,
lakeTableTieringManager,
TestingMetricGroups.COORDINATOR_METRICS,
new Configuration(),
conf,
Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")),
metadataManager,
kvSnapshotLeaseManager);
Expand Down