@@ -545,6 +545,80 @@ void testResumeBlockedEmptySubpartition() throws IOException, InterruptedExcepti
545545 assertNoNextBuffer (readView );
546546 }
547547
548+ @ TestTemplate
549+ void testPriorityEventBypassesBlockedSubpartition () throws Exception {
550+ subpartition .setChannelStateWriter (ChannelStateWriter .NO_OP );
551+
552+ // Block the subpartition by consuming an aligned checkpoint barrier
553+ blockSubpartitionByCheckpoint (1 );
554+ assertThat (availablityListener .getNumPriorityEvents ()).isZero ();
555+
556+ // While blocked, add an unaligned checkpoint barrier (priority event).
557+ // Even though isBlocked=true, the priority event notification should NOT
558+ // be suppressed — priority events must bypass blocking.
559+ CheckpointOptions options =
560+ CheckpointOptions .unaligned (
561+ CheckpointType .CHECKPOINT ,
562+ new CheckpointStorageLocationReference (new byte [] {0 , 1 , 2 }));
563+ BufferConsumer barrierBuffer =
564+ EventSerializer .toBufferConsumer (new CheckpointBarrier (0 , 0 , options ), true );
565+ subpartition .add (barrierBuffer );
566+ // Priority notification should fire immediately despite isBlocked=true
567+ assertThat (availablityListener .getNumPriorityEvents ()).isOne ();
568+
569+ assertNextEvent (
570+ readView ,
571+ barrierBuffer .getWrittenBytes (),
572+ CheckpointBarrier .class ,
573+ false ,
574+ 0 ,
575+ false ,
576+ true );
577+ assertNoNextBuffer (readView );
578+ }
579+
580+ @ TestTemplate
581+ void testDataStillBlockedAfterPriorityEventBypasses () throws Exception {
582+ final RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter ();
583+ subpartition .setChannelStateWriter (channelStateWriter );
584+
585+ // Block the subpartition
586+ blockSubpartitionByCheckpoint (1 );
587+ subpartition .add (createFilledFinishedBufferConsumer (BUFFER_SIZE ));
588+ assertNoNextBuffer (readView );
589+
590+ // Add priority event while blocked — should notify and be pollable
591+ CheckpointOptions options =
592+ CheckpointOptions .unaligned (
593+ CheckpointType .CHECKPOINT ,
594+ new CheckpointStorageLocationReference (new byte [] {0 , 1 , 2 }));
595+ channelStateWriter .start (0 , options );
596+ BufferConsumer barrierBuffer =
597+ EventSerializer .toBufferConsumer (new CheckpointBarrier (0 , 0 , options ), true );
598+ subpartition .add (barrierBuffer );
599+ assertThat (availablityListener .getNumPriorityEvents ()).isOne ();
600+
601+ // Recycle inflight buffer copies held by channel state writer
602+ final List <Buffer > inflight =
603+ channelStateWriter .getAddedOutput ().get (subpartition .getSubpartitionInfo ());
604+ assertThat (inflight ).hasSize (1 );
605+ inflight .forEach (Buffer ::recycleBuffer );
606+
607+ assertNextEvent (
608+ readView ,
609+ barrierBuffer .getWrittenBytes (),
610+ CheckpointBarrier .class ,
611+ false ,
612+ 0 ,
613+ false ,
614+ true );
615+ assertNoNextBuffer (readView );
616+
617+ // After resumeConsumption, data becomes available
618+ readView .resumeConsumption ();
619+ assertNextBuffer (readView , BUFFER_SIZE , false , 0 , false , true );
620+ }
621+
548622 // ------------------------------------------------------------------------
549623
550624 private void blockSubpartitionByCheckpoint (int numNotifications )
0 commit comments