Skip to content

Conversation

@smengcl
Copy link
Contributor

@smengcl smengcl commented Jan 28, 2026

What changes were proposed in this pull request?

This is the continuation of #9390

This addressed comment from @sumitagrawl (#9390 (comment)) which significantly reduces the number of files touched.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14020

How was this patch tested?

  • Existing tests

swamirishi and others added 4 commits November 28, 2025 19:39
…rvice

Change-Id: I4c1a051e8574d32375cbebeb10546563f4a4f817
Change-Id: Ie2118356f902443a93fe666d890ad4d59e9dd467
…omment: apache#9390 (comment)

Refactor BackgroundTask to use wrapper pattern for ForkJoinPool integration

This commit introduces BackgroundTaskForkJoin as a wrapper class to
integrate BackgroundTask with ForkJoinPool, avoiding the need to change
all service implementations from 'implements' to 'extends'.

Key changes:
- Reverted BackgroundTask from abstract class back to interface
- Created BackgroundTaskForkJoin wrapper extending RecursiveTask
- Updated BackgroundService to wrap tasks before forking
- Reverted all service task classes to 'implements BackgroundTask'
@smengcl
Copy link
Contributor Author

smengcl commented Jan 28, 2026

There are some issues after rebasing. Let me fix it.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors Ozone background services to run tasks via a ForkJoinPool (instead of a ScheduledThreadPoolExecutor) and updates directory deletion to support fork/join-style parallelism, targeting shutdown deadlock avoidance (HDDS-14020).

Changes:

  • Reworked BackgroundService scheduling/execution to use a ForkJoinPool plus a shared ScheduledExecutorService.
  • Updated DirectoryDeletingService to optionally fork internal deletion work and adjusted related tests.
  • Introduced fork/join wrappers and minor task-wrapping/refactoring in deleting services and SST filtering.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 16 comments.

Show a summary per file
File Description
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java Core switch to ForkJoin-based execution and custom periodic scheduling.
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java New ForkJoin wrapper for BackgroundTask execution.
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java New shared scheduler supplier for periodic rescheduling.
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java Uses fork/join-style parallelism instead of internal executors; adds allowTasksToFork plumbing.
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java Refactors task wrapping into a reusable BackgroundDeleteTask.
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java Adjusts call() signature and adds interrupt handling.
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java Updates task construction for the new DirDeletingTask signature.
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java Updates PeriodicalTask construction to match new signature.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e);
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After catching InterruptedException, the code re-interrupts the thread but continues processing subsequent snapshots. Typically interruption should stop the task (e.g., break the loop / return) to allow the service to shut down promptly and avoid doing more work on an interrupted thread.

Suggested change
LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e);
LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e);
break;

Copilot uses AI. Check for mistakes.
Comment on lines +242 to +243
scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)),
intervalInMillis, TimeUnit.MILLISECONDS);
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduled runnable in scheduleNextTask() calls exec.submit(...) without re-checking shutdown state or whether exec is still non-null. Since shutdown() sets exec = null and the shared scheduler uses shutdown() (not shutdownNow()), already-scheduled tasks can still execute after shutdown and hit NPE / RejectedExecutionException. Guard inside the scheduled runnable (check shutdown flag + exec != null) and/or cancel scheduled tasks on shutdown.

Suggested change
scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)),
intervalInMillis, TimeUnit.MILLISECONDS);
scheduledExecuterService.schedule(() -> {
if (!isShutdown.get() && exec != null) {
exec.submit(new PeriodicalTask(this));
}
}, intervalInMillis, TimeUnit.MILLISECONDS);

Copilot uses AI. Check for mistakes.
Comment on lines +311 to +312
exec = new ForkJoinPool(threadPoolSize, factory, null, false);
isShutdown = new AtomicReference<>(false);
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Background tasks frequently do blocking I/O (eg OM Ratis submitRequest, RocksDB calls). Using a ForkJoinPool for blocking work can reduce parallelism and hurt throughput unless blocking sections use ForkJoinPool.managedBlock(...) (or a dedicated blocking pool is used). Consider addressing blocking sections or documenting why ForkJoinPool is safe here.

Copilot uses AI. Check for mistakes.
Comment on lines +44 to +47
public static synchronized UncheckedAutoCloseableSupplier<ScheduledExecutorService> get() {
if (executor == null) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown) -> {
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BackgroundServiceScheduler uses the default thread factory for ScheduledThreadPoolExecutor, which creates non-daemon threads. Since this is a shared background scheduler, a non-daemon thread can keep the JVM alive if something forgets to close/release it. Consider using a daemon thread factory (and naming the thread) for the scheduler.

Copilot uses AI. Check for mistakes.
Comment on lines +29 to +33
public class BackgroundTaskForkJoin extends RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
private static final long serialVersionUID = 1L;
private final transient BackgroundTask backgroundTask;

public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is added under the hdds-common module but depends on BackgroundTask/BackgroundTaskResult, which are defined in hdds-server-framework (and hdds-common does not depend on it). Since hdds-server-framework already depends on hdds-common, adding the reverse dependency would create a cycle; as-is, this likely won’t compile. Consider moving this class into hdds-server-framework, or into a new shared module that both can depend on.

Copilot uses AI. Check for mistakes.
}, exec).exceptionally(e -> null), (Void1, Void) -> null);
Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
task.fork();
tasksInFlight.offer(task);
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method accept ignores exceptional return value of Queue.offer.

Suggested change
tasksInFlight.offer(task);
if (!tasksInFlight.offer(task)) {
LOG.error("Failed to enqueue background task for service {}. Task will not be tracked.", serviceName);
}

Copilot uses AI. Check for mistakes.
}
};
task.fork();
recursiveTasks.offer(task);
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method processDeletedDirsForStore ignores exceptional return value of Queue<RecursiveTask>.offer.

Suggested change
recursiveTasks.offer(task);
if (!recursiveTasks.offer(task)) {
// If the task cannot be enqueued, ensure it is joined and
// mark that not all deleted directories were processed.
task.join();
processedAllDeletedDirs = false;
break;
}

Copilot uses AI. Check for mistakes.
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);
dds.getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
startTime, getOzoneManager().getKeyManager(),
startTime, dds.getOzoneManager().getKeyManager(),
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Suggested change
startTime, dds.getOzoneManager().getKeyManager(),
startTime, keyManager,

Copilot uses AI. Check for mistakes.
omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(),
snapInfo.getName())) {
KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager()
KeyManager keyManager = snapInfo == null ? dds.getOzoneManager().getKeyManager()
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants