Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ void testIsUnalignedCheckpointInterruptibleTimersEnabled() {
}

@Test
void testIsUnalignedDuringRecoveryEnabled() {
void testIsCheckpointingDuringRecoveryEnabled() {
// Test when both options are disabled (default) - should return false
Configuration defaultConfig = new Configuration();
assertThat(CheckpointingOptions.isCheckpointingDuringRecoveryEnabled(defaultConfig))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,9 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial
/**
 * Tells the caller whether downstream has to be notified about a priority event.
 *
 * <p>Must be invoked while holding the {@code buffers} lock. The blocked state of the
 * subpartition is deliberately not consulted: blocking is a data flow control mechanism, while
 * priority events (unaligned checkpoint barriers) must notify downstream even when blocked.
 *
 * @return {@code true} if {@code buffers} currently reports exactly one priority element
 */
@GuardedBy("buffers")
private boolean needNotifyPriorityEvent() {
    assert Thread.holdsLock(buffers);
    return buffers.getNumPriorityElements() == 1;
}

@GuardedBy("buffers")
Expand Down Expand Up @@ -456,7 +457,9 @@ public void release() {
@Nullable
BufferAndBacklog pollBuffer() {
synchronized (buffers) {
if (isBlocked) {
// When blocked, only allow priority buffers (e.g. unaligned checkpoint barriers)
// to be polled. Data buffers remain blocked until resumeConsumption() is called.
if (isBlocked && buffers.getNumPriorityElements() == 0) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,12 @@ public String toString() {

/**
* Returns the future associated with consumption of recovered state.
*
* <p>NOTE(review): the completion condition is defined by the implementations, which are not
* visible here. Presumably it completes once all recovered state has been consumed; confirm
* against the concrete subclasses.
*/
public abstract CompletableFuture<Void> getStateConsumedFuture();

/**
* Returns a future that completes when buffer filtering is complete for all channels. This
* future completes before {@link #getStateConsumedFuture()}, enabling earlier RUNNING state
* transition when unaligned checkpoint during recovery is enabled.
*
* @return the future signalling that buffer filtering has finished for all channels
*/
public abstract CompletableFuture<Void> getBufferFilteringCompleteFuture();

/**
* Marks the end of reading recovered state.
*
* <p>NOTE(review): what exactly is finalized is up to the implementations, which are not
* visible here; confirm against the concrete subclasses.
*
* @throws IOException declared by the signature; the failure conditions are
*     implementation-specific
*/
public abstract void finishReadRecoveredState() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -80,6 +80,13 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit

private final Deque<BufferAndBacklog> toBeConsumedBuffers = new ArrayDeque<>();

/**
* Flag indicating whether there is a pending priority event (e.g., checkpoint barrier) in the
* subpartitionView that should be consumed before toBeConsumedBuffers. This is set by {@link
* #notifyPriorityEvent} and checked in {@link #getNextBuffer()}.
*/
private volatile boolean hasPendingPriorityEvent = false;

public LocalInputChannel(
SingleInputGate inputGate,
int channelIndex,
Expand All @@ -91,7 +98,8 @@ public LocalInputChannel(
int maxBackoff,
Counter numBytesIn,
Counter numBuffersIn,
ChannelStateWriter stateWriter) {
ChannelStateWriter stateWriter,
@Nullable ArrayDeque<Buffer> initialRecoveredBuffers) {

super(
inputGate,
Expand All @@ -106,14 +114,47 @@ public LocalInputChannel(
this.partitionManager = checkNotNull(partitionManager);
this.taskEventPublisher = checkNotNull(taskEventPublisher);
this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo());

// Migrate recovered buffers from RecoveredInputChannel if provided.
// These buffers have been filtered but not yet consumed by the Task.
if (initialRecoveredBuffers != null && !initialRecoveredBuffers.isEmpty()) {
final int expectedCount = initialRecoveredBuffers.size();
// Sequence number starts at Integer.MIN_VALUE, consistent with RecoveredInputChannel.
int seqNum = Integer.MIN_VALUE;
while (!initialRecoveredBuffers.isEmpty()) {
Buffer buffer = initialRecoveredBuffers.poll();
// Determine next data type based on the next buffer in the queue
Buffer.DataType nextDataType =
initialRecoveredBuffers.isEmpty()
? Buffer.DataType.NONE
: initialRecoveredBuffers.peek().getDataType();
// buffersInBacklog is set to 0 as these are recovered buffers
BufferAndBacklog bufferAndBacklog =
new BufferAndBacklog(buffer, 0, nextDataType, seqNum++);
toBeConsumedBuffers.add(bufferAndBacklog);
}
checkState(
toBeConsumedBuffers.size() == expectedCount,
"Buffer migration failed: expected %s buffers but got %s",
expectedCount,
toBeConsumedBuffers.size());
}
}

// ------------------------------------------------------------------------
// Consume
// ------------------------------------------------------------------------

/**
 * Starts persisting channel state for the checkpoint identified by the given barrier.
 *
 * <p>All entries still waiting in {@code toBeConsumedBuffers} whose buffer reports {@code
 * isBuffer()} are retained and handed to the {@code channelStatePersister} as in-flight data.
 * These are the buffers that have not been consumed yet when the checkpoint barrier arrives.
 * Entries for which {@code isBuffer()} is false are left out.
 *
 * @param barrier the barrier whose id selects the checkpoint to persist for
 * @throws CheckpointException declared by the signature; NOTE(review): presumably propagated
 *     from {@code startPersisting} - confirm
 */
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
    final List<Buffer> inflightBuffers = new ArrayList<>(toBeConsumedBuffers.size());
    for (BufferAndBacklog queued : toBeConsumedBuffers) {
        final Buffer buffer = queued.buffer();
        if (buffer.isBuffer()) {
            // Retain: the same buffer stays queued for consumption by the task.
            inflightBuffers.add(buffer.retainBuffer());
        }
    }
    // Persisting is started exactly once, with the collected in-flight buffers.
    channelStatePersister.startPersisting(barrier.getId(), inflightBuffers);
}

public void checkpointStopped(long checkpointId) {
Expand All @@ -122,8 +163,6 @@ public void checkpointStopped(long checkpointId) {

@Override
protected void requestSubpartitions() throws IOException {
checkState(toBeConsumedBuffers.isEmpty());

boolean retriggerRequest = false;
boolean notifyDataAvailable = false;

Expand Down Expand Up @@ -234,7 +273,63 @@ public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkError();

if (!toBeConsumedBuffers.isEmpty()) {
return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
// If there is a pending priority event (e.g., unaligned checkpoint barrier), fetch it
// from subpartitionView first, skipping toBeConsumedBuffers. This ensures priority
// events are processed immediately even when there are pending recovered buffers.
if (hasPendingPriorityEvent) {
checkState(subpartitionView != null, "No subpartition view available");
BufferAndBacklog next = subpartitionView.getNextBuffer();
checkState(
next != null && next.buffer().getDataType().hasPriority(),
"Expected priority event, but got %s",
next == null ? "null" : next.buffer().getDataType());

// Check for barrier to update channel state persister.
// Note: maybePersist is not needed for barriers as they are not regular data
// buffers.
channelStatePersister.checkForBarrier(next.buffer());

Buffer.DataType expectedNextDataType = next.getNextDataType();
if (!expectedNextDataType.hasPriority()) {
// Reset hasPendingPriorityEvent to false if no more priority event
hasPendingPriorityEvent = false;
if (!toBeConsumedBuffers.isEmpty()) {
// Correct nextDataType: if toBeConsumedBuffers is not empty, the actual
// next
// element to consume is from toBeConsumedBuffers, not from subpartitionView
expectedNextDataType = toBeConsumedBuffers.peek().buffer().getDataType();
}
}

return getBufferAndAvailability(
new BufferAndBacklog(
next.buffer(),
next.buffersInBacklog(),
expectedNextDataType,
next.getSequenceNumber()));
}

BufferAndBacklog next = toBeConsumedBuffers.removeFirst();

// If this is the last recovered buffer and nextDataType is NONE,
// dynamically check if subpartitionView has data available.
// The last buffer's nextDataType was preset to NONE during construction,
// but subpartitionView may already have data available.
if (toBeConsumedBuffers.isEmpty()
&& next.getNextDataType() == Buffer.DataType.NONE
&& subpartitionView != null) {
ResultSubpartitionView.AvailabilityWithBacklog availability =
subpartitionView.getAvailabilityAndBacklog(true);
if (availability.isAvailable()) {
next =
new BufferAndBacklog(
next.buffer(),
availability.getBacklog(),
Buffer.DataType.DATA_BUFFER,
next.getSequenceNumber());
}
}
return getBufferAndAvailability(next);
}

ResultSubpartitionView subpartitionView = this.subpartitionView;
Expand Down Expand Up @@ -331,6 +426,14 @@ public void notifyDataAvailable(ResultSubpartitionView view) {
notifyChannelNonEmpty();
}

/**
 * Records that a priority event is pending in the subpartition view and then forwards the
 * notification to the superclass.
 *
 * <p>The flag is raised first so that {@link #getNextBuffer()} knows to fetch the priority
 * event from the subpartition view before it consumes {@code toBeConsumedBuffers}.
 *
 * @param prioritySequenceNumber sequence number passed through unchanged to the superclass
 */
@Override
public void notifyPriorityEvent(int prioritySequenceNumber) {
    this.hasPendingPriorityEvent = true;
    super.notifyPriorityEvent(prioritySequenceNumber);
}

private ResultSubpartitionView checkAndWaitForSubpartitionView() {
// synchronizing on the request lock means this blocks until the asynchronous request
// for the partition view has been completed
Expand Down Expand Up @@ -402,6 +505,13 @@ void releaseAllResources() throws IOException {
view.releaseAllResources();
subpartitionView = null;
}

// Release any remaining buffers in toBeConsumedBuffers to avoid memory leak.
// These may be recovered buffers or partial buffers from FullyFilledBuffer.
for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) {
bufferAndBacklog.buffer().recycleBuffer();
}
toBeConsumedBuffers.clear();
}
}

Expand All @@ -418,18 +528,19 @@ void announceBufferSize(int newBufferSize) {
/**
 * Counts the buffers currently in use by this channel.
 *
 * <p>The result is the number of entries in {@code toBeConsumedBuffers} plus the number of
 * buffers queued in the subpartition view, if a view is present.
 *
 * @return the combined buffer count; only the {@code toBeConsumedBuffers} size when there is
 *     no subpartition view
 */
@Override
int getBuffersInUseCount() {
    // Read the field once into a local; it is set to null when resources are released.
    final ResultSubpartitionView view = this.subpartitionView;
    final int queuedInView = view == null ? 0 : view.getNumberOfQueuedBuffers();
    return toBeConsumedBuffers.size() + queuedInView;
}

/**
 * Returns the number of queued buffers without taking any lock.
 *
 * <p>The result is the size of {@code toBeConsumedBuffers} plus, when a subpartition view is
 * present, the view's own unsynchronized count.
 *
 * @return the combined, unsynchronized buffer count
 */
@Override
public int unsynchronizedGetNumberOfQueuedBuffers() {
    // Read the field once into a local; it is set to null when resources are released.
    final ResultSubpartitionView view = subpartitionView;

    int count = toBeConsumedBuffers.size();
    if (view != null) {
        count += view.unsynchronizedGetNumberOfQueuedBuffers();
    }
    return count;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;

import java.util.ArrayDeque;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -61,7 +64,7 @@ public class LocalRecoveredInputChannel extends RecoveredInputChannel {
}

@Override
protected InputChannel toInputChannelInternal() {
protected InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers) {
return new LocalInputChannel(
inputGate,
getChannelIndex(),
Expand All @@ -73,6 +76,7 @@ protected InputChannel toInputChannelInternal() {
maxBackoff,
numBytesIn,
numBuffersIn,
channelStateWriter);
channelStateWriter,
remainingBuffers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public abstract class RecoveredInputChannel extends InputChannel implements Chan
private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture<>();
protected final BufferManager bufferManager;

/**
* Future that completes when recovered buffers have been filtered for this channel. This
* completes before stateConsumedFuture, enabling earlier RUNNING state transition when
* unaligned checkpoint during recovery is enabled.
*/
private final CompletableFuture<Void> bufferFilteringCompleteFuture = new CompletableFuture<>();

@GuardedBy("receivedBuffers")
private boolean isReleased;

Expand Down Expand Up @@ -109,9 +116,26 @@ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
}

/**
 * Converts this recovered channel into its physical input channel.
 *
 * <p>The precondition depends on the gate configuration:
 *
 * <ul>
 *   <li>checkpointing during recovery enabled: {@code bufferFilteringCompleteFuture} must be
 *       done;
 *   <li>otherwise: {@code stateConsumedFuture} must be done (original behavior).
 * </ul>
 *
 * <p>Buffers still held in {@code receivedBuffers} (filtered but not yet consumed by the task)
 * are drained under the {@code receivedBuffers} lock and handed to {@link
 * #toInputChannelInternal(ArrayDeque)}. The new channel is then informed of the last stopped
 * checkpoint id.
 *
 * @return the physical channel created by {@link #toInputChannelInternal(ArrayDeque)}
 * @throws IOException if creating the physical channel fails
 * @throws IllegalStateException if the future required by the active configuration is not done
 */
public final InputChannel toInputChannel() throws IOException {
    if (inputGate.isCheckpointingDuringRecoveryEnabled()) {
        Preconditions.checkState(
                bufferFilteringCompleteFuture.isDone(), "buffer filtering is not complete");
    } else {
        Preconditions.checkState(
                stateConsumedFuture.isDone(), "recovered state is not fully consumed");
    }

    // Move the remaining buffers out of this channel so the new channel receives them.
    final ArrayDeque<Buffer> remainingBuffers;
    synchronized (receivedBuffers) {
        remainingBuffers = new ArrayDeque<>(receivedBuffers);
        receivedBuffers.clear();
    }

    final InputChannel inputChannel = toInputChannelInternal(remainingBuffers);
    inputChannel.checkpointStopped(lastStoppedCheckpointId);
    return inputChannel;
}
Expand All @@ -121,7 +145,23 @@ public void checkpointStopped(long checkpointId) {
this.lastStoppedCheckpointId = checkpointId;
}

// NOTE(review): zero-argument variant, declared next to the ArrayDeque-taking variant that
// follows - confirm whether both signatures are still meant to exist.
protected abstract InputChannel toInputChannelInternal() throws IOException;
/**
* Creates the physical InputChannel from this recovered channel.
*
* @param remainingBuffers buffers that have been filtered but not yet consumed by the Task.
* These buffers will be migrated to the new physical channel.
* @return the physical InputChannel (LocalInputChannel or RemoteInputChannel)
* @throws IOException declared by the signature; the failure conditions are defined by the
* implementing subclasses
*/
protected abstract InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers)
throws IOException;

/**
 * Exposes the per-channel future that signals the end of buffer filtering.
 *
 * <p>The future is completed in {@code finishReadRecoveredState()}, and only when the input
 * gate reports that checkpointing during recovery is enabled. When it does complete, it does
 * so before {@code stateConsumedFuture}.
 *
 * @return the buffer-filtering-complete future of this channel
 */
CompletableFuture<Void> getBufferFilteringCompleteFuture() {
    return this.bufferFilteringCompleteFuture;
}

CompletableFuture<?> getStateConsumedFuture() {
return stateConsumedFuture;
Expand Down Expand Up @@ -163,6 +203,13 @@ public void finishReadRecoveredState() throws IOException {
EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false));
bufferManager.releaseFloatingBuffers();
LOG.debug("{}/{} finished recovering input.", inputGate.getOwningTaskName(), channelInfo);

// Complete bufferFilteringCompleteFuture only when unaligned during recovery is enabled.
// This signals that buffer filtering is complete, allowing earlier RUNNING state
// transition. When the config is disabled, this future should not be completed.
if (inputGate.isCheckpointingDuringRecoveryEnabled()) {
bufferFilteringCompleteFuture.complete(null);
}
}

@Nullable
Expand Down
Loading