[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout#49276
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout#49276tvaron3 wants to merge 6 commits into
Conversation
…meout
Spark partition tasks reading change feed (or executing cross-partition
queries) against a sparse workload could hit OperationCancelledException
("End-to-end timeout hit when trying to retrieve the next page") at the
connector's 65-second per-operation end-to-end timeout. Root cause: with
the default emptyPagesAllowed=false, ParallelDocumentQueryExecutionContext
and ChangeFeedFetcher swallow empty / 304 pages internally — a single
producer-side nextPage() call can keep draining many sub-feedRanges before
emitting one non-empty page. For sparse workloads the cumulative time blows
the per-operation timeout.
Fix:
* Spark ItemsPartitionReader (query path) calls
setAllowEmptyPages(true) on the CosmosQueryRequestOptions so the SDK's
existing emptyPagesAllowed plumbing applies.
* New internal-only emptyPagesAllowed flag on
CosmosChangeFeedRequestOptionsImpl (default false; behavior unchanged
for all other callers) plumbed through Paginator.
getChangeFeedQueryResultAsObservable into ChangeFeedFetcher.
nextPageInternal. When the flag is true, both 304 branches return
Mono.just(r) so empty pages bubble up to the iterator. Surfaced via
new package-private bridge accessor
CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages.
* ChangeFeedFetcher.isFullyDrained no longer short-circuits to true on
noChanges responses (it now consults only continuation.isDone()),
which removes the load-bearing reEnableShouldFetchMoreForRetry()
pattern that was previously needed to undo a base-class decision.
* Spark ChangeFeedPartitionReader opts into the new flag via the bridge
accessor.
* CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptions now also
propagates emptyPagesAllowed when the paged-flux pull mechanism
supplies a continuation token (the freshly-built impl would otherwise
silently lose the flag — comment added flagging the broader drift
hazard).
Tests:
* New ChangeFeedFetcherEmptyPagesTest (5 unit tests): exercises the
isFullyDrained behavior change and asserts that nextPageInternal
surfaces noChanges responses individually when the flag is true and
swallows them via repeatWhenEmpty when the flag is false.
* New CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest (3 unit
tests): locks in the flag propagation through
withCosmosPagedFluxOptions.
* Extended TransientIOErrorsRetryingIteratorSpec with a regression test
that drains hundreds of leading empty pages followed by data without
hitting the end-to-end timeout.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR updates the Cosmos DB Java SDK and Cosmos Spark connector to optionally surface empty (304 / noChanges) change feed pages to callers, preventing Spark partition tasks from exceeding the connector’s 65s per-operation end-to-end timeout when draining sparse, cross-partition workloads.
Changes:
- Adds an internal
emptyPagesAllowedflag for change feed requests and plumbs it throughChangeFeedQueryImpl→Paginator→ChangeFeedFetcher, so empty pages can be emitted instead of swallowed. - Updates Cosmos Spark readers (query + change feed) to opt into “allow empty pages” so each physical page is bounded by the per-page timeout window.
- Adds new unit/regression tests covering the new behavior and the paged-flux continuation rebuild path; updates changelogs accordingly.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java | Preserves emptyPagesAllowed across paged-flux continuation rebuild; exposes internal accessor bridge methods. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java | Adds emptyPagesAllowed parameter and forwards it into ChangeFeedFetcher. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java | Implements “surface empty pages” behavior and simplifies isFullyDrained to rely on continuation completion. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java | Extends change feed request options accessor interface with allow-empty-pages getters/setters. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java | Adds internal emptyPagesAllowed state and cloning support. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java | Passes allow-empty-pages option into the paginator pipeline. |
| sdk/cosmos/azure-cosmos/CHANGELOG.md | Documents the fix and clarifies Spark connector behavior. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java | Adds regression coverage for surfacing vs swallowing empty pages and new isFullyDrained semantics. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java | Ensures allow-empty-pages survives withCosmosPagedFluxOptions continuation rebuild path. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala | Enables “allow empty pages” for Spark query reads. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala | Enables “allow empty pages” for Spark change feed reads. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala | Adds Spark regression test for long empty-page runs without timing out. |
| sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…wed=false
The previous cleanup of ChangeFeedFetcher.isFullyDrained removed the
noChanges short-circuit unconditionally, on the rationale that consulting
continuation.isDone() was simpler. That regressed every non-Spark caller of
the change-feed API:
FeedRangeCompositeContinuationImpl.isDone() returns
compositeContinuationTokens.size() == 0, but moveToNextToken() rotates the
deque via poll() + add() and never shrinks it. So isDone() is permanently
false for normal incremental change-feed iteration. For the default
emptyPagesAllowed=false path:
1. The 304 arrives.
2. updateState calls isFullyDrained -> false (because isDone() is false).
3. nextPageInternal's else-branch sees handleChangeFeedNotModified return
NO_RETRY (single-partition case, multi-partition cycle-complete, or
the >4*(size+1) consecutive-304 defense) and falls through to
Mono.just(r).
4. Paginator's generate-loop checks shouldFetchMore() -> true and calls
nextPage() again -> infinite poll loop.
Customer-visible impact would be: any consumer that drains
queryChangeFeed(...).byPage() to completion (e.g.
.toIterable().iterator(), .collectList(), .blockLast()) hangs forever once
the change feed catches up.
flag is true (Spark path), surface every noChanges to the caller and let
the consumer decide when to stop iterating. When the flag is false (every
other caller, including the SDK's public queryChangeFeed API), preserve
the original termination signal.
Also addressed reviewer feedback:
* Drop unused org.mockito.Mockito import (would break checkstyle's
UnusedImports rule).
* Replace reflective field assignment in stubRequest() with direct field
writes on the public fields RxDocumentServiceRequest.requestContext
and .faultInjectionRequestContext. Mockito only intercepts method
calls; field writes on a mock work directly.
* Add a regression test
isFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue
that locks in the termination signal for the default path.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Defense-in-depth (F1): when emptyPagesAllowed=true the streaming change- feed path takes branch 2 of nextPageInternal. If handleChangeFeedNotModified returns NO_RETRY on a noChanges page (single- partition case, multi-partition full-cycle complete, or the >4*(size+1) consecutive-304 defense in FeedRangeCompositeContinuationImpl), the SDK's built-in termination signal was being silently dropped because isFullyDrained() consults only continuation.isDone() in that mode (which is permanently false for incremental change feed). Now we explicitly call disableShouldFetchMore() to preserve the defense-in-depth termination guarantee even for emptyPagesAllowed=true callers. * DRY (F4): extracted the two near-identical 'surface or swallow via repeatWhenEmpty' blocks in nextPageInternal into a private surfaceOrSwallowNoChangesPage(r) helper. The branches now read as a one-line intent instead of seven near-identical lines. * Comment density (F9): tightened the long isFullyDrained comment to a 2-line tl;dr followed by the detailed rationale. * Test (F2): new nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_ terminatesIteration locks in the defense-in-depth fix - asserts that after a terminal NO_RETRY noChanges page, shouldFetchMore() flips to false so Paginator stops calling nextPage(). * Test (F3): added Mockito.verify(continuation, never()).isDone() to the pass-2 regression test so a future refactor that accidentally drops the noChanges short-circuit and falls through to the (permanently-false) continuation.isDone() check fails loudly instead of silently hanging. Test results: ChangeFeedFetcherEmptyPagesTest 7/7, FetcherTest 5/5, CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3, TransientIOErrorsRetryingIteratorSpec 7/7 - all green. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Test (F1): assert callIndex==4 after NO_RETRY termination in the pass-3 defense-in-depth test so a future regression that terminates iteration but still over-fetches is caught. * Test (F2): new nextPage_emptyPagesAllowedTrueWithDataPages_doesNotTerminate pins the production contract that the noChanges(r) guard on the disableShouldFetchMore() arm is load-bearing. In production, FeedRangeCompositeContinuationImpl.handleChangeFeedNotModified returns NO_RETRY for EVERY non-noChanges response (the early branch resets state and falls through). Without the noChanges(r) guard, every data page would silently truncate iteration after the first emission. * Comment (F5): added an inline rationale next to the noChanges(r) guard explaining why it must remain - prevents a future engineer from 'simplifying' away the guard without realizing the production truncation hazard. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
/azp run java - cosmos - tests |
|
Azure Pipelines successfully started running 1 pipeline(s). |
* Drop the new CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages
wrapper methods. Callers (ChangeFeedQueryImpl, Spark ChangeFeedPartitionReader,
the propagation unit test) now use the already-exposed
accessor.getImpl(options).{is,set}EmptyPagesAllowed() instead, keeping the
bridge accessor interface at its pre-PR shape.
* Move the azure-cosmos CHANGELOG entry from 'Bugs Fixed' to 'Other Changes'
and reword: this PR adds an internal-only field on
CosmosChangeFeedRequestOptionsImpl that pure SDK consumers cannot reach
without going through getImpl(). The customer-facing fix lives in the Spark
connector CHANGELOGs (which keep their 'Bugs Fixed' entries).
Test results: ChangeFeedFetcherEmptyPagesTest 8/8, FetcherTest 5/5,
CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3,
TransientIOErrorsRetryingIteratorSpec 7/7 - all green.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
@sdkReviewAgent |
kushagraThapar
left a comment
There was a problem hiding this comment.
Automated multi-reviewer code review — Tier 3 (Deep)
Reviewers dispatched in parallel: PR Deep Reviewer (correctness) + Engineering Quality Reviewer (code quality / testing / perf / agentic) + cosmos-pr-reviewer (Cosmos design alignment) + Cross SDK Reviewer (peers: dotnet, python).
Risk score 8 (public_api +4, query+retry capped +6, source/test-ratio −2). Design-doc clone synced fresh against tvaron3/cosmosdb-design-docs@main. Existing inline comments from copilot-pull-request-reviewer were de-duped — all 3 were already addressed by the author in commit 08f690b.
Overall verdict
Approve with suggestions. Correctness story is solid — PR Deep Reviewer independently verified every claim in the PR body against the sandbox (10 claims, all ✅). Cross SDK Reviewer confirmed neither .NET nor Python has an equivalent flag (because neither ships CosmosEndToEndOperationLatencyPolicyConfig per-operation timeout) — Java is deliberately and correctly divergent. cosmos-pr-reviewer found no domain-correctness regressions, only design-doc gaps.
Findings cluster into four addressable themes: (1) stale PR description [major], (2) broader drift hazard in continuation-rebuild copy list [major], (3) test-timeout discipline to match .NET parity [major], (4) various minor API hygiene / observability / javadoc gaps.
Blockers
(none)
Major findings
M1. PR description references removed bridge accessor methods — stale and misleading
Files: PR body (sections 2 & 5)
Severity: major | Category: code_quality
PR body claims:
"Exposed via new package-private bridge accessor
CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages"
But commit 34d3da4fa51 ("Shrink bridge accessor surface") removed those wrapper methods. Spark now uses accessor.getImpl(options).setEmptyPagesAllowed(true) — the existing getImpl() escape hatch.
Verified:
$ grep -n "setEmptyPagesAllowed\|isEmptyPagesAllowed\|setAllowEmptyPages" \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
300: void setAllowEmptyPages(CosmosQueryRequestOptions options, boolean emptyPagesAllowed);
302: boolean getAllowEmptyPages(CosmosQueryRequestOptions options);
# only the pre-existing QUERY-side accessor; no change-feed-side wrapper
Why it matters: Future archaeologists triaging regressions will grep for accessor methods that don't exist.
Suggested fix: Update PR description items 2 & 5 to:
"Exposed via the existing
accessor.getImpl(options).setEmptyPagesAllowed(...)escape hatch — no new bridge accessor wrappers are added (see commit34d3da4fa51for the rationale on keeping the accessor surface small for internal-only flags)."
M2. Drift hazard in withCosmosPagedFluxOptions extends well beyond emptyPagesAllowed
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java:444-458
Severity: major | Category: design_alignment (with code-quality angle)
The PR adds a great drift-hazard comment ("this hand-maintained copy list is a known drift hazard") and propagates one field (emptyPagesAllowed) — but the same code path silently drops ~13 other fields when a user supplies a continuation token via byPage(savedContinuation):
endLSN, customSerializer, keywordIdentifiers, excludeRegions, readConsistencyStrategy, quotaInfoEnabled, customOptions, operationContextAndListenerTuple, thresholds, properties, isSplitHandlingDisabled, completeAfterAllCurrentChangesRetrieved, partitionKeyDefinition, collectionRid
Two of these are real Spark-adjacent footguns:
endLSN— silently lost ⇒ a bounded change feed becomes unbounded afterbyPage(continuation)resumecustomSerializer— silently lost ⇒ custom type deserialization breaks across the continuation boundary
Scope caveat (this is 🟡 not 🔴): This is pre-existing; the PR's comment is genuinely useful and the author explicitly scoped broader cleanup as out-of-scope.
Suggested fix (pick one):
- Minimum: Open a tracking GitHub issue and reference it from the new in-code comment; list the 13 affected fields.
- Better (incremental win in this PR): Add
endLSNandcustomSerializerto the copy list, plus two parallel tests inCosmosChangeFeedRequestOptionsWithPagedFluxOptionsTestmirroring the existingemptyPagesAllowed_isPropagatedtest. - Best (follow-up PR): Invert the default — use the copy constructor
new CosmosChangeFeedRequestOptionsImpl(this.actualRequestOptions)and surgically override only fields encoded in the continuation token. Changes the default from "drop unless listed" to "preserve unless overridden by the continuation". - Also: consider a reflection-based test that iterates declared fields on the impl and asserts each survives the round-trip.
M3. Test-timeout discipline missing on regression-guard tests — .NET parity gap
File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
Severity: major | Category: parity (test-rigor)
The new nextPage_emptyPagesAllowedTrue_* tests use StepVerifier.verifyComplete() with no explicit timeout. .NET's equivalent regression-guard test pins this with a wall-clock timeout so a future refactor reintroducing unbounded repeatWhenEmpty drain fails fast and loudly:
Verified:
$ grep -n "Timeout" Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs
173: [Timeout(5000)] ... public async Task ShouldReturnNotModifiedAfterCyclingOnAllRanges(int partitions)
$ grep -n "timeOut\|Duration.of" sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
# zero hits
Why it matters: Without a timeout, a regression would hang until TestNG's global timeout (often unset on unit suites) and look like CI flake rather than a loud failure. The PR explicitly motivates these tests as defense-in-depth against the very class of regression a wall-clock guard would catch.
Suggested fix: Add timeOut = 10_000 to the @Test annotation on the four nextPage_* tests, e.g.:
@Test(groups = { "unit" }, timeOut = 10_000)OR replace .verifyComplete() with .expectComplete().verify(Duration.ofSeconds(5)). The isFullyDrained_* tests don't need this (synchronous pure method).
This is the single biggest test-rigor gap vs peer SDKs that Cross SDK Reviewer identified.
Minor findings
m1. Missing javadoc on new isEmptyPagesAllowed() / setEmptyPagesAllowed()
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:190-197
Category: code_quality
Sibling CosmosQueryRequestOptionsImpl.setEmptyPagesAllowed (line 312-320) documents "Defaults to false" and the param contract; the new methods have none. Three consumer sites (ChangeFeedFetcher plumbing, withCosmosPagedFluxOptions copy, Spark ChangeFeedPartitionReader) currently rely on tribal knowledge.
Suggested fix: 1-3 line javadoc: "When true, ChangeFeedFetcher surfaces 304/noChanges pages to the caller instead of swallowing them via repeatWhenEmpty; defaults to false. Caller iterators must handle empty FeedResponse pages."
m2. Bridge-accessor asymmetry: change-feed side reaches through getImpl() while query side has a wrapper
Files: ImplementationBridgeHelpers.java:300-302 + ImplementationBridgeHelpers.java:425-447 + ChangeFeedPartitionReader.scala:220-222 + ChangeFeedQueryImpl.java:124
Severity: minor | Category: code_quality
TL;DR: The query-side accessor exposes a first-class wrapper for this exact flag; the change-feed accessor doesn't, so the same logical opt-in is reached via two different code shapes. Grep-discoverability and refactor-fragility cost, not a correctness bug.
The two sides, side-by-side
Query side (pre-existing, untouched by this PR):
// ImplementationBridgeHelpers.java:300-302 — CosmosQueryRequestOptionsAccessor interface
void setAllowEmptyPages(CosmosQueryRequestOptions options, boolean emptyPagesAllowed);
boolean getAllowEmptyPages(CosmosQueryRequestOptions options);// ItemsPartitionReader.scala:54 — Spark call site, one-liner via accessor
.setAllowEmptyPages(queryOptions, true)// ParallelDocumentQueryExecutionContext.java:319 — internal consumer uses the same wrapper
&& !qryOptAccessor().getAllowEmptyPages(this.cosmosQueryRequestOptions)Change-feed side (added in this PR):
// ImplementationBridgeHelpers.java:425-447 — CosmosChangeFeedRequestOptionsAccessor
// (no setAllowEmptyPages / getAllowEmptyPages method on the interface)// ChangeFeedPartitionReader.scala:220-222 — Spark call site reaches through getImpl()
.getImpl(options)
.setEmptyPagesAllowed(true)// ChangeFeedQueryImpl.java:124 — internal consumer also reaches through getImpl()
changeFeedOptionsAccessor().getImpl(this.options).isEmptyPagesAllowed(),Why it's worth flagging
- Grep-discoverability is asymmetric. A future engineer running
grep -rn 'setAllowEmptyPages\|getAllowEmptyPages'to answer "where do we opt callers into per-page empty-page surfacing?" gets only the query-side hits — Spark's change-feed call site andChangeFeedQueryImplare invisible at the symbol level. The cross-feature semantic equivalence disappears. - Refactor blast radius is asymmetric. Rename
setEmptyPagesAllowed→ something else later: query side breaks at the wrapper (single update point); change-feed side breaks at everygetImpl().setEmptyPagesAllowed(...)call site (Spark + internal consumer). - The accessor interface is the friend-API contract. Once an option type has wrapper-style accessors for some flags and
getImpl()-style reachthrough for others, the "what's the right way to access X from a friend module" question stops having a clean answer for that type. - Author has a defensible rationale. Commit
34d3da4fa51("Shrink bridge accessor surface") intentionally removed earlier wrapper additions to keep the accessor surface small. Real value worth honoring — but the cost is asymmetry vs. the sibling type that already exposes the same-named wrapper.
Suggested fix (recommend Option A; both are small)
-
Option A (preferred) — mirror the query side. In
ImplementationBridgeHelpers.javalines 425-447 add:void setAllowEmptyPages(CosmosChangeFeedRequestOptions options, boolean emptyPagesAllowed); boolean getAllowEmptyPages(CosmosChangeFeedRequestOptions options);
Wire them in
CosmosChangeFeedRequestOptions.java(parallel to lines 769-778 of the query options file), and updateChangeFeedPartitionReader.scala:222andChangeFeedQueryImpl.java:124to use the wrapper. Single grep-discoverable symbol across both option types. -
Option B — document the deliberate divergence. Keep the current shape but add a 2-line comment at the Spark call site explaining why this goes through
getImpl()rather than getting its own wrapper, so future engineers don't "fix" the asymmetry by adding the wrapper anyway, and so they understand the surface-shrinking intent.
m3. PR body claims "see in-code comment" — no such comment exists
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:194-197
Category: code_quality
PR body: "deliberately no public setter — see in-code comment". Grep finds nothing.
Verified:
$ grep -rn 'deliberately no public\|public setter' \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java
# (no output)
The rationale for keeping the flag internal is the single most important note for future maintainers; should live in code, not the PR description (which decays).
Suggested fix: Add a 2-line comment on setEmptyPagesAllowed:
/* Intentionally not surfaced on the public CosmosChangeFeedRequestOptions API.
* Caller must verify the iterator handles empty FeedResponse pages without
* retry loops before opting in. */m4. Test gap: surfaceOrSwallowNoChangesPage is invoked in BOTH branches, only streaming branch tested
File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
Category: testing
The new helper is called from both branches of nextPageInternal:
completeAfterAllCurrentChangesRetrieved || endLSN != nullbranch (line 140)- streaming branch (lines 152, 165)
All 8 tests configure endLSN=null. Spark's ChangeFeedPartitionReader.scala:212-215 DOES set endLsn for bounded partitions, then unconditionally sets setEmptyPagesAllowed(true) — so the (endLSN != null, emptyPagesAllowed=true) combination is a real production scenario with zero unit coverage.
Verified:
$ grep -n "endLSN\|completeAfterAllCurrentChangesRetrieved" sdk/cosmos/azure-cosmos-tests/.../ChangeFeedFetcherEmptyPagesTest.java
339: /* completeAfterAllCurrentChangesRetrieved */ false,
341: /* endLSN */ null,
Suggested fix: Add 2 tests:
nextPage_endLsnSet_emptyPagesAllowedTrue_surfacesNoChangesUntilHasFetchedAllChanges()nextPage_endLsnSet_emptyPagesAllowedFalse_swallowsNoChangesViaRepeatWhenEmpty()(legacy-mode regression guard)
m5. >4*(size+1) consecutive-304 defense only mock-tested at the fetcher layer
File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java:165-211
Category: testing
Every nextPage_* test mocks handleChangeFeedNotModified directly. The PR's defense-in-depth argument rests on the real FeedRangeCompositeContinuationImpl returning NO_RETRY after the >4*(size+1) threshold. Mock-based contract tests verify "IF NO_RETRY THEN terminate" but not "the real impl DOES return NO_RETRY after threshold".
Verified:
$ grep -n "when(continuation.handleChangeFeedNotModified" .../ChangeFeedFetcherEmptyPagesTest.java
131, 171, 219, 256: (all mocked, no integration with real impl)
Suggested fix: One integration-style test (still unit group, no real Cosmos client): wire a real FeedRangeCompositeContinuationImpl with 2-3 sub-ranges, feed >4*(size+1) consecutive 304 responses, assert shouldFetchMore() flips to false. Catches contract drift between ChangeFeedFetcher and FeedRangeCompositeContinuationImpl.
m6. Spark CHANGELOGs don't disclose per-page-overhead trade-off
Files: sdk/cosmos/azure-cosmos-spark_3-*_2-1*/CHANGELOG.md
Category: perf / agentic disclosure
All 4 Spark CHANGELOG entries announce the fix but don't document the per-page trade-off: workloads with many empty pages will now produce one Spark iterator callback per surfaced page (vs. previously one callback could drain many). Observability dashboards will see "pages per task" metrics spike; LLM cost-attribution tools will misattribute.
Verified:
$ grep -i 'emptyPagesAllowed\|allow.empty.page' sdk/cosmos/azure-cosmos-spark_3-*_*/CHANGELOG.md
# all 4 entries describe the fix but not the surfacing trade-off
Suggested fix: Append to each Spark CHANGELOG entry:
"Note: workloads with many empty pages will now observe one Spark iterator callback per surfaced page where previously a single callback could drain many — this is the intended trade-off for per-page timeout enforcement."
m7. New flag not reachable from public API — forward-looking design statement missing
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:194-197
Category: agentic
Opt-in flag is impl-class-only; a future Flink connector, MCP server, or 3rd-party agent that wants the same per-page timeout enforcement must depend on com.azure.cosmos.implementation bridge accessors.
Suggested fix: Either (a) graduate to a public @Beta method with doc-comment warning callers must handle empty FeedResponse pages, OR (b) explicitly document the forward-looking decision ("permanently internal — Spark connector only") in a code comment so future Cosmos team and 3rd-party consumers know which way the wind blows. (Overlaps with m3 — both fixes can ship in one code comment.)
m8. New surfaceOrSwallow decision invisible to logs/diagnostics/OpenTelemetry
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java:183-189
Category: agentic
The new surfaceOrSwallowNoChangesPage decision and the new NO_RETRY-on-noChanges branch are invisible to logs/diagnostics. An LLM diagnostician investigating "how many physical pages did this query drain" has no signal beyond the page count itself.
Suggested fix: Add structured trace:
if (logger.isTraceEnabled()) {
logger.trace("change_feed.empty_page surfacing={} no_retry={} continuation={}",
this.emptyPagesAllowed,
retryResult == ShouldRetryResult.noRetry(),
getContinuationForLogging());
}Structured key=value form per observability convention.
Nits
n1. nextPageInternal complexity at advisory boundary
File: ChangeFeedFetcher.java:121-176
Category: code_quality
56 lines, 4-deep nesting; the load-bearing noChanges(r) && this.emptyPagesAllowed guard is the 4th nested check. Dense comments mitigate but still right at the advisory boundary.
Suggested (optional): Extract the .flatMap((r) -> { ... }) body into a private applyNoChangesDecision(FeedResponse<T> r) method.
n2. Pre-existing no-op in CosmosEncryptionAsyncContainer:1158
Category: code_quality | Severity: nit (pre-existing, out-of-scope for this PR)
getEffectiveCosmosChangeFeedRequestOptions(pagedFluxOptions, finalOptions); discards the return value (the method has no side effects on its receiver). Encryption customers don't get even the partial copy-list propagation. Out of scope — file a follow-up.
n3. Test uses reflection where direct package access would work
File: ChangeFeedFetcherEmptyPagesTest.java:639-646
Category: testing
Test class and Fetcher.isFullyDrained live in the same package (com.azure.cosmos.implementation.query); reflection isn't needed. Reflection-based tests don't fail at compile time on rename refactors.
Suggested fix: Replace reflection with a tiny subclass:
class TestableChangeFeedFetcher extends ChangeFeedFetcher {
boolean callIsFullyDrained(FeedResponse<Document> r) { return isFullyDrained(true, r); }
}n4. No emulator / E2E test reproduces the actual OperationCancelledException scenario
Category: testing | Tag: unverified-pattern-suggestion
Coverage is unit-mock only. Granular mock coverage is appropriate primary defense, but an emulator test would lock in the fix at the integration boundary.
Verified:
$ grep -rln 'OperationCancelled\|end-to-end timeout\|endToEndTimeout\|emptyPagesAllowed' sdk/cosmos/azure-cosmos-tests/src/test
# only the two new files added in this PR
Suggested follow-up (not for this PR): @Test(groups = { "emulator" }) that creates a sparse multi-partition container with aggressive end-to-end-latency policy and asserts byPage().iterator() change-feed read completes without OCE.
Design alignment (citations to design docs)
cosmos-pr-reviewer raised 9 design-alignment findings, almost all of which are propose-design-doc-extension items.
D1. disableShouldFetchMore() bypasses Fetcher.update() top/maxItemCount machinery
File: ChangeFeedFetcher.java:160-169 | Citation: 12-change-feed.md#key-classes
Direct disableShouldFetchMore() call sidesteps the documented Fetcher base-class state machine. Benign because top is always -1 for change feed, but the bypass deserves an inline comment acknowledging the safety argument.
D2. surfaceOrSwallowNoChangesPage helper conflates snapshot and streaming mode semantics
File: ChangeFeedFetcher.java:183-189 | Citation: 12-change-feed.md#key-classes
DRYing the two original call sites risks silently coupling their swallow-vs-surface semantics. Suggested fix: add a comment stating the helper applies to both modes intentionally.
D3-D9. Propose-design-doc-extension items (for the cosmosdb-design-docs repo, NOT this SDK PR)
| ID | Title | Target chapter | Proposed heading |
|---|---|---|---|
| D3 | Incremental change-feed termination contract undocumented | 12-change-feed.md |
## Incremental Change Feed — Termination Contract |
| D4 | >4*(size+1) consecutive-304 defense bail rationale lives only in code |
12-change-feed.md |
append to ### Scenario 4: Breadth-First 304 Cycling |
| D5 | emptyPagesAllowed SDK contract surface (swallow vs surface) undocumented |
12-change-feed.md + cross-ref 07-query-engine.md |
## Empty-Page Handling — Swallow vs Surface |
| D6 | 07-query-engine.md silent on SDK-side ParallelDocumentQueryExecutionContext |
07-query-engine.md |
## SDK-Side Query Execution Contexts |
| D7 | withCosmosPagedFluxOptions continuation-rebuild field drift is a design rule |
12-change-feed.md |
append ### Continuation-Token Round-Trip Field Drift |
| D8 | 17-status-codes-and-sdk-retries.md table missing change-feed 304 semantics cross-ref |
17-status-codes-and-sdk-retries.md |
row 304 entry — add change-feed cross-ref |
| D9 | Merge-amplification of empty pages under emptyPagesAllowed=true not in Scenario 1 |
12-change-feed.md#scenario-1-duplicate-processing-after-merge |
extend impact bullets |
All 9 are non-blocking for this SDK PR. Treat as follow-up issues on the design-docs repo.
Cross-SDK informational divergences
These are not findings to act on, but the user (and any future cross-SDK consistency reviewer) should know:
| Behavior | Java (post-PR) | .NET | Python | Note |
|---|---|---|---|---|
| Default 304 handling on cross-partition CF | repeatWhenEmpty (unbounded — pre-existing) |
Aggregate per cycle, surface one NotModifiedPage |
while True bounded by should_retry_on_not_modified_response() |
Java still divergent in default mode — intentional back-compat preservation |
| Opt-in to surface per-page | emptyPagesAllowed=true (internal-only) |
Not exposed (no equivalent flag) | Not exposed (no equivalent kwarg) | Verified by grep — zero hits in both peers |
| Per-operation end-to-end timeout config | CosmosEndToEndOperationLatencyPolicyConfig |
Not present (only per-request AvailabilityStrategy) |
Not present | This is why .NET/Python don't see the timeout bug |
isFullyDrained-style termination signal |
Yes (Java-specific) | No (uses cycle-bound IsNextRangeEqualToOriginal) |
No (uses _initial_no_result_range cycle tracking) |
The Java regression class has no peer analog |
| Empty cycle UX | Continues polling (Java) | Surfaces NotModifiedPage per cycle |
Terminates iterator entirely (raise StopIteration) |
Materially different — multi-SDK customers will observe different paging shapes |
Cross SDK Reviewer's verdict: Approve. There is no peer SDK to copy the design from; Java's choices are internally consistent with its own pre-existing query-side setAllowEmptyPages and deliberately divergent from peers in a way peers cannot easily match without first shipping a per-operation timeout config.
Verified-negative findings (rigor receipts)
Preserved verbatim — these prove what was checked and explicitly ruled out:
- ❌ "Naming inconsistency
emptyPagesAllowedvsallowEmptyPagesat field/impl layer" — DROPPED. Field name + impl methods exactly match siblingCosmosQueryRequestOptionsImpl(verified by grep on line 25, 308, 317). - ❌ "
TransientIOErrorsRetryingIteratorSpec.scalaonly runs againstazure-cosmos-spark_3and not the 4 publishable variants" — DROPPED. All 4 variantpom.xmls reference<source>${basedir}/../azure-cosmos-spark_3/src/test/scala</source>(shared test source dir; auto-runs against all variants). - ❌ "Coverage gap for
>4*(size+1)defense inFeedRangeCompositeContinuationImpl" — Partially dropped. The NO_RETRY contract IS mock-tested at the fetcher layer (nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_terminatesIteration, lines 442-489). Finding m5 above re-raises the integration-against-real-impl gap, which is a complementary concern. - ❌ "
isFullyDraineddefault-mode regression coverage missing" — DROPPED. TestisFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue(line 379) exists and explicitly verifies short-circuit fires without consulting the continuation (Mockito.verify(continuation, Mockito.never()).isDone()— load-bearing pin). - ❌ "Behavior change in
(completeAfterAllCurrentChangesRetrieved || endLSN != null)branch foremptyPagesAllowed=falsecallers" — DROPPED. Comparedgit show HEAD~6:.../ChangeFeedFetcher.javato current: bytecode-equivalent for the legacy path (reEnableShouldFetchMoreForRetry(); return Mono.empty();). Behavior preservation verified.
What we explicitly did NOT flag
- PR Deep Reviewer confirmed (via 10/10 ✅ table) every claim in the PR body about:
isDone()permanently false,handleChangeFeedNotModifiedNO_RETRY-for-data, only-one-caller verification onPaginator.getChangeFeedQueryResultAsObservableandnew ChangeFeedFetcher(...), copy constructor includes new field,disableShouldFetchMorecorrectly stopsPaginator, refactor behavior-preserving for all 4 (branch × flag) combinations. - Existing
copilot-pull-request-reviewerinline comments (3) — all addressed by author in commit08f690b, dropped to avoid duplication. - Multi-Spark-variant test coverage — verified shared source dir; no per-variant duplication needed.
- Bridge accessor pattern as language-idiomatic — verified parity with .NET
InternalsVisibleTo(lists 20+ friend assemblies inAssemblyInfo.cs:7-44) and Python_moduleconvention. Java's pattern is parity-aligned in spirit; not over-engineering. isFullyDrainedregression as a Java-specific architecture issue — verified neither .NET nor Python has an analogousIsFullyDrained/IsDone()termination signal; the bug class has no peer analog.
Generated by the PR Review Router (Tier 3 — Deep) — orchestrating PR Deep Reviewer + Engineering Quality Reviewer + cosmos-pr-reviewer + Cross SDK Reviewer in parallel. Posted as a single review comment with explicit user approval after a human review pass.
Summary
Spark partition tasks reading change feed (or executing cross-partition queries) against a sparse workload could hit
OperationCancelledException("End-to-end timeout hit when trying to retrieve the next page") at the connector's 65-second per-operation end-to-end timeout.Root cause: with the default
emptyPagesAllowed=false,ParallelDocumentQueryExecutionContextandChangeFeedFetcherswallow empty / 304 pages internally — a single producer-sidenextPage()call can keep draining many sub-feedRanges before emitting one non-empty page. For sparse cross-partition workloads the cumulative wall time blows the per-operation timeout.Fix
Spark
ItemsPartitionReader(query path) callssetAllowEmptyPages(true)onCosmosQueryRequestOptionsso the SDK's existing emptyPagesAllowed plumbing applies — each physical page surfaces independently and is bound by the per-page timeout window rather than the per-operation one.New internal-only
emptyPagesAllowedflag onCosmosChangeFeedRequestOptionsImpl(defaultfalse— behavior unchanged for all other callers). Plumbed throughPaginator.getChangeFeedQueryResultAsObservableintoChangeFeedFetcher.nextPageInternal. When the flag is true, both 304/noChangesbranches returnMono.just(r)so empty pages bubble up to the iterator. Exposed via new package-private bridge accessorCosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages(deliberately no public setter — see in-code comment).ChangeFeedFetcher.isFullyDrainedcorrectness fix. The original cleanup (consult onlycontinuation.isDone()) regressed every non-Spark caller becauseFeedRangeCompositeContinuationImpl.isDone()returnscompositeContinuationTokens.size() == 0, which is permanentlyfalsefor incremental change feed (moveToNextToken()rotates the deque, never shrinks it). ThenoChanges → trueshort-circuit is now gated on!emptyPagesAllowed, preserving the original termination signal for every default-mode caller while still letting Spark surface empty pages.Defense-in-depth
NO_RETRYarm. WithemptyPagesAllowed=trueand a streaming change feed (noendLSN), ifhandleChangeFeedNotModifiedreturnsNO_RETRYon a noChanges page (single-partition case, multi-partition full-cycle complete, or the>4*(size+1)consecutive-304 defense inFeedRangeCompositeContinuationImpl), the SDK's built-in termination signal would otherwise be silently dropped.nextPageInternalnow explicitly callsdisableShouldFetchMore()when the SDK signals NO_RETRY on a noChanges page so the defense-in-depth termination guarantee survives for the new flag too. ThenoChanges(r) && emptyPagesAllowedguard is load-bearing becausehandleChangeFeedNotModifiedreturnsNO_RETRYfor every data page too — without the guard, every data page would silently truncate iteration.Spark
ChangeFeedPartitionReaderopts into the new flag via the bridge accessor.CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptionsalso propagatesemptyPagesAllowedwhen the paged-flux pull mechanism supplies a continuation token (the freshly-built impl would otherwise silently lose the flag — added a comment flagging the broader drift hazard in that copy list).DRY: extracted the two near-identical
surface or swallow via repeatWhenEmptyblocks into asurfaceOrSwallowNoChangesPage(r)helper.Behavior matrix
nextPage()blocks > 65 s, throws OCEemptyPagesAlloweddefaults tofalse)