Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private Mono<FeedResponse<T>> applyNoChangesDecision(FeedResponse<T> r) {

if (ModelBridgeInternal.<T>noChanges(r)) {
// 304 for the current sub-feedRange; need to drain the next one.
return surfaceOrSwallowNoChangesPage(r);
return surfaceOrRetryNoChangesPage(r);
}
}
} else {
Expand All @@ -158,23 +158,7 @@ private Mono<FeedResponse<T>> applyNoChangesDecision(FeedResponse<T> r) {
ShouldRetryResult retryResult = continuationSnapshot.handleChangeFeedNotModified(r);
if (retryResult == ShouldRetryResult.RETRY_NOW) {
// not all continuations have been drained yet; repeat with the next continuation
return surfaceOrSwallowNoChangesPage(r);
}
// The noChanges(r) guard is LOAD-BEARING: in production,
// FeedRangeCompositeContinuationImpl.handleChangeFeedNotModified
// returns NO_RETRY for EVERY non-noChanges (i.e. data) response too
// (the early `if (!noChanges(r))` clause resets state and falls
// through to the final `return NO_RETRY`). Without this guard, every
// data page would silently truncate iteration after the first emission.
if (ModelBridgeInternal.<T>noChanges(r) && this.emptyPagesAllowed) {
// NO_RETRY on a noChanges page: the SDK's termination signal. Without
// emptyPagesAllowed=true, isFullyDrained() already flipped shouldFetchMore
// off. With emptyPagesAllowed=true, isFullyDrained() consults only
// continuation.isDone() (which is permanently false for incremental change
// feed), so we must explicitly disable further fetches here to preserve
// the defense-in-depth termination guarantee.
this.disableShouldFetchMore();
return Mono.just(r);
return surfaceOrRetryNoChangesPage(r);
}
}
}
Expand All @@ -187,11 +171,16 @@ private Mono<FeedResponse<T>> applyNoChangesDecision(FeedResponse<T> r) {
* Reactor's repeatWhenEmpty (the legacy behavior). When swallowing, shouldFetchMore must be
* re-enabled first because isFullyDrained() already flipped it off for the noChanges page.
*/
private Mono<FeedResponse<T>> surfaceOrSwallowNoChangesPage(FeedResponse<T> r) {
private Mono<FeedResponse<T>> surfaceOrRetryNoChangesPage(FeedResponse<T> r) {
this.reEnableShouldFetchMoreForRetry();

if (this.emptyPagesAllowed) {
// I think we will need to update the feedResponse here, because for empty pages we do not rotate tokens, so the worst case I can think of
// is when the feedRange spans multi-partitions, when customer using a continuationToken to resume the process, we will always only process the first child partition
// if we keeps getting 304s
ModelBridgeInternal.setFeedResponseContinuationToken(this.changeFeedState.toString(), r);
return Mono.just(r);
}
this.reEnableShouldFetchMoreForRetry();
return Mono.empty();
}

Expand All @@ -209,17 +198,7 @@ protected String applyServerResponseContinuation(

@Override
protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse<T> response) {
// Short-circuit when emptyPagesAllowed=false: noChanges -> fully drained.
// Required because FeedRangeCompositeContinuationImpl.isDone() never flips
// true for incremental change feed (the deque is rotated, never shrunk),
// so handleChangeFeedNotModified()=NO_RETRY would otherwise leave
// shouldFetchMore=true and Paginator would poll indefinitely.
//
// When emptyPagesAllowed=true we deliberately skip the short-circuit so
// each noChanges page surfaces to the caller (e.g., the Spark connector
// iterator); termination is handled either by the consumer or, on
// NO_RETRY, by an explicit disableShouldFetchMore() in nextPageInternal.
if (!this.emptyPagesAllowed && ModelBridgeInternal.noChanges(response)) {
if (ModelBridgeInternal.noChanges(response)) {
return true;
}

Expand Down