-
Notifications
You must be signed in to change notification settings - Fork 593
HDDS-14432. SnapshotDeletingService incorrectly checks Ratis Buffer Limit #9656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,7 @@ | |
| import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; | ||
| import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; | ||
| import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; | ||
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; | ||
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; | ||
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; | ||
| import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest; | ||
|
|
@@ -76,7 +77,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService { | |
| // from the same table and can send deletion requests for same snapshot | ||
| // multiple times. | ||
| private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1; | ||
| private static final int MIN_ERR_LIMIT_PER_TASK = 1000; | ||
| private final ClientId clientId = ClientId.randomId(); | ||
|
|
||
| private final OzoneManager ozoneManager; | ||
|
|
@@ -210,8 +210,8 @@ public BackgroundTaskResult call() throws InterruptedException { | |
| renameKeys.add(HddsProtos.KeyValue.newBuilder().setKey(renameEntry.getKey()) | ||
| .setValue(renameEntry.getValue()).build()); | ||
| } | ||
| submitSnapshotMoveDeletedKeys(snapInfo, deletedKeys, renameKeys, deletedDirs); | ||
| remaining -= moveCount; | ||
| int submitted = submitSnapshotMoveDeletedKeysWithBatching(snapInfo, deletedKeys, renameKeys, deletedDirs); | ||
| remaining -= submitted; | ||
| } else { | ||
| snapshotsToBePurged.add(snapInfo.getTableKey()); | ||
| } | ||
|
|
@@ -247,39 +247,166 @@ private void submitSnapshotPurgeRequest(List<String> purgeSnapshotKeys) { | |
| } | ||
| } | ||
|
|
||
| private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, | ||
| List<SnapshotMoveKeyInfos> deletedKeys, | ||
| List<HddsProtos.KeyValue> renamedList, | ||
| List<SnapshotMoveKeyInfos> dirsToMove) { | ||
|
|
||
| SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = SnapshotMoveTableKeysRequest.newBuilder() | ||
| /** | ||
| * Submits a single batch of snapshot move requests. | ||
| * | ||
| * @param snapInfo The snapshot being processed | ||
| * @param deletedKeys List of deleted keys to move | ||
| * @param renamedList List of renamed keys | ||
| * @param dirsToMove List of deleted directories to move | ||
| * @return true if submission was successful, false otherwise | ||
| */ | ||
| private boolean submitSingleSnapshotMoveBatch(SnapshotInfo snapInfo, | ||
| List<SnapshotMoveKeyInfos> deletedKeys, | ||
| List<HddsProtos.KeyValue> renamedList, | ||
| List<SnapshotMoveKeyInfos> dirsToMove) { | ||
| SnapshotMoveTableKeysRequest.Builder moveDeletedKeys = SnapshotMoveTableKeysRequest.newBuilder() | ||
| .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId())); | ||
|
|
||
| SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder | ||
| .addAllDeletedKeys(deletedKeys) | ||
| .addAllRenamedKeys(renamedList) | ||
| .addAllDeletedDirs(dirsToMove) | ||
| .build(); | ||
| if (isBufferLimitCrossed(ratisByteLimit, 0, moveDeletedKeys.getSerializedSize())) { | ||
| int remaining = MIN_ERR_LIMIT_PER_TASK; | ||
| deletedKeys = deletedKeys.subList(0, Math.min(remaining, deletedKeys.size())); | ||
| remaining -= deletedKeys.size(); | ||
| renamedList = renamedList.subList(0, Math.min(remaining, renamedList.size())); | ||
| remaining -= renamedList.size(); | ||
| dirsToMove = dirsToMove.subList(0, Math.min(remaining, dirsToMove.size())); | ||
| moveDeletedKeys = moveDeletedKeysBuilder | ||
| .addAllDeletedKeys(deletedKeys) | ||
| .addAllRenamedKeys(renamedList) | ||
| .addAllDeletedDirs(dirsToMove) | ||
| .build(); | ||
| if (!deletedKeys.isEmpty()) { | ||
| moveDeletedKeys.addAllDeletedKeys(deletedKeys); | ||
| } | ||
|
|
||
| if (!renamedList.isEmpty()) { | ||
| moveDeletedKeys.addAllRenamedKeys(renamedList); | ||
| } | ||
|
|
||
| if (!dirsToMove.isEmpty()) { | ||
| moveDeletedKeys.addAllDeletedDirs(dirsToMove); | ||
| } | ||
|
|
||
| OMRequest omRequest = OMRequest.newBuilder() | ||
| .setCmdType(Type.SnapshotMoveTableKeys) | ||
| .setSnapshotMoveTableKeysRequest(moveDeletedKeys) | ||
| .setSnapshotMoveTableKeysRequest(moveDeletedKeys.build()) | ||
| .setClientId(clientId.toString()) | ||
| .build(); | ||
| submitOMRequest(omRequest); | ||
|
|
||
| try { | ||
| OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest); | ||
| if (response == null || !response.getSuccess()) { | ||
| LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run."); | ||
| return false; | ||
| } | ||
| return true; | ||
| } catch (ServiceException e) { | ||
| LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run", e); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Submits snapshot move requests with batching to respect the Ratis buffer limit. | ||
| * This method progressively builds batches while checking size limits before adding entries. | ||
| * | ||
| * @param snapInfo The snapshot being processed | ||
| * @param deletedKeys List of deleted keys to move | ||
| * @param renamedList List of renamed keys | ||
| * @param dirsToMove List of deleted directories to move | ||
| * @return The number of entries successfully submitted | ||
| */ | ||
| private int submitSnapshotMoveDeletedKeysWithBatching(SnapshotInfo snapInfo, | ||
| List<SnapshotMoveKeyInfos> deletedKeys, | ||
| List<HddsProtos.KeyValue> renamedList, | ||
| List<SnapshotMoveKeyInfos> dirsToMove) { | ||
| List<SnapshotMoveKeyInfos> currentDeletedKeys = new ArrayList<>(); | ||
| List<HddsProtos.KeyValue> currentRenamedKeys = new ArrayList<>(); | ||
| List<SnapshotMoveKeyInfos> currentDeletedDirs = new ArrayList<>(); | ||
| long batchBytes = 0; | ||
| int totalSubmitted = 0; | ||
| int batchCount = 0; | ||
|
|
||
| for (SnapshotMoveKeyInfos key : deletedKeys) { | ||
| int keySize = key.getSerializedSize(); | ||
|
|
||
| // If adding this key would exceed the limit, flush the current batch first | ||
| if (batchBytes + keySize > ratisByteLimit && !currentDeletedKeys.isEmpty()) { | ||
| batchCount++; | ||
| LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + | ||
| "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), | ||
| currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); | ||
|
|
||
| if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { | ||
| return totalSubmitted; | ||
| } | ||
|
|
||
| totalSubmitted += currentDeletedKeys.size(); | ||
| currentDeletedKeys.clear(); | ||
| batchBytes = 0; | ||
| } | ||
|
|
||
| currentDeletedKeys.add(key); | ||
| batchBytes += keySize; | ||
| } | ||
|
Comment on lines +314 to +339
|
||
|
|
||
| for (HddsProtos.KeyValue renameKey : renamedList) { | ||
| int keySize = renameKey.getSerializedSize(); | ||
|
|
||
| // If adding this key would exceed the limit, flush the current batch first | ||
| if (batchBytes + keySize > ratisByteLimit && | ||
| (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty())) { | ||
| batchCount++; | ||
| LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + | ||
| "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), | ||
| currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); | ||
|
|
||
| if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { | ||
| return totalSubmitted; | ||
| } | ||
|
|
||
| totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size(); | ||
| currentDeletedKeys.clear(); | ||
| currentRenamedKeys.clear(); | ||
| batchBytes = 0; | ||
| } | ||
|
|
||
| currentRenamedKeys.add(renameKey); | ||
| batchBytes += keySize; | ||
| } | ||
|
|
||
| for (SnapshotMoveKeyInfos dir : dirsToMove) { | ||
| int dirSize = dir.getSerializedSize(); | ||
|
|
||
| // If adding this dir would exceed the limit, flush the current batch first | ||
| if (batchBytes + dirSize > ratisByteLimit && | ||
| (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty())) { | ||
| batchCount++; | ||
| LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + | ||
| "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), | ||
| currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); | ||
|
|
||
| if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { | ||
| return totalSubmitted; | ||
| } | ||
|
|
||
| totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size(); | ||
| currentDeletedKeys.clear(); | ||
| currentRenamedKeys.clear(); | ||
| currentDeletedDirs.clear(); | ||
| batchBytes = 0; | ||
| } | ||
|
|
||
| currentDeletedDirs.add(dir); | ||
| batchBytes += dirSize; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
| // Submit the final batch if any | ||
| if (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty()) { | ||
| batchCount++; | ||
| LOG.debug("Submitting final batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + | ||
| "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), | ||
| currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); | ||
|
|
||
| if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { | ||
| return totalSubmitted; | ||
| } | ||
|
|
||
| totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size(); | ||
| } | ||
|
|
||
| LOG.debug("Successfully submitted {} total entries in {} batches for snapshot {}", totalSubmitted, batchCount, | ||
| snapInfo.getTableKey()); | ||
|
|
||
| return totalSubmitted; | ||
| } | ||
|
|
||
| private void submitOMRequest(OMRequest omRequest) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After flushing a batch during deletedKeys processing, only currentDeletedKeys is cleared (line 333), while currentRenamedKeys and currentDeletedDirs are not explicitly cleared. Although they should be empty at this point, for consistency and defensive programming, consider clearing all three lists after every flush to maintain a consistent state. This pattern is followed correctly in the later flush points (lines 357-359 and 382-385).