HDDS-14020. Use ForkJoinPool instead of a ScheduledThreadPoolExecutor in BackgroundService #9686
base: master
Conversation
…rvice Change-Id: I4c1a051e8574d32375cbebeb10546563f4a4f817
Change-Id: Ie2118356f902443a93fe666d890ad4d59e9dd467
…omment: apache#9390 (comment)

Refactor BackgroundTask to use wrapper pattern for ForkJoinPool integration

This commit introduces BackgroundTaskForkJoin as a wrapper class to integrate BackgroundTask with ForkJoinPool, avoiding the need to change all service implementations from 'implements' to 'extends'.

Key changes:
- Reverted BackgroundTask from abstract class back to interface
- Created BackgroundTaskForkJoin wrapper extending RecursiveTask
- Updated BackgroundService to wrap tasks before forking
- Reverted all service task classes to 'implements BackgroundTask'
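To make the wrapper pattern described in this commit message concrete, here is a minimal, hypothetical sketch of the idea, with simplified stand-in interfaces so it compiles on its own. It is not the PR's actual BackgroundTaskForkJoin (which uses its own BackgroundTaskForkResult type); the names below are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.RecursiveTask;

// Assumed, simplified stand-ins for the real Ozone interfaces.
interface BackgroundTaskResult { int getSize(); }
interface BackgroundTask extends Callable<BackgroundTaskResult> { }

// Sketch of the wrapper role played by BackgroundTaskForkJoin in the PR:
// adapt an existing BackgroundTask to ForkJoinPool's RecursiveTask so the
// task classes can keep 'implements BackgroundTask'.
class ForkJoinBackgroundTaskAdapter extends RecursiveTask<BackgroundTaskResult> {
  private static final long serialVersionUID = 1L;

  // transient: the wrapped task is never serialized.
  private final transient BackgroundTask backgroundTask;

  ForkJoinBackgroundTaskAdapter(BackgroundTask backgroundTask) {
    this.backgroundTask = backgroundTask;
  }

  @Override
  protected BackgroundTaskResult compute() {
    try {
      // Delegate to the wrapped task on the ForkJoinPool worker thread.
      return backgroundTask.call();
    } catch (Exception e) {
      // Surface the failure to whoever joins this task.
      throw new RuntimeException("Background task failed", e);
    }
  }
}
```

The design point is that only the service-side submission code needs to know about RecursiveTask; the existing task implementations stay unchanged.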
There are some issues after rebasing. Let me fix it.
Pull request overview
Refactors Ozone background services to run tasks via a ForkJoinPool (instead of a ScheduledThreadPoolExecutor) and updates directory deletion to support fork/join-style parallelism, targeting shutdown deadlock avoidance (HDDS-14020).
Changes:
- Reworked BackgroundService scheduling/execution to use a ForkJoinPool plus a shared ScheduledExecutorService.
- Updated DirectoryDeletingService to optionally fork internal deletion work and adjusted related tests.
- Introduced fork/join wrappers and minor task-wrapping/refactoring in deleting services and SST filtering.
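As a rough mental model of that scheduling change, the sketch below shows work executing on a ForkJoinPool while a shared single-thread scheduler only triggers the next run. The class and method names are illustrative assumptions, not the actual BackgroundService code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the pattern: a lightweight scheduler thread
// re-submits a periodic task, and the actual work runs on a ForkJoinPool
// where it may fork subtasks.
public class PeriodicForkJoinRunner {
  private final ForkJoinPool workers = new ForkJoinPool(4);
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  public void start(Runnable task, long intervalMillis) {
    scheduler.schedule(() -> {
      workers.submit(task);        // run the work on the ForkJoinPool
      start(task, intervalMillis); // schedule the next run
    }, intervalMillis, TimeUnit.MILLISECONDS);
  }

  public void shutdown() {
    scheduler.shutdownNow();
    workers.shutdown();
  }
}
```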
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 16 comments.
| File | Description |
|---|---|
| hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java | Core switch to ForkJoin-based execution and custom periodic scheduling. |
| hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java | New ForkJoin wrapper for BackgroundTask execution. |
| hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java | New shared scheduler supplier for periodic rescheduling. |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java | Uses fork/join-style parallelism instead of internal executors; adds allowTasksToFork plumbing. |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java | Refactors task wrapping into a reusable BackgroundDeleteTask. |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java | Adjusts call() signature and adds interrupt handling. |
| hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java | Updates task construction for the new DirDeletingTask signature. |
| hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java | Updates PeriodicalTask construction to match new signature. |
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e);
Copilot
AI
Jan 28, 2026
After catching InterruptedException, the code re-interrupts the thread but continues processing subsequent snapshots. Typically interruption should stop the task (e.g., break the loop / return) to allow the service to shut down promptly and avoid doing more work on an interrupted thread.
| LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e); | |
| LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e); | |
| break; |
scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)),
    intervalInMillis, TimeUnit.MILLISECONDS);
Copilot
AI
Jan 28, 2026
The scheduled runnable in scheduleNextTask() calls exec.submit(...) without re-checking shutdown state or whether exec is still non-null. Since shutdown() sets exec = null and the shared scheduler uses shutdown() (not shutdownNow()), already-scheduled tasks can still execute after shutdown and hit NPE / RejectedExecutionException. Guard inside the scheduled runnable (check shutdown flag + exec != null) and/or cancel scheduled tasks on shutdown.
Suggested change:
scheduledExecuterService.schedule(() -> {
  if (!isShutdown.get() && exec != null) {
    exec.submit(new PeriodicalTask(this));
  }
}, intervalInMillis, TimeUnit.MILLISECONDS);
exec = new ForkJoinPool(threadPoolSize, factory, null, false);
isShutdown = new AtomicReference<>(false);
Copilot
AI
Jan 28, 2026
Background tasks frequently do blocking I/O (e.g. OM Ratis submitRequest, RocksDB calls). Using a ForkJoinPool for blocking work can reduce parallelism and hurt throughput unless blocking sections use ForkJoinPool.managedBlock(...) (or a dedicated blocking pool is used). Consider addressing blocking sections or documenting why ForkJoinPool is safe here.
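For reference, ForkJoinPool.managedBlock(...) is used roughly as sketched below: the pool is told a worker is about to block so it can compensate with a spare thread. The BlockingCall wrapper and the Supplier-based I/O call are illustrative placeholders, not Ozone code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// Hypothetical wrapper showing the managedBlock pattern around a blocking call.
final class BlockingCall implements ForkJoinPool.ManagedBlocker {
  private String result;                       // holds the value once the call returns
  private final Supplier<String> blockingIo;   // placeholder for the blocking I/O

  BlockingCall(Supplier<String> blockingIo) {
    this.blockingIo = blockingIo;
  }

  @Override
  public boolean block() {
    result = blockingIo.get();                 // the potentially blocking call
    return true;                               // no further blocking needed
  }

  @Override
  public boolean isReleasable() {
    return result != null;                     // already done, no need to block
  }

  String call() throws InterruptedException {
    ForkJoinPool.managedBlock(this);           // pool may add a compensating worker
    return result;
  }
}
```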
public static synchronized UncheckedAutoCloseableSupplier<ScheduledExecutorService> get() {
  if (executor == null) {
    ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
    executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown) -> {
Copilot
AI
Jan 28, 2026
BackgroundServiceScheduler uses the default thread factory for ScheduledThreadPoolExecutor, which creates non-daemon threads. Since this is a shared background scheduler, a non-daemon thread can keep the JVM alive if something forgets to close/release it. Consider using a daemon thread factory (and naming the thread) for the scheduler.
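A daemon, named thread factory along the lines suggested could look like this sketch; the class and thread names are assumptions, not the PR's code.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

// Hypothetical factory for the shared scheduler: named for debuggability,
// daemon so a forgotten release does not keep the JVM alive.
public final class DaemonSchedulerFactory {
  public static ScheduledThreadPoolExecutor newScheduler() {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, "BackgroundServiceScheduler");
      t.setDaemon(true);
      return t;
    };
    return new ScheduledThreadPoolExecutor(1, factory);
  }
}
```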
public class BackgroundTaskForkJoin extends RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
  private static final long serialVersionUID = 1L;
  private final transient BackgroundTask backgroundTask;

  public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {
Copilot
AI
Jan 28, 2026
This class is added under the hdds-common module but depends on BackgroundTask/BackgroundTaskResult, which are defined in hdds-server-framework (and hdds-common does not depend on it). Since hdds-server-framework already depends on hdds-common, adding the reverse dependency would create a cycle; as-is, this likely won’t compile. Consider moving this class into hdds-server-framework, or into a new shared module that both can depend on.
}, exec).exceptionally(e -> null), (Void1, Void) -> null);
Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
  task.fork();
  tasksInFlight.offer(task);
Copilot
AI
Jan 28, 2026
Method accept ignores exceptional return value of Queue.offer.
Suggested change:
if (!tasksInFlight.offer(task)) {
  LOG.error("Failed to enqueue background task for service {}. Task will not be tracked.", serviceName);
}
  }
};
task.fork();
recursiveTasks.offer(task);
Copilot
AI
Jan 28, 2026
Method processDeletedDirsForStore ignores exceptional return value of Queue<RecursiveTask>.offer.
Suggested change:
if (!recursiveTasks.offer(task)) {
  // If the task cannot be enqueued, ensure it is joined and
  // mark that not all deleted directories were processed.
  task.join();
  processedAllDeletedDirs = false;
  break;
}
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);
dds.getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);
Copilot
AI
Jan 28, 2026
Access of element annotated with VisibleForTesting found in production code.
dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
    subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
    startTime, getOzoneManager().getKeyManager(),
    startTime, dds.getOzoneManager().getKeyManager(),
Copilot
AI
Jan 28, 2026
Access of element annotated with VisibleForTesting found in production code.
Suggested change:
    startTime, keyManager,
omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(),
    snapInfo.getName())) {
KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager()
KeyManager keyManager = snapInfo == null ? dds.getOzoneManager().getKeyManager()
Copilot
AI
Jan 28, 2026
Access of element annotated with VisibleForTesting found in production code.
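One way to avoid these @VisibleForTesting accesses, in the spirit of the earlier keyManager suggestion, is to hand the collaborator to the task at construction time instead of reaching through the test-only getter. This is a hypothetical sketch; the DirDeletingTask constructor shape and the stand-in KeyManager interface are assumptions, not the PR's actual code.

```java
// Minimal stand-in for the real interface, only to keep the sketch compilable.
interface KeyManager { }

// Hypothetical: inject the KeyManager the task needs instead of fetching it
// via dds.getOzoneManager().getKeyManager() (a @VisibleForTesting accessor).
class DirDeletingTask {
  private final KeyManager keyManager;

  DirDeletingTask(KeyManager keyManager) {
    this.keyManager = keyManager;
  }

  void run() {
    // use keyManager directly in production code paths
  }
}
```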
What changes were proposed in this pull request?
This is the continuation of #9390
This addresses the comment from @sumitagrawl (#9390 (comment)), which significantly reduces the number of files touched.
What is the link to the Apache JIRA?
https://issues.apache.org/jira/browse/HDDS-14020
How was this patch tested?