
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout#49276

Open
tvaron3 wants to merge 6 commits into
Azure:mainfrom
tvaron3:tvaron3/spark-allow-empty-pages

@tvaron3
Member

@tvaron3 tvaron3 commented May 26, 2026

Summary

Spark partition tasks reading change feed (or executing cross-partition queries) against a sparse workload could hit OperationCancelledException ("End-to-end timeout hit when trying to retrieve the next page") at the connector's 65-second per-operation end-to-end timeout.

Root cause: with the default emptyPagesAllowed=false, ParallelDocumentQueryExecutionContext and ChangeFeedFetcher swallow empty / 304 pages internally — a single producer-side nextPage() call can keep draining many sub-feedRanges before emitting one non-empty page. For sparse cross-partition workloads the cumulative wall time blows the per-operation timeout.
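The effect of swallowing can be illustrated with a toy model. This is a minimal sketch, not SDK code: `EmptyPagesSketch` and `fetchesPerCall` are hypothetical names, and each integer stands for the item count of one physical page. It counts how many physical fetches a single consumer-visible `nextPage()` call performs in each mode.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model only: each list element is the item count of one physical page. */
final class EmptyPagesSketch {

    /** Returns, per consumer-visible nextPage() call, how many physical pages were fetched. */
    static List<Integer> fetchesPerCall(List<Integer> physicalPages, boolean emptyPagesAllowed) {
        List<Integer> fetchCounts = new ArrayList<>();
        Iterator<Integer> source = physicalPages.iterator();
        while (source.hasNext()) {
            int fetches = 0;
            int itemCount;
            do {
                itemCount = source.next();
                fetches++;
                // emptyPagesAllowed=false: an empty page is swallowed and the same
                // call keeps fetching. emptyPagesAllowed=true: it is returned at once.
            } while (itemCount == 0 && !emptyPagesAllowed && source.hasNext());
            fetchCounts.add(fetches);
        }
        return fetchCounts;
    }

    public static void main(String[] args) {
        List<Integer> sparse = List.of(0, 0, 0, 0, 5, 0, 0, 3);
        System.out.println(fetchesPerCall(sparse, false)); // [5, 3]
        System.out.println(fetchesPerCall(sparse, true));  // [1, 1, 1, 1, 1, 1, 1, 1]
    }
}
```

In the swallowing mode the wall time of one call is the sum of all fetches it hides, which is what can exceed a per-operation timeout on sparse data.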

Fix

  1. Spark ItemsPartitionReader (query path) calls setAllowEmptyPages(true) on CosmosQueryRequestOptions so the SDK's existing emptyPagesAllowed plumbing applies — each physical page surfaces independently and is bound by the per-page timeout window rather than the per-operation one.

  2. New internal-only emptyPagesAllowed flag on CosmosChangeFeedRequestOptionsImpl (default false — behavior unchanged for all other callers). Plumbed through Paginator.getChangeFeedQueryResultAsObservable into ChangeFeedFetcher.nextPageInternal. When the flag is true, both 304/noChanges branches return Mono.just(r) so empty pages bubble up to the iterator. Exposed via new package-private bridge accessor CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages (deliberately no public setter — see in-code comment).

  3. ChangeFeedFetcher.isFullyDrained correctness fix. The original cleanup (consult only continuation.isDone()) regressed every non-Spark caller because FeedRangeCompositeContinuationImpl.isDone() returns compositeContinuationTokens.size() == 0, which is permanently false for incremental change feed (moveToNextToken() rotates the deque, never shrinks it). The noChanges → true short-circuit is now gated on !emptyPagesAllowed, preserving the original termination signal for every default-mode caller while still letting Spark surface empty pages.

  4. Defense-in-depth NO_RETRY arm. With emptyPagesAllowed=true and a streaming change feed (no endLSN), if handleChangeFeedNotModified returns NO_RETRY on a noChanges page (single-partition case, multi-partition full-cycle complete, or the >4*(size+1) consecutive-304 defense in FeedRangeCompositeContinuationImpl), the SDK's built-in termination signal would otherwise be silently dropped. nextPageInternal now explicitly calls disableShouldFetchMore() when the SDK signals NO_RETRY on a noChanges page so the defense-in-depth termination guarantee survives for the new flag too. The noChanges(r) && emptyPagesAllowed guard is load-bearing because handleChangeFeedNotModified returns NO_RETRY for every data page too — without the guard, every data page would silently truncate iteration.

  5. Spark ChangeFeedPartitionReader opts into the new flag via the bridge accessor.

  6. CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptions also propagates emptyPagesAllowed when the paged-flux pull mechanism supplies a continuation token (the freshly-built impl would otherwise silently lose the flag — added a comment flagging the broader drift hazard in that copy list).

  7. DRY: extracted the two near-identical "surface or swallow via repeatWhenEmpty" blocks into a surfaceOrSwallowNoChangesPage(r) helper.
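Why the noChanges(r) guard in item 4 matters can be shown with a small simulation. This is a sketch under stated assumptions: `NoRetryGuardSketch`, `Page`, and `drain` are hypothetical, and the `noRetry` field simply stubs the verdict that the description says handleChangeFeedNotModified returns (NO_RETRY for every data page and for a terminal noChanges page).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; none of these types exist in the SDK. */
final class NoRetryGuardSketch {

    /** Hypothetical page: noChanges marks a 304 page, noRetry is the stubbed retry verdict. */
    static final class Page {
        final String name;
        final boolean noChanges;
        final boolean noRetry;

        Page(String name, boolean noChanges, boolean noRetry) {
            this.name = name;
            this.noChanges = noChanges;
            this.noRetry = noRetry;
        }
    }

    static List<Page> samplePages() {
        return List.of(
            new Page("data-1", false, true),   // data pages always report NO_RETRY
            new Page("empty-1", true, false),  // non-terminal 304
            new Page("data-2", false, true),
            new Page("empty-2", true, true),   // terminal 304
            new Page("data-3", false, true));  // must never be fetched
    }

    /** Emits page names until the (guarded or unguarded) termination rule fires. */
    static List<String> drain(List<Page> pages, boolean guardOnNoChanges) {
        List<String> emitted = new ArrayList<>();
        boolean shouldFetchMore = true;
        for (Page page : pages) {
            if (!shouldFetchMore) {
                break;
            }
            emitted.add(page.name);
            boolean terminal = guardOnNoChanges
                ? page.noChanges && page.noRetry
                : page.noRetry;
            if (terminal) {
                shouldFetchMore = false; // stands in for disableShouldFetchMore()
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(drain(samplePages(), true));  // [data-1, empty-1, data-2, empty-2]
        System.out.println(drain(samplePages(), false)); // [data-1]
    }
}
```

Without the guard the first data page already ends iteration, which is the truncation hazard item 4 describes.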

Behavior matrix

|  | Before | After |
| --- | --- | --- |
| Spark query, sparse data | Single nextPage() blocks > 65 s, throws OCE | Each empty page surfaces; per-page timeout applies |
| Spark change feed, sparse data | Same OCE risk | Same fix applies |
| All other SDK callers | unchanged (emptyPagesAllowed defaults to false) | unchanged |

…meout

Spark partition tasks reading change feed (or executing cross-partition
queries) against a sparse workload could hit OperationCancelledException
("End-to-end timeout hit when trying to retrieve the next page") at the
connector's 65-second per-operation end-to-end timeout. Root cause: with
the default emptyPagesAllowed=false, ParallelDocumentQueryExecutionContext
and ChangeFeedFetcher swallow empty / 304 pages internally — a single
producer-side nextPage() call can keep draining many sub-feedRanges before
emitting one non-empty page. For sparse workloads the cumulative time blows
the per-operation timeout.

Fix:

  * Spark ItemsPartitionReader (query path) calls
    setAllowEmptyPages(true) on the CosmosQueryRequestOptions so the SDK's
    existing emptyPagesAllowed plumbing applies.

  * New internal-only emptyPagesAllowed flag on
    CosmosChangeFeedRequestOptionsImpl (default false; behavior unchanged
    for all other callers) plumbed through Paginator.
    getChangeFeedQueryResultAsObservable into ChangeFeedFetcher.
    nextPageInternal. When the flag is true, both 304 branches return
    Mono.just(r) so empty pages bubble up to the iterator. Surfaced via
    new package-private bridge accessor
    CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages.

  * ChangeFeedFetcher.isFullyDrained no longer short-circuits to true on
    noChanges responses (it now consults only continuation.isDone()),
    which removes the load-bearing reEnableShouldFetchMoreForRetry()
    pattern that was previously needed to undo a base-class decision.

  * Spark ChangeFeedPartitionReader opts into the new flag via the bridge
    accessor.

  * CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptions now also
    propagates emptyPagesAllowed when the paged-flux pull mechanism
    supplies a continuation token (the freshly-built impl would otherwise
    silently lose the flag — comment added flagging the broader drift
    hazard).

Tests:

  * New ChangeFeedFetcherEmptyPagesTest (5 unit tests): exercises the
    isFullyDrained behavior change and asserts that nextPageInternal
    surfaces noChanges responses individually when the flag is true and
    swallows them via repeatWhenEmpty when the flag is false.

  * New CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest (3 unit
    tests): locks in the flag propagation through
    withCosmosPagedFluxOptions.

  * Extended TransientIOErrorsRetryingIteratorSpec with a regression test
    that drains hundreds of leading empty pages followed by data without
    hitting the end-to-end timeout.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings May 26, 2026 23:13
@tvaron3 tvaron3 requested review from a team and kirankumarkolli as code owners May 26, 2026 23:13
Contributor

Copilot AI left a comment


Pull request overview

This PR updates the Cosmos DB Java SDK and Cosmos Spark connector to optionally surface empty (304 / noChanges) change feed pages to callers, preventing Spark partition tasks from exceeding the connector’s 65s per-operation end-to-end timeout when draining sparse, cross-partition workloads.

Changes:

  • Adds an internal emptyPagesAllowed flag for change feed requests and plumbs it through ChangeFeedQueryImpl → Paginator → ChangeFeedFetcher, so empty pages can be emitted instead of swallowed.
  • Updates Cosmos Spark readers (query + change feed) to opt into “allow empty pages” so each physical page is bounded by the per-page timeout window.
  • Adds new unit/regression tests covering the new behavior and the paged-flux continuation rebuild path; updates changelogs accordingly.

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java | Preserves emptyPagesAllowed across paged-flux continuation rebuild; exposes internal accessor bridge methods. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java | Adds emptyPagesAllowed parameter and forwards it into ChangeFeedFetcher. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java | Implements “surface empty pages” behavior and simplifies isFullyDrained to rely on continuation completion. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java | Extends change feed request options accessor interface with allow-empty-pages getters/setters. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java | Adds internal emptyPagesAllowed state and cloning support. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java | Passes allow-empty-pages option into the paginator pipeline. |
| sdk/cosmos/azure-cosmos/CHANGELOG.md | Documents the fix and clarifies Spark connector behavior. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java | Adds regression coverage for surfacing vs swallowing empty pages and new isFullyDrained semantics. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java | Ensures allow-empty-pages survives withCosmosPagedFluxOptions continuation rebuild path. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala | Enables “allow empty pages” for Spark query reads. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala | Enables “allow empty pages” for Spark change feed reads. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala | Adds Spark regression test for long empty-page runs without timing out. |
| sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |

tvaron3 and others added 4 commits May 26, 2026 16:25
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…wed=false

The previous cleanup of ChangeFeedFetcher.isFullyDrained removed the
noChanges short-circuit unconditionally, on the rationale that consulting
continuation.isDone() was simpler. That regressed every non-Spark caller of
the change-feed API:

FeedRangeCompositeContinuationImpl.isDone() returns
compositeContinuationTokens.size() == 0, but moveToNextToken() rotates the
deque via poll() + add() and never shrinks it. So isDone() is permanently
false for normal incremental change-feed iteration. For the default
emptyPagesAllowed=false path:

  1. The 304 arrives.
  2. updateState calls isFullyDrained -> false (because isDone() is false).
  3. nextPageInternal's else-branch sees handleChangeFeedNotModified return
     NO_RETRY (single-partition case, multi-partition cycle-complete, or
     the >4*(size+1) consecutive-304 defense) and falls through to
     Mono.just(r).
  4. Paginator's generate-loop checks shouldFetchMore() -> true and calls
     nextPage() again -> infinite poll loop.

Customer-visible impact would be: any consumer that drains
queryChangeFeed(...).byPage() to completion (e.g.
.toIterable().iterator(), .collectList(), .blockLast()) hangs forever once
the change feed catches up.

Fix: gate the noChanges short-circuit on !emptyPagesAllowed. When the
flag is true (Spark path), surface every noChanges to the caller and let
the consumer decide when to stop iterating. When the flag is false (every
other caller, including the SDK's public queryChangeFeed API), preserve
the original termination signal.
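The interaction between the rotating deque and the gated short-circuit can be sketched as follows. This is an illustrative model, not the SDK classes: `IsFullyDrainedSketch` and its methods are hypothetical and only mirror the behavior described in this commit message (rotation via poll() + add(), isDone() as "deque is empty", and the noChanges short-circuit gated on !emptyPagesAllowed).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Toy model only; mirrors the described behavior, not the real implementation. */
final class IsFullyDrainedSketch {
    private final Deque<String> tokens;
    private final boolean emptyPagesAllowed;

    IsFullyDrainedSketch(List<String> subRanges, boolean emptyPagesAllowed) {
        this.tokens = new ArrayDeque<>(subRanges);
        this.emptyPagesAllowed = emptyPagesAllowed;
    }

    /** Rotates the deque: the head moves to the tail, so the size never shrinks. */
    void moveToNextToken() {
        if (!tokens.isEmpty()) {
            tokens.add(tokens.poll());
        }
    }

    /** Only true for an empty deque, which rotation alone can never produce. */
    boolean isDone() {
        return tokens.isEmpty();
    }

    /** Default mode keeps the noChanges termination signal; opt-in mode defers to isDone(). */
    boolean isFullyDrained(boolean noChanges) {
        if (noChanges && !emptyPagesAllowed) {
            return true;
        }
        return isDone();
    }

    public static void main(String[] args) {
        IsFullyDrainedSketch defaultMode = new IsFullyDrainedSketch(List.of("a", "b", "c"), false);
        for (int i = 0; i < 10; i++) {
            defaultMode.moveToNextToken();
        }
        System.out.println(defaultMode.isDone());             // false
        System.out.println(defaultMode.isFullyDrained(true)); // true

        IsFullyDrainedSketch sparkMode = new IsFullyDrainedSketch(List.of("a", "b", "c"), true);
        System.out.println(sparkMode.isFullyDrained(true));   // false
    }
}
```

In this model a default-mode caller still terminates on a 304, while the opt-in caller keeps receiving pages and has to decide for itself when to stop.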

Also addressed reviewer feedback:
  * Drop unused org.mockito.Mockito import (would break checkstyle's
    UnusedImports rule).
  * Replace reflective field assignment in stubRequest() with direct field
    writes on the public fields RxDocumentServiceRequest.requestContext
    and .faultInjectionRequestContext. Mockito only intercepts method
    calls; field writes on a mock work directly.
  * Add a regression test
    isFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue
    that locks in the termination signal for the default path.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Defense-in-depth (F1): when emptyPagesAllowed=true the streaming change-
  feed path takes branch 2 of nextPageInternal. If
  handleChangeFeedNotModified returns NO_RETRY on a noChanges page (single-
  partition case, multi-partition full-cycle complete, or the >4*(size+1)
  consecutive-304 defense in FeedRangeCompositeContinuationImpl), the SDK's
  built-in termination signal was being silently dropped because
  isFullyDrained() consults only continuation.isDone() in that mode (which
  is permanently false for incremental change feed). Now we explicitly call
  disableShouldFetchMore() to preserve the defense-in-depth termination
  guarantee even for emptyPagesAllowed=true callers.

* DRY (F4): extracted the two near-identical 'surface or swallow via
  repeatWhenEmpty' blocks in nextPageInternal into a private
  surfaceOrSwallowNoChangesPage(r) helper. The branches now read as a
  one-line intent instead of seven near-identical lines.

* Comment density (F9): tightened the long isFullyDrained comment to a
  2-line tl;dr followed by the detailed rationale.

* Test (F2): new nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_
  terminatesIteration locks in the defense-in-depth fix - asserts that
  after a terminal NO_RETRY noChanges page, shouldFetchMore() flips to
  false so Paginator stops calling nextPage().

* Test (F3): added Mockito.verify(continuation, never()).isDone() to the
  pass-2 regression test so a future refactor that accidentally drops the
  noChanges short-circuit and falls through to the (permanently-false)
  continuation.isDone() check fails loudly instead of silently hanging.

Test results: ChangeFeedFetcherEmptyPagesTest 7/7, FetcherTest 5/5,
CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3,
TransientIOErrorsRetryingIteratorSpec 7/7 - all green.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Test (F1): assert callIndex==4 after NO_RETRY termination in the pass-3
  defense-in-depth test so a future regression that terminates iteration
  but still over-fetches is caught.

* Test (F2): new nextPage_emptyPagesAllowedTrueWithDataPages_doesNotTerminate
  pins the production contract that the noChanges(r) guard on the
  disableShouldFetchMore() arm is load-bearing. In production,
  FeedRangeCompositeContinuationImpl.handleChangeFeedNotModified returns
  NO_RETRY for EVERY non-noChanges response (the early branch resets state
  and falls through). Without the noChanges(r) guard, every data page
  would silently truncate iteration after the first emission.

* Comment (F5): added an inline rationale next to the noChanges(r) guard
  explaining why it must remain - prevents a future engineer from
  'simplifying' away the guard without realizing the production
  truncation hazard.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@tvaron3
Member Author

tvaron3 commented May 27, 2026

/azp run java - cosmos - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

* Drop the new CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages
  wrapper methods. Callers (ChangeFeedQueryImpl, Spark ChangeFeedPartitionReader,
  the propagation unit test) now use the already-exposed
  accessor.getImpl(options).{is,set}EmptyPagesAllowed() instead, keeping the
  bridge accessor interface at its pre-PR shape.

* Move the azure-cosmos CHANGELOG entry from 'Bugs Fixed' to 'Other Changes'
  and reword: this PR adds an internal-only field on
  CosmosChangeFeedRequestOptionsImpl that pure SDK consumers cannot reach
  without going through getImpl(). The customer-facing fix lives in the Spark
  connector CHANGELOGs (which keep their 'Bugs Fixed' entries).

Test results: ChangeFeedFetcherEmptyPagesTest 8/8, FetcherTest 5/5,
CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3,
TransientIOErrorsRetryingIteratorSpec 7/7 - all green.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@xinlian12
Member

@sdkReviewAgent

Member

@kushagraThapar kushagraThapar left a comment


Automated multi-reviewer code review — Tier 3 (Deep)

Reviewers dispatched in parallel: PR Deep Reviewer (correctness) + Engineering Quality Reviewer (code quality / testing / perf / agentic) + cosmos-pr-reviewer (Cosmos design alignment) + Cross SDK Reviewer (peers: dotnet, python).
Risk score 8 (public_api +4, query+retry capped +6, source/test-ratio −2). Design-doc clone synced fresh against tvaron3/cosmosdb-design-docs@main. Existing inline comments from copilot-pull-request-reviewer were de-duped — all 3 were already addressed by the author in commit 08f690b.


Overall verdict

Approve with suggestions. Correctness story is solid — PR Deep Reviewer independently verified every claim in the PR body against the sandbox (10 claims, all ✅). Cross SDK Reviewer confirmed neither .NET nor Python has an equivalent flag (because neither ships CosmosEndToEndOperationLatencyPolicyConfig per-operation timeout) — Java is deliberately and correctly divergent. cosmos-pr-reviewer found no domain-correctness regressions, only design-doc gaps.

Findings cluster into four addressable themes: (1) stale PR description [major], (2) broader drift hazard in continuation-rebuild copy list [major], (3) test-timeout discipline to match .NET parity [major], (4) various minor API hygiene / observability / javadoc gaps.


Blockers

(none)


Major findings

M1. PR description references removed bridge accessor methods — stale and misleading

Files: PR body (sections 2 & 5)
Severity: major | Category: code_quality

PR body claims:

"Exposed via new package-private bridge accessor CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages"

But commit 34d3da4fa51 ("Shrink bridge accessor surface") removed those wrapper methods. Spark now uses accessor.getImpl(options).setEmptyPagesAllowed(true) — the existing getImpl() escape hatch.

Verified:

$ grep -n "setEmptyPagesAllowed\|isEmptyPagesAllowed\|setAllowEmptyPages" \
    sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
300:   void setAllowEmptyPages(CosmosQueryRequestOptions options, boolean emptyPagesAllowed);
302:   boolean getAllowEmptyPages(CosmosQueryRequestOptions options);
# only the pre-existing QUERY-side accessor; no change-feed-side wrapper

Why it matters: Future archaeologists triaging regressions will grep for accessor methods that don't exist.

Suggested fix: Update PR description items 2 & 5 to:

"Exposed via the existing accessor.getImpl(options).setEmptyPagesAllowed(...) escape hatch — no new bridge accessor wrappers are added (see commit 34d3da4fa51 for the rationale on keeping the accessor surface small for internal-only flags)."


M2. Drift hazard in withCosmosPagedFluxOptions extends well beyond emptyPagesAllowed

File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java:444-458
Severity: major | Category: design_alignment (with code-quality angle)

The PR adds a great drift-hazard comment ("this hand-maintained copy list is a known drift hazard") and propagates one field (emptyPagesAllowed) — but the same code path silently drops ~13 other fields when a user supplies a continuation token via byPage(savedContinuation):

endLSN, customSerializer, keywordIdentifiers, excludeRegions, readConsistencyStrategy, quotaInfoEnabled, customOptions, operationContextAndListenerTuple, thresholds, properties, isSplitHandlingDisabled, completeAfterAllCurrentChangesRetrieved, partitionKeyDefinition, collectionRid

Two of these are real Spark-adjacent footguns:

  • endLSN — silently lost ⇒ a bounded change feed becomes unbounded after byPage(continuation) resume
  • customSerializer — silently lost ⇒ custom type deserialization breaks across the continuation boundary

Scope caveat (this is 🟡 not 🔴): This is pre-existing; the PR's comment is genuinely useful and the author explicitly scoped broader cleanup as out-of-scope.

Suggested fix (pick one):

  • Minimum: Open a tracking GitHub issue and reference it from the new in-code comment; list the affected fields named above.
  • Better (incremental win in this PR): Add endLSN and customSerializer to the copy list, plus two parallel tests in CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest mirroring the existing emptyPagesAllowed_isPropagated test.
  • Best (follow-up PR): Invert the default — use the copy constructor new CosmosChangeFeedRequestOptionsImpl(this.actualRequestOptions) and surgically override only fields encoded in the continuation token. Changes the default from "drop unless listed" to "preserve unless overridden by the continuation".
  • Also: consider a reflection-based test that iterates declared fields on the impl and asserts each survives the round-trip.
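The two rebuild strategies and the reflection-based check can be sketched on a stand-in class. Everything here is hypothetical: `Options` has only three illustrative fields and is not the real CosmosChangeFeedRequestOptionsImpl, and `droppedFields` is an assumed helper name. The sketch only shows the shape of "drop unless listed" versus "preserve unless overridden" and how iterating declared fields detects a dropped one.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Toy model only; Options is a stand-in with three illustrative fields. */
final class CopyListDriftSketch {

    static final class Options {
        int maxItemCount;
        Long endLsn;
        boolean emptyPagesAllowed;

        Options() {
        }

        /** Copy constructor: every field is preserved by default. */
        Options(Options other) {
            this.maxItemCount = other.maxItemCount;
            this.endLsn = other.endLsn;
            this.emptyPagesAllowed = other.emptyPagesAllowed;
        }
    }

    static Options sampleOptions() {
        Options options = new Options();
        options.maxItemCount = 100;
        options.endLsn = 42L;
        options.emptyPagesAllowed = true;
        return options;
    }

    /** "Drop unless listed": a hand-maintained copy list that forgets endLsn. */
    static Options rebuildFromCopyList(Options source) {
        Options rebuilt = new Options();
        rebuilt.maxItemCount = source.maxItemCount;
        rebuilt.emptyPagesAllowed = source.emptyPagesAllowed;
        return rebuilt;
    }

    /** "Preserve unless overridden": start from a full copy, then override selectively. */
    static Options rebuildFromCopyConstructor(Options source) {
        return new Options(source);
    }

    /** Names of instance fields whose values differ between source and rebuilt. */
    static List<String> droppedFields(Options source, Options rebuilt) {
        List<String> dropped = new ArrayList<>();
        for (Field field : Options.class.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true);
                if (!Objects.equals(field.get(source), field.get(rebuilt))) {
                    dropped.add(field.getName());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        Options source = sampleOptions();
        System.out.println(droppedFields(source, rebuildFromCopyList(source)));        // [endLsn]
        System.out.println(droppedFields(source, rebuildFromCopyConstructor(source))); // []
    }
}
```

A check of this shape only works if the sample object sets every field to a non-default value; a field left at its default would pass even when it is not copied.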

M3. Test-timeout discipline missing on regression-guard tests — .NET parity gap

File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
Severity: major | Category: parity (test-rigor)

The new nextPage_emptyPagesAllowedTrue_* tests use StepVerifier.verifyComplete() with no explicit timeout. .NET's equivalent regression-guard test pins this with a wall-clock timeout so a future refactor reintroducing unbounded repeatWhenEmpty drain fails fast and loudly:

Verified:

$ grep -n "Timeout" Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs
173: [Timeout(5000)]   ... public async Task ShouldReturnNotModifiedAfterCyclingOnAllRanges(int partitions)

$ grep -n "timeOut\|Duration.of" sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
# zero hits

Why it matters: Without a timeout, a regression would hang until TestNG's global timeout (often unset on unit suites) and look like CI flake rather than a loud failure. The PR explicitly motivates these tests as defense-in-depth against the very class of regression a wall-clock guard would catch.

Suggested fix: Add timeOut = 10_000 to the @Test annotation on the four nextPage_* tests, e.g.:

@Test(groups = { "unit" }, timeOut = 10_000)

OR replace .verifyComplete() with .expectComplete().verify(Duration.ofSeconds(5)). The isFullyDrained_* tests don't need this (synchronous pure method).

This is the single biggest test-rigor gap vs peer SDKs that Cross SDK Reviewer identified.
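The idea behind a wall-clock guard can be shown without TestNG or Reactor. The following is a plain-JDK sketch, not the suggested test change itself: `TimeoutGuardSketch`, `completesWithin`, and `drainForever` are hypothetical names, and `drainForever` stands in for a regression that never completes.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-JDK illustration of a wall-clock guard; not the TestNG/StepVerifier change itself. */
final class TimeoutGuardSketch {

    /** Runs the task on a daemon thread and reports whether it finished within the limit. */
    static boolean completesWithin(Runnable task, Duration limit) {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true); // a hung task must not keep the JVM alive
            return thread;
        });
        Future<?> future = executor.submit(task);
        try {
            future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // fail fast instead of hanging the suite
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    /** Stand-in for a drain loop that never completes. */
    static void drainForever() {
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(() -> { }, Duration.ofMillis(500)));                  // true
        System.out.println(completesWithin(TimeoutGuardSketch::drainForever, Duration.ofMillis(100))); // false
    }
}
```

The point is the same as with timeOut on @Test: a hang is converted into a bounded, attributable failure rather than a stalled CI job.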


Minor findings

m1. Missing javadoc on new isEmptyPagesAllowed() / setEmptyPagesAllowed()

File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:190-197
Category: code_quality

Sibling CosmosQueryRequestOptionsImpl.setEmptyPagesAllowed (lines 312-320) documents "Defaults to false" and the param contract; the new methods have none. Three consumer sites (ChangeFeedFetcher plumbing, withCosmosPagedFluxOptions copy, Spark ChangeFeedPartitionReader) currently rely on tribal knowledge.

Suggested fix: 1-3 line javadoc: "When true, ChangeFeedFetcher surfaces 304/noChanges pages to the caller instead of swallowing them via repeatWhenEmpty; defaults to false. Caller iterators must handle empty FeedResponse pages."


m2. Bridge-accessor asymmetry: change-feed side reaches through getImpl() while query side has a wrapper

Files: ImplementationBridgeHelpers.java:300-302 + ImplementationBridgeHelpers.java:425-447 + ChangeFeedPartitionReader.scala:220-222 + ChangeFeedQueryImpl.java:124
Severity: minor | Category: code_quality

TL;DR: The query-side accessor exposes a first-class wrapper for this exact flag; the change-feed accessor doesn't, so the same logical opt-in is reached via two different code shapes. Grep-discoverability and refactor-fragility cost, not a correctness bug.

The two sides, side-by-side

Query side (pre-existing, untouched by this PR):

// ImplementationBridgeHelpers.java:300-302  — CosmosQueryRequestOptionsAccessor interface
void    setAllowEmptyPages(CosmosQueryRequestOptions options, boolean emptyPagesAllowed);
boolean getAllowEmptyPages(CosmosQueryRequestOptions options);
// ItemsPartitionReader.scala:54  — Spark call site, one-liner via accessor
.setAllowEmptyPages(queryOptions, true)
// ParallelDocumentQueryExecutionContext.java:319  — internal consumer uses the same wrapper
&& !qryOptAccessor().getAllowEmptyPages(this.cosmosQueryRequestOptions)

Change-feed side (added in this PR):

// ImplementationBridgeHelpers.java:425-447  — CosmosChangeFeedRequestOptionsAccessor
// (no setAllowEmptyPages / getAllowEmptyPages method on the interface)
// ChangeFeedPartitionReader.scala:220-222  — Spark call site reaches through getImpl()
.getImpl(options)
.setEmptyPagesAllowed(true)
// ChangeFeedQueryImpl.java:124  — internal consumer also reaches through getImpl()
changeFeedOptionsAccessor().getImpl(this.options).isEmptyPagesAllowed(),

Why it's worth flagging

  1. Grep-discoverability is asymmetric. A future engineer running grep -rn 'setAllowEmptyPages\|getAllowEmptyPages' to answer "where do we opt callers into per-page empty-page surfacing?" gets only the query-side hits — Spark's change-feed call site and ChangeFeedQueryImpl are invisible at the symbol level. The cross-feature semantic equivalence disappears.
  2. Refactor blast radius is asymmetric. Rename setEmptyPagesAllowed → something else later: query side breaks at the wrapper (single update point); change-feed side breaks at every getImpl().setEmptyPagesAllowed(...) call site (Spark + internal consumer).
  3. The accessor interface is the friend-API contract. Once an option type has wrapper-style accessors for some flags and getImpl()-style reachthrough for others, the "what's the right way to access X from a friend module" question stops having a clean answer for that type.
  4. Author has a defensible rationale. Commit 34d3da4fa51 ("Shrink bridge accessor surface") intentionally removed earlier wrapper additions to keep the accessor surface small. Real value worth honoring — but the cost is asymmetry vs. the sibling type that already exposes the same-named wrapper.

Suggested fix (recommend Option A; both are small)

  • Option A (preferred) — mirror the query side. In ImplementationBridgeHelpers.java lines 425-447 add:

    void    setAllowEmptyPages(CosmosChangeFeedRequestOptions options, boolean emptyPagesAllowed);
    boolean getAllowEmptyPages(CosmosChangeFeedRequestOptions options);

    Wire them in CosmosChangeFeedRequestOptions.java (parallel to lines 769-778 of the query options file), and update ChangeFeedPartitionReader.scala:222 and ChangeFeedQueryImpl.java:124 to use the wrapper. Single grep-discoverable symbol across both option types.

  • Option B — document the deliberate divergence. Keep the current shape but add a 2-line comment at the Spark call site explaining why this goes through getImpl() rather than getting its own wrapper, so future engineers don't "fix" the asymmetry by adding the wrapper anyway, and so they understand the surface-shrinking intent.


m3. PR body claims "see in-code comment" — no such comment exists

File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:194-197
Category: code_quality

PR body: "deliberately no public setter — see in-code comment". Grep finds nothing.

Verified:

$ grep -rn 'deliberately no public\|public setter' \
   sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java \
   sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java \
   sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java
# (no output)

The rationale for keeping the flag internal is the single most important note for future maintainers; should live in code, not the PR description (which decays).

Suggested fix: Add a 2-line comment on setEmptyPagesAllowed:

/* Intentionally not surfaced on the public CosmosChangeFeedRequestOptions API.
 * Caller must verify the iterator handles empty FeedResponse pages without
 * retry loops before opting in. */

m4. Test gap: surfaceOrSwallowNoChangesPage is invoked in BOTH branches, only streaming branch tested

File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
Category: testing

The new helper is called from both branches of nextPageInternal:

  • completeAfterAllCurrentChangesRetrieved || endLSN != null branch (line 140)
  • streaming branch (lines 152, 165)

All 8 tests configure endLSN=null. Spark's ChangeFeedPartitionReader.scala:212-215 DOES set endLsn for bounded partitions, then unconditionally sets setEmptyPagesAllowed(true) — so the (endLSN != null, emptyPagesAllowed=true) combination is a real production scenario with zero unit coverage.

Verified:

$ grep -n "endLSN\|completeAfterAllCurrentChangesRetrieved" sdk/cosmos/azure-cosmos-tests/.../ChangeFeedFetcherEmptyPagesTest.java
339: /* completeAfterAllCurrentChangesRetrieved */ false,
341: /* endLSN */ null,

Suggested fix: Add 2 tests:

  • nextPage_endLsnSet_emptyPagesAllowedTrue_surfacesNoChangesUntilHasFetchedAllChanges()
  • nextPage_endLsnSet_emptyPagesAllowedFalse_swallowsNoChangesViaRepeatWhenEmpty() (legacy-mode regression guard)

m5. >4*(size+1) consecutive-304 defense only mock-tested at the fetcher layer

File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java:165-211
Category: testing

Every nextPage_* test mocks handleChangeFeedNotModified directly. The PR's defense-in-depth argument rests on the real FeedRangeCompositeContinuationImpl returning NO_RETRY after the >4*(size+1) threshold. Mock-based contract tests verify "IF NO_RETRY THEN terminate" but not "the real impl DOES return NO_RETRY after threshold".

Verified:

$ grep -n "when(continuation.handleChangeFeedNotModified" .../ChangeFeedFetcherEmptyPagesTest.java
131, 171, 219, 256: (all mocked, no integration with real impl)

Suggested fix: One integration-style test (still unit group, no real Cosmos client): wire a real FeedRangeCompositeContinuationImpl with 2-3 sub-ranges, feed >4*(size+1) consecutive 304 responses, assert shouldFetchMore() flips to false. Catches contract drift between ChangeFeedFetcher and FeedRangeCompositeContinuationImpl.
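The arithmetic of the threshold can be made concrete with a toy counter. This is a sketch under assumptions: `Consecutive304DefenseSketch` is hypothetical, it models only the ">4*(size+1)" rule as worded in this PR, and it ignores the other NO_RETRY cases (single partition, full cycle complete) that the real FeedRangeCompositeContinuationImpl is said to handle.

```java
/** Toy counter for the ">4*(size+1) consecutive 304s" rule; not the real continuation class. */
final class Consecutive304DefenseSketch {
    private final int subRangeCount;
    private int consecutiveNotModified;

    Consecutive304DefenseSketch(int subRangeCount) {
        this.subRangeCount = subRangeCount;
    }

    /** Returns true (retry) until the count of consecutive 304s exceeds 4 * (size + 1). */
    boolean shouldRetryAfterNotModified() {
        consecutiveNotModified++;
        return consecutiveNotModified <= 4 * (subRangeCount + 1);
    }

    /** Any data page resets the streak. */
    void onDataPage() {
        consecutiveNotModified = 0;
    }

    /** Number of consecutive 304 pages seen when the first no-retry verdict is returned. */
    static int pagesUntilNoRetry(int subRangeCount) {
        Consecutive304DefenseSketch sketch = new Consecutive304DefenseSketch(subRangeCount);
        int pages = 0;
        do {
            pages++;
        } while (sketch.shouldRetryAfterNotModified());
        return pages;
    }

    public static void main(String[] args) {
        System.out.println(pagesUntilNoRetry(3)); // 17, since 4 * (3 + 1) = 16
        System.out.println(pagesUntilNoRetry(1)); // 9, since 4 * (1 + 1) = 8
    }
}
```

An integration-style test against the real class would replace this counter with the actual continuation and assert the same kind of bound on the number of 304 responses before shouldFetchMore() flips.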


m6. Spark CHANGELOGs don't disclose per-page-overhead trade-off

Files: sdk/cosmos/azure-cosmos-spark_3-*_2-1*/CHANGELOG.md
Category: perf / agentic disclosure

All 4 Spark CHANGELOG entries announce the fix but don't document the per-page trade-off: workloads with many empty pages will now produce one Spark iterator callback per surfaced page (vs. previously one callback could drain many). Observability dashboards will see "pages per task" metrics spike; LLM cost-attribution tools will misattribute.

Verified:

$ grep -i 'emptyPagesAllowed\|allow.empty.page' sdk/cosmos/azure-cosmos-spark_3-*_*/CHANGELOG.md
# all 4 entries describe the fix but not the surfacing trade-off

Suggested fix: Append to each Spark CHANGELOG entry:

"Note: workloads with many empty pages will now observe one Spark iterator callback per surfaced page where previously a single callback could drain many — this is the intended trade-off for per-page timeout enforcement."


m7. New flag not reachable from public API — forward-looking design statement missing

File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:194-197
Category: agentic

Opt-in flag is impl-class-only; a future Flink connector, MCP server, or 3rd-party agent that wants the same per-page timeout enforcement must depend on com.azure.cosmos.implementation bridge accessors.

Suggested fix: Either (a) graduate to a public @Beta method with doc-comment warning callers must handle empty FeedResponse pages, OR (b) explicitly document the forward-looking decision ("permanently internal — Spark connector only") in a code comment so future Cosmos team and 3rd-party consumers know which way the wind blows. (Overlaps with m3 — both fixes can ship in one code comment.)


m8. New surfaceOrSwallow decision invisible to logs/diagnostics/OpenTelemetry

File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java:183-189
Category: agentic

The new surfaceOrSwallowNoChangesPage decision and the new NO_RETRY-on-noChanges branch are invisible to logs/diagnostics. An LLM diagnostician investigating "how many physical pages did this query drain" has no signal beyond the page count itself.

Suggested fix: Add structured trace:

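if (logger.isTraceEnabled()) {
    // Assumption: NO_RETRY is the shared constant returned by handleChangeFeedNotModified;
    // a reference comparison against a fresh noRetry() instance would always be false.
    logger.trace("change_feed.empty_page surfacing={} no_retry={} continuation={}",
                 this.emptyPagesAllowed,
                 retryResult == ShouldRetryResult.NO_RETRY,
                 getContinuationForLogging());
}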

Structured key=value form per observability convention.


Nits

n1. nextPageInternal complexity at advisory boundary

File: ChangeFeedFetcher.java:121-176
Category: code_quality

56 lines, 4-deep nesting; the load-bearing noChanges(r) && this.emptyPagesAllowed guard is the 4th nested check. Dense comments mitigate but still right at the advisory boundary.

Suggested (optional): Extract the .flatMap((r) -> { ... }) body into a private applyNoChangesDecision(FeedResponse<T> r) method.


n2. Pre-existing no-op in CosmosEncryptionAsyncContainer:1158

Category: code_quality | Severity: nit (pre-existing, out-of-scope for this PR)

The statement getEffectiveCosmosChangeFeedRequestOptions(pagedFluxOptions, finalOptions); discards its return value, and the method has no side effects on its receiver, so the call is a no-op. Encryption customers therefore don't get even the partial copy-list propagation. Out of scope; file a follow-up.
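
The bug pattern can be sketched in isolation. ToyOptions and DiscardedReturnDemo are hypothetical names, and the "take the larger maxItemCount" merge rule is an assumption made only so the difference is observable; neither reflects the encryption container's actual code.

```java
/** Hypothetical immutable options holder; not an SDK type. */
final class ToyOptions {
    final int maxItemCount;

    ToyOptions(int maxItemCount) {
        this.maxItemCount = maxItemCount;
    }
}

final class DiscardedReturnDemo {
    /** Pure helper: builds a new object and mutates neither argument. */
    static ToyOptions effectiveOptions(ToyOptions paged, ToyOptions original) {
        return new ToyOptions(Math.max(paged.maxItemCount, original.maxItemCount));
    }

    /** Buggy shape: the merged result is dropped, so the original is used unchanged. */
    static int buggy(ToyOptions paged, ToyOptions original) {
        effectiveOptions(paged, original); // return value discarded: a no-op
        return original.maxItemCount;
    }

    /** Fixed shape: the merged result is captured and used downstream. */
    static int fixed(ToyOptions paged, ToyOptions original) {
        ToyOptions effective = effectiveOptions(paged, original);
        return effective.maxItemCount;
    }
}
```

With paged = 100 and original = 10, buggy(...) yields 10 while fixed(...) yields 100, which is the whole difference the follow-up would need to close.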


n3. Test uses reflection where direct package access would work

File: ChangeFeedFetcherEmptyPagesTest.java:639-646
Category: testing

Test class and Fetcher.isFullyDrained live in the same package (com.azure.cosmos.implementation.query); reflection isn't needed. Reflection-based tests don't fail at compile time on rename refactors.

Suggested fix: Replace reflection with a tiny subclass:

class TestableChangeFeedFetcher extends ChangeFeedFetcher<Document> {
    // Constructor forwarding to super(...) omitted here; the subclass needs one to compile.
    boolean callIsFullyDrained(FeedResponse<Document> r) { return isFullyDrained(true, r); }
}

n4. No emulator / E2E test reproduces the actual OperationCancelledException scenario

Category: testing | Tag: unverified-pattern-suggestion

Coverage is unit-mock only. Granular mock coverage is appropriate primary defense, but an emulator test would lock in the fix at the integration boundary.

Verified:

$ grep -rln 'OperationCancelled\|end-to-end timeout\|endToEndTimeout\|emptyPagesAllowed' sdk/cosmos/azure-cosmos-tests/src/test
# only the two new files added in this PR

Suggested follow-up (not for this PR): @Test(groups = { "emulator" }) that creates a sparse multi-partition container with aggressive end-to-end-latency policy and asserts byPage().iterator() change-feed read completes without OCE.


Design alignment (citations to design docs)

cosmos-pr-reviewer raised 9 design-alignment findings, almost all of which are propose-design-doc-extension items.

D1. disableShouldFetchMore() bypasses Fetcher.update() top/maxItemCount machinery

File: ChangeFeedFetcher.java:160-169 | Citation: 12-change-feed.md#key-classes

Direct disableShouldFetchMore() call sidesteps the documented Fetcher base-class state machine. Benign because top is always -1 for change feed, but the bypass deserves an inline comment acknowledging the safety argument.

D2. surfaceOrSwallowNoChangesPage helper conflates snapshot and streaming mode semantics

File: ChangeFeedFetcher.java:183-189 | Citation: 12-change-feed.md#key-classes

DRYing the two original call sites risks silently coupling their swallow-vs-surface semantics. Suggested fix: add a comment stating the helper applies to both modes intentionally.

D3-D9. Propose-design-doc-extension items (for the cosmosdb-design-docs repo, NOT this SDK PR)

| ID | Title | Target chapter | Proposed heading |
|----|-------|----------------|------------------|
| D3 | Incremental change-feed termination contract undocumented | 12-change-feed.md | ## Incremental Change Feed: Termination Contract |
| D4 | >4*(size+1) consecutive-304 defense bail rationale lives only in code | 12-change-feed.md | append to ### Scenario 4: Breadth-First 304 Cycling |
| D5 | emptyPagesAllowed SDK contract surface (swallow vs surface) undocumented | 12-change-feed.md + cross-ref 07-query-engine.md | ## Empty-Page Handling: Swallow vs Surface |
| D6 | 07-query-engine.md silent on SDK-side ParallelDocumentQueryExecutionContext | 07-query-engine.md | ## SDK-Side Query Execution Contexts |
| D7 | withCosmosPagedFluxOptions continuation-rebuild field drift is a design rule | 12-change-feed.md | append ### Continuation-Token Round-Trip Field Drift |
| D8 | 17-status-codes-and-sdk-retries.md table missing change-feed 304 semantics cross-ref | 17-status-codes-and-sdk-retries.md | row 304 entry: add change-feed cross-ref |
| D9 | Merge-amplification of empty pages under emptyPagesAllowed=true not in Scenario 1 | 12-change-feed.md#scenario-1-duplicate-processing-after-merge | extend impact bullets |

All 9 are non-blocking for this SDK PR. Treat as follow-up issues on the design-docs repo.


Cross-SDK informational divergences

These are not findings to act on, but the user (and any future cross-SDK consistency reviewer) should know:

| Behavior | Java (post-PR) | .NET | Python | Note |
|----------|----------------|------|--------|------|
| Default 304 handling on cross-partition CF | repeatWhenEmpty (unbounded, pre-existing) | Aggregate per cycle, surface one NotModifiedPage | while True bounded by should_retry_on_not_modified_response() | Java still divergent in default mode; intentional back-compat preservation |
| Opt-in to surface per-page | emptyPagesAllowed=true (internal-only) | Not exposed (no equivalent flag) | Not exposed (no equivalent kwarg) | Verified by grep: zero hits in both peers |
| Per-operation end-to-end timeout config | CosmosEndToEndOperationLatencyPolicyConfig | Not present (only per-request AvailabilityStrategy) | Not present | This is why .NET/Python don't see the timeout bug |
| isFullyDrained-style termination signal | Yes (Java-specific) | No (uses cycle-bound IsNextRangeEqualToOriginal) | No (uses _initial_no_result_range cycle tracking) | The Java regression class has no peer analog |
| Empty cycle UX | Continues polling (Java) | Surfaces NotModifiedPage per cycle | Terminates iterator entirely (raise StopIteration) | Materially different; multi-SDK customers will observe different paging shapes |

Cross SDK Reviewer's verdict: Approve. There is no peer SDK to copy the design from; Java's choices are internally consistent with its own pre-existing query-side setAllowEmptyPages and deliberately divergent from peers in a way peers cannot easily match without first shipping a per-operation timeout config.


Verified-negative findings (rigor receipts)

Preserved verbatim — these prove what was checked and explicitly ruled out:

  • "Naming inconsistency emptyPagesAllowed vs allowEmptyPages at field/impl layer" — DROPPED. Field name + impl methods exactly match sibling CosmosQueryRequestOptionsImpl (verified by grep on line 25, 308, 317).
  • "TransientIOErrorsRetryingIteratorSpec.scala only runs against azure-cosmos-spark_3 and not the 4 publishable variants" — DROPPED. All 4 variant pom.xmls reference <source>${basedir}/../azure-cosmos-spark_3/src/test/scala</source> (shared test source dir; auto-runs against all variants).
  • "Coverage gap for >4*(size+1) defense in FeedRangeCompositeContinuationImpl" — Partially dropped. The NO_RETRY contract IS mock-tested at the fetcher layer (nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_terminatesIteration, lines 442-489). Finding m5 above re-raises the integration-against-real-impl gap, which is a complementary concern.
  • "isFullyDrained default-mode regression coverage missing" — DROPPED. Test isFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue (line 379) exists and explicitly verifies short-circuit fires without consulting the continuation (Mockito.verify(continuation, Mockito.never()).isDone() — load-bearing pin).
  • "Behavior change in (completeAfterAllCurrentChangesRetrieved || endLSN != null) branch for emptyPagesAllowed=false callers" — DROPPED. Compared git show HEAD~6:.../ChangeFeedFetcher.java to current: bytecode-equivalent for the legacy path (reEnableShouldFetchMoreForRetry(); return Mono.empty();). Behavior preservation verified.

What we explicitly did NOT flag

  • PR Deep Reviewer confirmed (via 10/10 ✅ table) every claim in the PR body about: isDone() permanently false, handleChangeFeedNotModified NO_RETRY-for-data, only-one-caller verification on Paginator.getChangeFeedQueryResultAsObservable and new ChangeFeedFetcher(...), copy constructor includes new field, disableShouldFetchMore correctly stops Paginator, refactor behavior-preserving for all 4 (branch × flag) combinations.
  • Existing copilot-pull-request-reviewer inline comments (3) — all addressed by author in commit 08f690b, dropped to avoid duplication.
  • Multi-Spark-variant test coverage — verified shared source dir; no per-variant duplication needed.
  • Bridge accessor pattern as language-idiomatic — verified parity with .NET InternalsVisibleTo (lists 20+ friend assemblies in AssemblyInfo.cs:7-44) and Python _module convention. Java's pattern is parity-aligned in spirit; not over-engineering.
  • isFullyDrained regression as a Java-specific architecture issue — verified neither .NET nor Python has an analogous IsFullyDrained/IsDone() termination signal; the bug class has no peer analog.

Generated by the PR Review Router (Tier 3 — Deep) — orchestrating PR Deep Reviewer + Engineering Quality Reviewer + cosmos-pr-reviewer + Cross SDK Reviewer in parallel. Posted as a single review comment with explicit user approval after a human review pass.
