Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest;
Expand All @@ -76,7 +77,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
// from the same table and can send deletion requests for same snapshot
// multiple times.
private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1;
private static final int MIN_ERR_LIMIT_PER_TASK = 1000;
private final ClientId clientId = ClientId.randomId();

private final OzoneManager ozoneManager;
Expand Down Expand Up @@ -210,8 +210,8 @@ public BackgroundTaskResult call() throws InterruptedException {
renameKeys.add(HddsProtos.KeyValue.newBuilder().setKey(renameEntry.getKey())
.setValue(renameEntry.getValue()).build());
}
submitSnapshotMoveDeletedKeys(snapInfo, deletedKeys, renameKeys, deletedDirs);
remaining -= moveCount;
int submitted = submitSnapshotMoveDeletedKeysWithBatching(snapInfo, deletedKeys, renameKeys, deletedDirs);
remaining -= submitted;
} else {
snapshotsToBePurged.add(snapInfo.getTableKey());
}
Expand Down Expand Up @@ -247,39 +247,166 @@ private void submitSnapshotPurgeRequest(List<String> purgeSnapshotKeys) {
}
}

private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
List<SnapshotMoveKeyInfos> deletedKeys,
List<HddsProtos.KeyValue> renamedList,
List<SnapshotMoveKeyInfos> dirsToMove) {

SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = SnapshotMoveTableKeysRequest.newBuilder()
/**
* Submits a single batch of snapshot move requests.
*
* @param snapInfo The snapshot being processed
* @param deletedKeys List of deleted keys to move
* @param renamedList List of renamed keys
* @param dirsToMove List of deleted directories to move
* @return true if submission was successful, false otherwise
*/
private boolean submitSingleSnapshotMoveBatch(SnapshotInfo snapInfo,
List<SnapshotMoveKeyInfos> deletedKeys,
List<HddsProtos.KeyValue> renamedList,
List<SnapshotMoveKeyInfos> dirsToMove) {
SnapshotMoveTableKeysRequest.Builder moveDeletedKeys = SnapshotMoveTableKeysRequest.newBuilder()
.setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()));

SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
.addAllDeletedKeys(deletedKeys)
.addAllRenamedKeys(renamedList)
.addAllDeletedDirs(dirsToMove)
.build();
if (isBufferLimitCrossed(ratisByteLimit, 0, moveDeletedKeys.getSerializedSize())) {
int remaining = MIN_ERR_LIMIT_PER_TASK;
deletedKeys = deletedKeys.subList(0, Math.min(remaining, deletedKeys.size()));
remaining -= deletedKeys.size();
renamedList = renamedList.subList(0, Math.min(remaining, renamedList.size()));
remaining -= renamedList.size();
dirsToMove = dirsToMove.subList(0, Math.min(remaining, dirsToMove.size()));
moveDeletedKeys = moveDeletedKeysBuilder
.addAllDeletedKeys(deletedKeys)
.addAllRenamedKeys(renamedList)
.addAllDeletedDirs(dirsToMove)
.build();
if (!deletedKeys.isEmpty()) {
moveDeletedKeys.addAllDeletedKeys(deletedKeys);
}

if (!renamedList.isEmpty()) {
moveDeletedKeys.addAllRenamedKeys(renamedList);
}

if (!dirsToMove.isEmpty()) {
moveDeletedKeys.addAllDeletedDirs(dirsToMove);
}

OMRequest omRequest = OMRequest.newBuilder()
.setCmdType(Type.SnapshotMoveTableKeys)
.setSnapshotMoveTableKeysRequest(moveDeletedKeys)
.setSnapshotMoveTableKeysRequest(moveDeletedKeys.build())
.setClientId(clientId.toString())
.build();
submitOMRequest(omRequest);

try {
OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest);
if (response == null || !response.getSuccess()) {
LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run.");
return false;
}
return true;
} catch (ServiceException e) {
LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run", e);
return false;
}
}

/**
* Submits snapshot move requests with batching to respect the Ratis buffer limit.
* This method progressively builds batches while checking size limits before adding entries.
*
* @param snapInfo The snapshot being processed
* @param deletedKeys List of deleted keys to move
* @param renamedList List of renamed keys
* @param dirsToMove List of deleted directories to move
* @return The number of entries successfully submitted
*/
private int submitSnapshotMoveDeletedKeysWithBatching(SnapshotInfo snapInfo,
List<SnapshotMoveKeyInfos> deletedKeys,
List<HddsProtos.KeyValue> renamedList,
List<SnapshotMoveKeyInfos> dirsToMove) {
List<SnapshotMoveKeyInfos> currentDeletedKeys = new ArrayList<>();
List<HddsProtos.KeyValue> currentRenamedKeys = new ArrayList<>();
List<SnapshotMoveKeyInfos> currentDeletedDirs = new ArrayList<>();
long batchBytes = 0;
int totalSubmitted = 0;
int batchCount = 0;

for (SnapshotMoveKeyInfos key : deletedKeys) {
int keySize = key.getSerializedSize();

// If adding this key would exceed the limit, flush the current batch first
if (batchBytes + keySize > ratisByteLimit && !currentDeletedKeys.isEmpty()) {
batchCount++;
LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size();
currentDeletedKeys.clear();
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After flushing a batch during deletedKeys processing, only currentDeletedKeys is cleared (line 333), while currentRenamedKeys and currentDeletedDirs are not explicitly cleared. Although they should be empty at this point, for consistency and defensive programming, consider clearing all three lists after every flush to maintain a consistent state. This pattern is followed correctly in the later flush points (lines 357-359 and 382-385).

Suggested change
currentDeletedKeys.clear();
currentDeletedKeys.clear();
currentRenamedKeys.clear();
currentDeletedDirs.clear();

Copilot uses AI. Check for mistakes.
batchBytes = 0;
}

currentDeletedKeys.add(key);
batchBytes += keySize;
}
Comment on lines +314 to +339
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch size calculation sums the serialized sizes of individual entries but doesn't account for the protocol buffer message overhead from SnapshotMoveTableKeysRequest and OMRequest wrappers. While the 10% safety margin in ratisByteLimit (line 113) may provide some buffer, consider explicitly accounting for base message overhead or documenting this limitation to ensure batches reliably stay under the limit even with many small entries.

Copilot uses AI. Check for mistakes.

for (HddsProtos.KeyValue renameKey : renamedList) {
int keySize = renameKey.getSerializedSize();

// If adding this key would exceed the limit, flush the current batch first
if (batchBytes + keySize > ratisByteLimit &&
(!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty())) {
batchCount++;
LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size();
currentDeletedKeys.clear();
currentRenamedKeys.clear();
batchBytes = 0;
}

currentRenamedKeys.add(renameKey);
batchBytes += keySize;
}

for (SnapshotMoveKeyInfos dir : dirsToMove) {
int dirSize = dir.getSerializedSize();

// If adding this dir would exceed the limit, flush the current batch first
if (batchBytes + dirSize > ratisByteLimit &&
(!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty())) {
batchCount++;
LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size();
currentDeletedKeys.clear();
currentRenamedKeys.clear();
currentDeletedDirs.clear();
batchBytes = 0;
}

currentDeletedDirs.add(dir);
batchBytes += dirSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batchBytes is not accounting for fromSnapshotID serialized size and the size of enclosing OMRequest?

}

// Submit the final batch if any
if (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty()) {
batchCount++;
LOG.debug("Submitting final batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size();
}

LOG.debug("Successfully submitted {} total entries in {} batches for snapshot {}", totalSubmitted, batchCount,
snapInfo.getTableKey());

return totalSubmitted;
}

private void submitOMRequest(OMRequest omRequest) {
Expand Down
Loading