Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,10 @@ public Statistics getStatistics() {
}

/**
* Increase the number of bytes write into the container.
* Also decrement committed bytes against the bytes written.
* @param bytes the number of bytes write into the container.
* Calculate how much committedBytes should be decremented for write.
* This is used both for decrementing on write and restoring on failure.
*/
private void incrWriteBytes(long bytes) {
/*
Increase the cached Used Space in VolumeInfo as it
maybe not updated, DU or DedicatedDiskSpaceUsage runs
periodically to update the Used Space in VolumeInfo.
*/
this.getVolume().incrementUsedSpace(bytes);
private long getCommittedBytesDecrement(long bytes) {
// Calculate bytes used before this write operation.
// Note that getBytesUsed() already includes the 'bytes' from the current write.
long bytesUsedBeforeWrite = getBytesUsed() - bytes;
Expand All @@ -417,8 +410,31 @@ private void incrWriteBytes(long bytes) {
if (committedSpace && availableSpaceBeforeWrite > 0) {
// Decrement committed space only by the portion of the write that fits within the originally committed space,
// up to maxSize
long decrement = Math.min(bytes, availableSpaceBeforeWrite);
this.getVolume().incCommittedBytes(-decrement);
return Math.min(bytes, availableSpaceBeforeWrite);
}
return 0;
}

/**
* Increase the number of bytes write into the container.
* Also decrement committed bytes against the bytes written.
* @param bytes the number of bytes write into the container.
*/
private void incrWriteBytes(long bytes) {
long committedBytesDecrement = getCommittedBytesDecrement(bytes);
if (committedBytesDecrement > 0) {
this.getVolume().incCommittedBytes(-committedBytesDecrement);
}
}

/**
* Restore committedBytes when a write operation fails after writeChunk succeeded.
* This undoes the committedBytes decrement done in incrWriteBytes().
*/
public void restoreCommittedBytesOnWriteFailure(long bytes) {
long committedBytesDecrement = getCommittedBytesDecrement(bytes);
if (committedBytesDecrement > 0) {
this.getVolume().incCommittedBytes(committedBytesDecrement);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class HddsVolume extends StorageVolume {
private ContainerController controller;

private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full
private final AtomicLong spaceReservedForWrites = new AtomicLong(); // for in-flight writes
private Function<HddsVolume, Long> gatherContainerUsages = (K) -> 0L;

private final ConcurrentSkipListSet<Long> containerIds = new ConcurrentSkipListSet<>();
Expand Down Expand Up @@ -410,6 +411,24 @@ public long getCommittedBytes() {
return committedBytes.get();
}

/**
* Reserve space for an in-flight write operation.
*
* @param bytes bytes to reserve
*/
public void reserveSpaceForWrite(long bytes) {
spaceReservedForWrites.addAndGet(bytes);
}

/**
* Release space reserved for write when write completes or fails.
*
* @param bytes bytes to release
*/
public void releaseReservedSpaceForWrite(long bytes) {
spaceReservedForWrites.addAndGet(-bytes);
}

public long getFreeSpaceToSpare(long volumeCapacity) {
return getDatanodeConfig().getMinFreeSpace(volumeCapacity);
}
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add test coverage to verify the restore of both usedspace and committedBytes

Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,11 @@ ContainerCommandResponseProto handleWriteChunk(
}

ContainerProtos.BlockData blockDataProto = null;
HddsVolume volume = kvContainer.getContainerData().getVolume();
long bytesToWrite = 0;
boolean spaceReserved = false;
boolean writeChunkSucceeded = false;

try {
checkContainerOpen(kvContainer);

Expand All @@ -1057,9 +1062,17 @@ ContainerCommandResponseProto handleWriteChunk(
ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
// TODO: Can improve checksum validation here. Make this one-shot after protocol change.
validateChunkChecksumData(data, chunkInfo);
bytesToWrite = chunkInfo.getLen();

// Reserve space before writing
if (volume != null && bytesToWrite > 0) {
volume.reserveSpaceForWrite(bytesToWrite);
spaceReserved = true;
}
}
chunkManager
.writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
writeChunkSucceeded = true;

final boolean isCommit = dispatcherContext.getStage().isCommit();
if (isCommit && writeChunk.hasBlock()) {
Expand All @@ -1086,14 +1099,17 @@ ContainerCommandResponseProto handleWriteChunk(
metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime);
}

commitSpaceReservedForWrite(volume, spaceReserved, bytesToWrite);
// We should increment stats after writeChunk
if (isWrite) {
metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
.getChunkData().getLen());
}
} catch (StorageContainerException ex) {
releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer);
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer);
return ContainerUtils.logAndReturnError(LOG,
new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
request);
Expand All @@ -1102,6 +1118,31 @@ ContainerCommandResponseProto handleWriteChunk(
return getWriteChunkResponseSuccess(request, blockDataProto);
}

/**
* Commit space reserved for write to usedSpace when write operation succeeds.
*/
private void commitSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved, long bytes) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can commit happen when space is not reserved?

if (spaceReserved) {
volume.releaseReservedSpaceForWrite(bytes);
volume.incrementUsedSpace(bytes);
Copy link

@yandrey321 yandrey321 Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since release and increment are not inside critical section, its better to incrementUsedSpace first and then release reserved space, if its done as it currently implemented if other thread check free space between these 2 calls it could try to use free space that already used by this write.

}
}

/**
* Release space reserved for write when write operation fails.
* Also restores committedBytes if it was decremented during write.
*/
private void releaseSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved,
boolean writeChunkSucceeded, long bytes, KeyValueContainer kvContainer) {
if (spaceReserved) {
volume.releaseReservedSpaceForWrite(bytes);
// Only restore committedBytes if write chunk succeeded
if (writeChunkSucceeded) {
kvContainer.getContainerData().restoreCommittedBytesOnWriteFailure(bytes);
}
}
}

/**
* Handle Write Chunk operation for closed container. Calls ChunkManager to process the request.
*/
Expand Down