[server] Implement JBOD Phase 1 Local Multi-Directory Support#3030
[server] Implement JBOD Phase 1 Local Multi-Directory Support#3030wuchong merged 13 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Implements Phase 1 of JBOD (multi local disk) support in the Fluss TabletServer by introducing a LocalDiskManager that validates/locks multiple local directories, persists per-disk identity, and enables per-directory checkpointing + placement decisions across Log/KV/Replica/RemoteLog components.
Changes:
- Add
data.dirsconfig option and a newLocalDiskManagerthat validates/locks directories and manages per-disk metadata + simple load counters. - Plumb multi-directory awareness through
TabletServer,LogManager,KvManager,ReplicaManager,Replica, andRemoteLogManager(per-dir checkpoints, placement, cache). - Extend/adjust unit tests to cover multi-directory placement and per-directory checkpoint/cache behavior.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java | New unit tests for directory validation, locking, disk.properties, and placement counters. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java | Test base now initializes LocalDiskManager and supports tagged JBOD multi-dir tests. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java | Adds JBOD multi-dir tests for distribution and per-dir high-watermark checkpointing. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java | Updates replica-manager construction in tests to pass LocalDiskManager. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java | Adds JBOD multi-dir test verifying index cache is tied to replica directory. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java | Uses per-dir RemoteLogIndexCache accessor. |
| fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java | Adds JBOD tests for per-dir recovery checkpoints and clean shutdown markers. |
| fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java | Updates tests to pass explicit data dir and close/recreate LocalDiskManager. |
| fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java | Updates KV tests to pass LocalDiskManager and explicit data dir for tablet creation. |
| fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java | Refactors to operate on a list of data dirs; lists tablets per dir; adds per-dir executor helper. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java | Creates/closes LocalDiskManager and passes it into log/kv/replica managers. |
| fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java | New component that owns multi-dir validation, locking, disk identity, and placement counters. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java | Per-dir high-watermark checkpoints; placement uses LocalDiskManager; remote log manager ctor updated. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java | Replica now carries dataDir to ensure Log/KV tablets and paths are created in the chosen directory. |
| fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java | Maintains per-dir remote index caches; resolves cache based on bucket/log location. |
| fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java | Multi-dir recovery/shutdown, per-dir recovery-point checkpoints, and explicit data-dir log creation. |
| fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java | Multi-dir tablet creation APIs; plumbs selected data dir into KV tablet creation. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds new data.dirs configuration option. |
Comments suppressed due to low confidence (1)
fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java:134
- With multiple
data.dirs, log recovery/shutdown now creates per-directory thread pools. BecauserecoveryThreadsPerDataDiris sourced fromnetty.server.num-worker-threads(default 8), the total thread count scales asnumDataDirs * netty.server.num-worker-threads(plus extra per-dir executors), which can be much higher than before. Consider introducing a dedicated recovery thread config (total vs per-dir) or otherwise capping threads to avoid startup/shutdown thread explosion on hosts with many disks.
return new LogManager(
localDiskManager,
conf,
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
scheduler,
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public int lookupPositionForOffset(RemoteLogSegment remoteLogSegment, long offset) { | ||
| return remoteLogIndexCache.lookupPosition(remoteLogSegment, offset); | ||
| RemoteLogIndexCache remoteLogIndexCache = | ||
| remoteLogIndexCacheForBucket(remoteLogSegment.tableBucket()); | ||
| return remoteLogIndexCache == null | ||
| ? -1 |
There was a problem hiding this comment.
lookupPositionForOffset returns -1 when remoteLogIndexCacheForBucket(...) cannot resolve a cache. That firstStartPos is propagated into RemoteLogFetchInfo and the client uses it as the starting position within the first remote segment; -1 effectively causes the client to read from the beginning of the segment (wrong offset / duplicates). Consider failing fast (throw) or providing a deterministic fallback cache so a valid position is always computed for buckets that have remote segments.
| for (File dataDir : localDiskManager.dataDirs()) { | ||
| remoteLogIndexCachesByDir.put( | ||
| dataDir, | ||
| new RemoteLogIndexCache( | ||
| (int) | ||
| conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE) | ||
| .getBytes(), |
There was a problem hiding this comment.
REMOTE_LOG_INDEX_FILE_CACHE_SIZE is applied per configured dataDir by creating one RemoteLogIndexCache per directory, which multiplies the effective on-disk cache budget by #data.dirs (and changes the meaning of the config option, whose description implies a total budget). Consider splitting the configured cache size across directories or updating the configuration semantics/documentation accordingly.
| for (File dataDir : localDiskManager.dataDirs()) { | |
| remoteLogIndexCachesByDir.put( | |
| dataDir, | |
| new RemoteLogIndexCache( | |
| (int) | |
| conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE) | |
| .getBytes(), | |
| List<File> dataDirs = localDiskManager.dataDirs(); | |
| long totalRemoteLogIndexFileCacheSizeBytes = | |
| conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE).getBytes(); | |
| int perDataDirRemoteLogIndexFileCacheSizeBytes = | |
| dataDirs.isEmpty() | |
| ? (int) totalRemoteLogIndexFileCacheSizeBytes | |
| : (int) (totalRemoteLogIndexFileCacheSizeBytes / dataDirs.size()); | |
| for (File dataDir : dataDirs) { | |
| remoteLogIndexCachesByDir.put( | |
| dataDir, | |
| new RemoteLogIndexCache( | |
| perDataDirRemoteLogIndexFileCacheSizeBytes, |
| String serverId = properties.getProperty(SERVER_ID_KEY); | ||
| return new DiskProperties(parsedVersion, diskId, serverId); | ||
| } catch (NoSuchFileException e) { | ||
| LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath()); |
There was a problem hiding this comment.
Missing disk.properties appears to be an expected state on first startup for a fresh directory, but this is logged at WARN. This can create noisy logs during normal bootstraps; consider lowering this to INFO/DEBUG (or only warn when the directory is non-empty / previously initialized).
| LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath()); | |
| LOG.info("No disk.properties file under dir {}", file.getAbsolutePath()); |
| /** Test for {@link LogManager}. */ | ||
| final class LogManagerTest extends LogTestBase { | ||
|
|
||
| private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; |
There was a problem hiding this comment.
JBOD_MULTI_DIR_TAG duplicates the same tag constant already defined in ReplicaTestBase (ReplicaTestBase.JBOD_MULTI_DIR_TAG). Reusing the shared constant would avoid accidental drift if the tag string changes.
| private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; | |
| private static final String JBOD_MULTI_DIR_TAG = | |
| org.apache.fluss.server.replica.ReplicaTestBase.JBOD_MULTI_DIR_TAG; |
| } | ||
| } | ||
| } catch (LogStorageException e) { | ||
| throw e; |
There was a problem hiding this comment.
This exception has been thrown inside the loop, the outer try-catch could be removed.
There was a problem hiding this comment.
This exception has been thrown inside the loop, the outer try-catch could be removed.
Addressed in 6999b9e. I removed the redundant outer try-catch since the exception is already handled inside the loop.
| return remoteLogIndexCache.lookupOffsetForTimestamp(segment, timestamp); | ||
| RemoteLogIndexCache remoteLogIndexCache = remoteLogIndexCacheForBucket(tableBucket); | ||
| return remoteLogIndexCache == null | ||
| ? -1L |
There was a problem hiding this comment.
When remoteLogIndexCache is null, will it be better to throw an exception instead of return -1?
The same returning value -1 will mix up the cases between 1. failing to find the offset index. and 2. indexCache is null.
There was a problem hiding this comment.
When remoteLogIndexCache is null, will it be better to throw an exception instead of return -1? The same returning value -1 will mix up the cases between 1. failing to find the offset index. and 2. indexCache is null.
Addressed in 6999b9e. I changed this branch to throw NotLeaderOrFollowerException instead of returning -1, so we no longer mix up “index lookup miss” with “failed to resolve the index cache”.
| public synchronized File resolveDataDir(File path) { | ||
| Path pathToResolve = path.toPath(); | ||
| for (File dataDir : dataDirs) { | ||
| if (pathToResolve.startsWith(dataDir.toPath())) { |
There was a problem hiding this comment.
should ensure that the pathToResolve should be an absolute path, since we check with startsWith function. pathToResolve.toAbsolutePath().normalize()
There was a problem hiding this comment.
should ensure that the pathToResolve should be an absolute path, since we check with
startsWithfunction.pathToResolve.toAbsolutePath().normalize()
Thanks. Addressed in 6999b9e.
| } | ||
|
|
||
| try { | ||
| return Integer.parseInt(serverId.trim()); |
There was a problem hiding this comment.
many trim() calls in one function. In the constructor the serverId already trimmed. Or just trim() once in one function.
There was a problem hiding this comment.
many
trim()calls in one function. In the constructor the serverId already trimmed. Or just trim() once in one function.
Thanks. Addressed in 6999b9e.
| } | ||
|
|
||
| private int serverIdAsInt(File dataDir) throws IOException { | ||
| if (serverId == null || serverId.trim().isEmpty()) { |
There was a problem hiding this comment.
serverId is already trimmed in constructor.
There was a problem hiding this comment.
serverId is already trimmed in constructor.
Thanks. Addressed in 6999b9e.
|
|
||
| private String diskId(File dataDir) throws IOException { | ||
| if (diskId == null || diskId.trim().isEmpty()) { | ||
| throw new IOException( |
There was a problem hiding this comment.
same as above. diskId also was trimmed in the constructor.
There was a problem hiding this comment.
same as above. diskId also was trimmed in the constructor.
Thanks. Addressed in 6999b9e.
6999b9e to
8c7375b
Compare
wuchong
left a comment
There was a problem hiding this comment.
Thanks @hanliu0830 , I left some comments.
| try { | ||
| Map<TableBucket, Long> recoveryOffsets = new HashMap<>(); | ||
| for (Map.Entry<TableBucket, LogTablet> entry : currentLogs.entrySet()) { | ||
| void checkpointRecoveryOffsets(File dataDir) { |
There was a problem hiding this comment.
Could you refactor this method to collect the LogTablets that belong to the dataDir, and directly call checkpointRecoveryOffsets(File dataDir, List<LogTablet> logs)? This helps us to reuse code.
Code reviewFound 1 issue:
fluss/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java Lines 180 to 185 in 9bc453c 🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
wuchong
left a comment
There was a problem hiding this comment.
Besides, The JBOD multi-directory feature has comprehensive unit tests (LocalDiskManagerTest with 14 cases, ReplicaManagerTest with 3 JBOD-tagged cases, plus adapted LogManagerTest/KvManagerTest), but it lacks integration tests (ITCase) that validate the end-to-end behavior in a real multi-TabletServer cluster. I'd suggest adding a JbodMultiDirITCase under fluss-server/.../server/replica/ (alongside existing KvSnapshotITCase, ReplicaFetcherITCase, etc.) using FlussClusterExtension with data.dirs configured to multiple directories. Key scenarios to cover: (1) creating a table and verifying buckets are distributed across directories, (2) writing and reading data correctly under multi-dir setup, (3) PK table Log/KV co-location across a real cluster. This would require first extending FlussClusterExtension to support per-TabletServer data.dirs configuration if it doesn't already.
Addressed in dc334c3.Yes, I split the handling of |
6dac12a to
87db556
Compare
wuchong
left a comment
There was a problem hiding this comment.
I pushed a commit to refactor checkpointRecoveryOffsets(File dataDir) to invoke checkpointRecoveryOffsets(File dataDir, List logs) which is minor improvements. Others looks good to me.
I will merge this once the CI is passed.
The integration IT case JbodMultiDirITCase can be added in a follow up PR.
…RecoveryOffsets(File dataDir, List<LogTablet> logs)
13f66b6 to
45f7b8f
Compare
(The sections below can be removed for hotfixes or typos)
-->
Purpose
Linked issue: close #145
Brief change log
Tests
API and Format
Documentation