1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit when trying to retrieve the next page") for query and change feed reads against workloads that produce long runs of empty pages (for example a cross-partition query that effectively performs a full-table scan and returns only a few documents). The connector now opts into the SDK's `emptyPagesAllowed` behavior so the per-page end-to-end timeout applies to each individual page rather than being exceeded by serial empty-page drains. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
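
To make the difference between the two behaviors concrete, here is a minimal, self-contained Java simulation. It does not use the Cosmos DB SDK: `EmptyPageDrainSketch`, the 50 ms per-page fetch cost, and the 5-second timeout are all assumptions chosen for illustration. The feed shape (200 empty pages followed by 5 data pages) mirrors the regression test added in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical simulation; none of these names come from the Azure Cosmos DB SDK. */
final class EmptyPageDrainSketch {

    /** Assumed cost of fetching one page from the backend, in milliseconds. */
    static final long PAGE_FETCH_MILLIS = 50;

    /** Builds a feed of emptyPages empty pages followed by dataPages single-row pages. */
    static List<List<String>> buildFeed(int emptyPages, int dataPages) {
        List<List<String>> pages = new ArrayList<>();
        for (int i = 0; i < emptyPages; i++) {
            pages.add(Collections.emptyList());
        }
        for (int i = 0; i < dataPages; i++) {
            pages.add(Collections.singletonList("row-" + i));
        }
        return pages;
    }

    /**
     * Returns the longest simulated time spent inside a single "next page" call.
     * When allowEmptyPages is false, empty pages are drained inside one call, so
     * their fetch costs accumulate; when true, every page is its own call.
     */
    static long longestCallMillis(List<List<String>> feed, boolean allowEmptyPages) {
        long longest = 0;
        long current = 0;
        for (List<String> page : feed) {
            current += PAGE_FETCH_MILLIS;
            boolean surfaced = allowEmptyPages || !page.isEmpty();
            if (surfaced) {
                longest = Math.max(longest, current);
                current = 0;
            }
        }
        // A trailing run of empty pages still costs time in the final call.
        return Math.max(longest, current);
    }

    public static void main(String[] args) {
        List<List<String>> feed = buildFeed(200, 5);
        long timeoutMillis = 5_000; // assumed per-page end-to-end timeout

        long drained = longestCallMillis(feed, false);
        long bubbled = longestCallMillis(feed, true);

        System.out.println("drained internally: " + drained + " ms, exceeds timeout: "
            + (drained > timeoutMillis));
        System.out.println("bubbled up: " + bubbled + " ms, exceeds timeout: "
            + (bubbled > timeoutMillis));
    }
}
```

Under these assumptions the drained variant spends 10050 ms in its first call (200 empty pages plus the first data page), while the bubbled-up variant never spends more than 50 ms in any single call.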

#### Other Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit when trying to retrieve the next page") for query and change feed reads against workloads that produce long runs of empty pages (for example a cross-partition query that effectively performs a full-table scan and returns only a few documents). The connector now opts into the SDK's `emptyPagesAllowed` behavior so the per-page end-to-end timeout applies to each individual page rather than being exceeded by serial empty-page drains. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit when trying to retrieve the next page") for query and change feed reads against workloads that produce long runs of empty pages (for example a cross-partition query that effectively performs a full-table scan and returns only a few documents). The connector now opts into the SDK's `emptyPagesAllowed` behavior so the per-page end-to-end timeout applies to each individual page rather than being exceeded by serial empty-page drains. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit when trying to retrieve the next page") for query and change feed reads against workloads that produce long runs of empty pages (for example a cross-partition query that effectively performs a full-table scan and returns only a few documents). The connector now opts into the SDK's `emptyPagesAllowed` behavior so the per-page end-to-end timeout applies to each individual page rather than being exceeded by serial empty-page drains. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

@@ -214,6 +214,13 @@ private case class ChangeFeedPartitionReader
      .setEndLSN(options, this.partition.endLsn.get)
  }

  // Bubble empty pages up to the iterator so the per-page end-to-end timeout
  // applies to each individual page rather than being exceeded by serial
  // empty-page drains inside ChangeFeedFetcher.
  ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor

🟢 Suggestion — Consistency: Bridge accessor pattern divergence

`emptyPagesAllowed` is set via `getImpl(options).setEmptyPagesAllowed(true)`, which leaks the `CosmosChangeFeedRequestOptionsImpl` type at the call site. Sibling internal fields on the same options class use dedicated named bridge methods: for example, `endLSN` at line 214 uses `setEndLSN(options, ...)` through `CosmosChangeFeedRequestOptionsAccessor`.

If `CosmosChangeFeedRequestOptionsImpl` is ever refactored (method rename, type change), this `getImpl()` call site breaks without a compile error, whereas named bridge methods localize the breakage to the accessor implementation.

Suggested action: Consider adding `setEmptyPagesAllowed` / `isEmptyPagesAllowed` methods to `CosmosChangeFeedRequestOptionsAccessor` (matching the query path pattern where `CosmosQueryRequestOptionsAccessor.setAllowEmptyPages` exists), and using them here instead of `getImpl()`.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.
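
For readers unfamiliar with the pattern the suggestion refers to, the following is a minimal sketch of a named bridge accessor. All types here (`SketchOptions`, `SketchOptionsImpl`, `SketchOptions.Accessor`) are hypothetical stand-ins, not the SDK's classes, and the SDK's real registration mechanism is likely more involved. The sketch only shows the idea that internal callers go through named methods rather than reaching for the impl object.

```java
/** Hypothetical internal state holder; not the SDK's CosmosChangeFeedRequestOptionsImpl. */
final class SketchOptionsImpl {
    private boolean emptyPagesAllowed;

    boolean isEmptyPagesAllowed() {
        return emptyPagesAllowed;
    }

    void setEmptyPagesAllowed(boolean value) {
        emptyPagesAllowed = value;
    }
}

/** Hypothetical public options type that hides its impl from callers. */
final class SketchOptions {
    private final SketchOptionsImpl impl = new SketchOptionsImpl();

    /** Narrow, named operations that internal callers are allowed to use. */
    interface Accessor {
        void setEmptyPagesAllowed(SketchOptions options, boolean value);

        boolean isEmptyPagesAllowed(SketchOptions options);
    }

    /** The only place that touches SketchOptionsImpl on behalf of internal callers. */
    static final Accessor ACCESSOR = new Accessor() {
        @Override
        public void setEmptyPagesAllowed(SketchOptions options, boolean value) {
            options.impl.setEmptyPagesAllowed(value);
        }

        @Override
        public boolean isEmptyPagesAllowed(SketchOptions options) {
            return options.impl.isEmptyPagesAllowed();
        }
    };
}

final class AccessorPatternSketch {
    public static void main(String[] args) {
        SketchOptions options = new SketchOptions();
        // The call site names the operation and never sees SketchOptionsImpl.
        SketchOptions.ACCESSOR.setEmptyPagesAllowed(options, true);
        System.out.println(SketchOptions.ACCESSOR.isEmptyPagesAllowed(options)); // prints "true"
    }
}
```

With this shape, a rename inside `SketchOptionsImpl` only requires a change in the accessor implementation; call sites keep compiling against the accessor interface.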

    .getImpl(options)
    .setEmptyPagesAllowed(true)

  options.setCustomItemSerializer(itemDeserializer)
}

@@ -45,6 +45,14 @@ private case class ItemsPartitionReader
    .getCosmosQueryRequestOptionsAccessor
    .disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())

  // Bubble empty pages up to the iterator so the per-page end-to-end timeout
  // applies to each individual page rather than being exceeded by serial
  // empty-page drains inside ParallelDocumentQueryExecutionContext.
  ImplementationBridgeHelpers
    .CosmosQueryRequestOptionsHelper
    .getCosmosQueryRequestOptionsAccessor
    .setAllowEmptyPages(queryOptions, true)

  private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
  ThroughputControlHelper.populateThroughputControlGroupName(
    ImplementationBridgeHelpers
@@ -180,6 +180,68 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr
    factoryCallCount.get shouldEqual 1
  }

  "TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in {
    // Regression test for the empty-page drain scenario: when the SDK is configured with
    // emptyPagesAllowed=true the iterator must surface many consecutive empty
    // pages without busy-waiting beyond the per-page end-to-end timeout. Even
    // with hundreds of empty pages followed by real data, the iterator should
    // return all real rows.
    val emptyLeadingPages = 200
    val realPages = 5
    val totalPages = emptyLeadingPages + realPages
    val iterator = new TransientIOErrorsRetryingIterator(
      continuationToken => generateMockedCosmosPagedFluxWithEmptyPrefix(
        continuationToken, totalPages, emptyLeadingPages),
      pageSize,
      1,
      None,
      None
    )
    iterator.maxRetryIntervalInMs = 5

    // 2 producers (Left/Right) each emit realPages * pageSize rows
    iterator.count(_ => true) shouldEqual (realPages * pageSize * 2)
  }

  private def generateMockedCosmosPagedFluxWithEmptyPrefix
  (
    continuationToken: String,
    initialPageCount: Int,
    leadingEmptyPageCount: Int
  ) = {

    val leftProducer = generateFeedResponseFluxWithEmptyPrefix(
      "Left", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
    val rightProducer = generateFeedResponseFluxWithEmptyPrefix(
      "Right", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
    val toBeMerged = Array(leftProducer, rightProducer).toIterable.asJava
    val mergedFlux = Flux.mergeSequential(toBeMerged, 1, 2)
    UtilBridgeInternal.createCosmosPagedFlux(_ => mergedFlux)
  }

  private def generateFeedResponseFluxWithEmptyPrefix
  (
    prefix: String,
    pageCount: Int,
    leadingEmptyPageCount: Int,
    requestContinuationToken: Option[String]
  ): Flux[FeedResponse[SparkRowItem]] = {

    // generateFeedResponse uses documentStartIndex=-1 as the "emit an empty page" sentinel.
    val emptyPageSentinel = -1
    val firstDataPageStartIndex = 1

    val responses = Array.range(1, pageCount + 1)
      .map(i => generateFeedResponse(
        prefix,
        i,
        if (i <= leadingEmptyPageCount) emptyPageSentinel else firstDataPageStartIndex))
      .filter(response => requestContinuationToken.isEmpty ||
        requestContinuationToken.get < response.getContinuationToken)

    Flux.fromArray(responses)
  }

  private val objectMapper = new ObjectMapper

  @throws[JsonProcessingException]
@@ -0,0 +1,108 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.ModelBridgeInternal;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit tests for the paged-flux pull continuation path on
 * {@link CosmosChangeFeedRequestOptions#withCosmosPagedFluxOptions(CosmosPagedFluxOptions)} (package-visible via
 * {@link ModelBridgeInternal#getEffectiveChangeFeedRequestOptions(CosmosChangeFeedRequestOptions, CosmosPagedFluxOptions)}).
 *
 * <p>That method silently builds a brand-new {@code CosmosChangeFeedRequestOptionsImpl} when the caller supplies a
 * continuation token via {@link CosmosPagedFluxOptions}, so any field NOT explicitly copied is dropped. These tests
 * lock in the propagation of fields whose loss would silently break a feature.
 */
public class CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest {

    @Test(groups = { "unit" })
    public void emptyPagesAllowed_isPropagated_whenContinuationTokenSupplied() {
        // GIVEN a CosmosChangeFeedRequestOptions with emptyPagesAllowed=true (the value the Spark connector sets)
        CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
            .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
        ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getImpl(src)
            .setEmptyPagesAllowed(true);

        // AND a continuation token supplied via the paged-flux pull mechanism
        CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        pagedFluxOptions.setRequestContinuation(buildContinuationToken());

        // WHEN computing the effective options
        CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
            .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);

        // THEN emptyPagesAllowed must be preserved on the freshly-built impl
        assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getImpl(effective)
            .isEmptyPagesAllowed())
            .describedAs("emptyPagesAllowed must survive the paged-flux pull continuation rebuild")
            .isTrue();
    }

    @Test(groups = { "unit" })
    public void emptyPagesAllowedFalse_isPropagated_whenContinuationTokenSupplied() {
        // The default value should also round-trip cleanly (sanity check that we're not just hard-coding true).
        CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
            .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());

        CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        pagedFluxOptions.setRequestContinuation(buildContinuationToken());

        CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
            .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);

        assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getImpl(effective)
            .isEmptyPagesAllowed())
            .describedAs("emptyPagesAllowed default (false) must survive the paged-flux pull continuation rebuild")
            .isFalse();
    }

    @Test(groups = { "unit" })
    public void emptyPagesAllowed_isPreserved_whenNoContinuationTokenSupplied() {
        // No continuation → withCosmosPagedFluxOptions returns `this` unchanged.
        CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
            .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
        ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getImpl(src)
            .setEmptyPagesAllowed(true);

        CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        pagedFluxOptions.setMaxItemCount(50);

        CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
            .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);

        assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getImpl(effective)
            .isEmptyPagesAllowed())
            .isTrue();
    }

    private static String buildContinuationToken() {
        // Build a real ChangeFeedState so we can serialize a valid (base64-encoded) continuation token.
        // We use the state's own toString() which round-trips through createForProcessingFromContinuation.
        ChangeFeedStateV1 state = new ChangeFeedStateV1(
            "someContainerRid",
            FeedRangeEpkImpl.forFullRange(),
            ChangeFeedMode.INCREMENTAL,
            ChangeFeedStartFromInternal.createFromBeginning(),
            null);
        return state.toString();
    }
}
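
The failure mode these tests guard against (a rebuild that copies fields one by one and silently drops any field that was not copied) can be sketched in isolation. The class below is a hypothetical stand-in, not the SDK's `CosmosChangeFeedRequestOptionsImpl`; its field set and method names are assumptions made for illustration only.

```java
/** Hypothetical options type used only to illustrate the rebuild hazard. */
final class RebuildSketchOptions {
    int maxItemCount = 100;
    boolean emptyPagesAllowed; // defaults to false
    String continuation;

    /** Rebuild that forgets to copy emptyPagesAllowed, so the copy falls back to the default. */
    RebuildSketchOptions rebuildDroppingField(String token) {
        if (token == null) {
            return this; // no continuation: same instance, nothing can be lost
        }
        RebuildSketchOptions copy = new RebuildSketchOptions();
        copy.maxItemCount = this.maxItemCount;
        copy.continuation = token;
        return copy;
    }

    /** Rebuild that also copies emptyPagesAllowed explicitly. */
    RebuildSketchOptions rebuildCopyingAllFields(String token) {
        if (token == null) {
            return this;
        }
        RebuildSketchOptions copy = rebuildDroppingField(token);
        copy.emptyPagesAllowed = this.emptyPagesAllowed;
        return copy;
    }
}

final class RebuildSketch {
    public static void main(String[] args) {
        RebuildSketchOptions src = new RebuildSketchOptions();
        src.emptyPagesAllowed = true;

        System.out.println(src.rebuildDroppingField("token").emptyPagesAllowed);    // prints "false"
        System.out.println(src.rebuildCopyingAllFields("token").emptyPagesAllowed); // prints "true"
        System.out.println(src.rebuildDroppingField(null).emptyPagesAllowed);       // prints "true"
    }
}
```

The three printed values correspond to the three cases the tests cover: a value lost on rebuild, a value preserved on rebuild, and the no-continuation path that returns the original instance.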