Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationCancelledException;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.rx.TestSuiteBase;
Expand Down Expand Up @@ -258,6 +260,79 @@ public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeout() {
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout() {
Comment thread
mbhaskar marked this conversation as resolved.
if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
throw new SkipException("Failure injection only supported for DIRECT mode");
}

CosmosAsyncClient cosmosClient = initializeClient(endToEndOperationLatencyPolicyConfig);
FaultInjectionRule faultInjectionRule = null;
try {
CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig =
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(1))
.build();

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
options.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndOperationLatencyPolicyConfig);

faultInjectionRule = injectFailure(createdContainer, FaultInjectionOperationType.READ_FEED_ITEM, null);
CosmosPagedFlux<TestObject> changeFeedPagedFlux =
createdContainer.queryChangeFeed(options, TestObject.class);

StepVerifier.create(changeFeedPagedFlux)
.expectErrorMatches(throwable -> throwable instanceof OperationCancelledException
&& ((OperationCancelledException) throwable).getSubStatusCode()
== HttpConstants.SubStatusCodes.CLIENT_OPERATION_TIMEOUT)
.verify();
} finally {
if (faultInjectionRule != null) {
faultInjectionRule.disable();
}

safeClose(cosmosClient);
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryChangeFeedWithEndToEndTimeoutPolicyAndAvailabilityStrategyShouldTimeout() {
if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
throw new SkipException("Failure injection only supported for DIRECT mode");
}

CosmosAsyncClient cosmosClient = initializeClient(endToEndOperationLatencyPolicyConfig);
FaultInjectionRule faultInjectionRule = null;
try {
CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig =
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(1))
.availabilityStrategy(
new ThresholdBasedAvailabilityStrategy(
Duration.ofMillis(100), Duration.ofMillis(200)))
.build();

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
options.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndOperationLatencyPolicyConfig);

faultInjectionRule = injectFailure(createdContainer, FaultInjectionOperationType.READ_FEED_ITEM, null);
CosmosPagedFlux<TestObject> changeFeedPagedFlux =
createdContainer.queryChangeFeed(options, TestObject.class);

StepVerifier.create(changeFeedPagedFlux)
.expectErrorMatches(throwable -> throwable instanceof OperationCancelledException
&& ((OperationCancelledException) throwable).getSubStatusCode()
== HttpConstants.SubStatusCodes.CLIENT_OPERATION_TIMEOUT)
.verify();
} finally {
if (faultInjectionRule != null) {
faultInjectionRule.disable();
}

safeClose(cosmosClient);
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeoutWithClientConfig() {
if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added support for N-Region synchronous commit feature - See [PR 47757](https://github.com/Azure/azure-sdk-for-java/pull/47757)
* Added support for `CosmosEndToEndOperationLatencyPolicyConfig` on `queryChangeFeed` operations so that an end-to-end operation timeout (and associated availability strategy) configured at the client level, request level, or via `CosmosOperationPolicy` is now honored for change feed queries. - See [PR 48144](https://github.com/Azure/azure-sdk-for-java/pull/48144)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
private boolean completeAfterAllCurrentChangesRetrieved;
private Long endLSN;
private ReadConsistencyStrategy readConsistencyStrategy;
private CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig;

public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
if (toBeCloned.continuationState != null) {
Expand Down Expand Up @@ -80,6 +81,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB
this.keywordIdentifiers = toBeCloned.keywordIdentifiers;
this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved;
this.endLSN = toBeCloned.endLSN;
this.endToEndOperationLatencyPolicyConfig = toBeCloned.endToEndOperationLatencyPolicyConfig;
}

public CosmosChangeFeedRequestOptionsImpl(
Expand Down Expand Up @@ -296,8 +298,11 @@ public CosmosChangeFeedRequestOptionsImpl setExcludedRegions(List<String> exclud

@Override
public CosmosEndToEndOperationLatencyPolicyConfig getCosmosEndToEndLatencyPolicyConfig() {
// @TODO: Implement this and some of the others below
return null;
return this.endToEndOperationLatencyPolicyConfig;
}

public void setCosmosEndToEndLatencyPolicyConfig(CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig) {
Comment thread
mbhaskar marked this conversation as resolved.
this.endToEndOperationLatencyPolicyConfig = endToEndOperationLatencyPolicyConfig;
}

@Override
Expand Down Expand Up @@ -416,6 +421,9 @@ public void override(CosmosRequestOptions cosmosRequestOptions) {
this.throughputControlGroupName = overrideOption(cosmosRequestOptions.getThroughputControlGroupName(), this.throughputControlGroupName);
this.thresholds = overrideOption(cosmosRequestOptions.getDiagnosticsThresholds(), this.thresholds);
this.keywordIdentifiers = overrideOption(cosmosRequestOptions.getKeywordIdentifiers(), this.keywordIdentifiers);
this.endToEndOperationLatencyPolicyConfig = overrideOption(
cosmosRequestOptions.getCosmosEndToEndLatencyPolicyConfig(),
this.endToEndOperationLatencyPolicyConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,9 @@ private static void applyExceptionToMergedDiagnosticsForQuery(
exception,
mostRecentlyCreatedDiagnostics);
} else {
if (requestOptions == null) {
return;
}
List<CosmosDiagnostics> cancelledRequestDiagnostics =
qryOptAccessor
.getCancelledRequestDiagnosticsTracker(requestOptions);
Expand Down Expand Up @@ -1492,7 +1495,9 @@ private static <T> Flux<FeedResponse<T>> getFeedResponseFluxWithTimeout(
CosmosException cancellationException = getNegativeTimeoutException(null, endToEndTimeout);
cancellationException.setStackTrace(throwable.getStackTrace());

isQueryCancelledOnTimeout.set(true);
if (isQueryCancelledOnTimeout != null) {
isQueryCancelledOnTimeout.set(true);
}

applyExceptionToMergedDiagnosticsForQuery(
requestOptions, cancellationException, diagnosticsClientContext);
Expand All @@ -1510,7 +1515,9 @@ private static <T> Flux<FeedResponse<T>> getFeedResponseFluxWithTimeout(
CosmosException exception = new OperationCancelledException();
exception.setStackTrace(throwable.getStackTrace());

isQueryCancelledOnTimeout.set(true);
if (isQueryCancelledOnTimeout != null) {
isQueryCancelledOnTimeout.set(true);
}

applyExceptionToMergedDiagnosticsForQuery(requestOptions, exception, diagnosticsClientContext);

Expand Down Expand Up @@ -4775,7 +4782,30 @@ public <T> Flux<FeedResponse<T>> queryDocumentChangeFeed(
diagnosticsClientContext,
crossRegionAvailabilityContextForRequest);

return changeFeedQueryImpl.executeAsync();
CosmosChangeFeedRequestOptionsImpl implOptions =
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
.getImpl(requestOptions);

CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig =
this.getEffectiveEndToEndOperationLatencyPolicyConfig(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation — Behavioral Change: PPAF/client-level e2e timeout now applies to Change Feed Processor

With getEffectiveEndToEndOperationLatencyPolicyConfig resolving the effective policy, change feed operations are now subject to:

  1. Client-level cosmosEndToEndOperationLatencyPolicyConfig (if set on the builder)
  2. PPAF-enforced defaults (ppafEnforcedE2ELatencyPolicyConfigForReads) — since OperationType.ReadFeed returns true for isReadOnlyOperation()

Previously, CosmosChangeFeedRequestOptionsImpl.getCosmosEndToEndLatencyPolicyConfig() returned null, so queryDocumentChangeFeed never applied timeout. Now, even when the user doesn't explicitly set a timeout on CosmosChangeFeedRequestOptions, the effective config resolution can produce a non-null timeout.

Why this matters: The Change Feed Processor (CFP) creates its own CosmosChangeFeedRequestOptions without setting e2e timeout. PartitionedByIdCollectionRequestOptionsFactory explicitly disables e2e timeout for lease operations (CosmosItemRequestOptions, CosmosQueryRequestOptions) but does not create a disabled config for the data-path change feed queries. If a user configures client-level e2e timeout (e.g., 5s for point reads) and also uses the Change Feed Processor, CFP's change feed queries could now receive unexpected OperationCancelledException.

Suggested action: Consider whether CFP should explicitly set a disabled/null e2e config on its change feed options, or document this behavioral change so users can adjust their client-level config accordingly.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

implOptions.getCosmosEndToEndLatencyPolicyConfig(),
ResourceType.Document,
OperationType.ReadFeed);

Flux<FeedResponse<T>> feedResponseFlux = changeFeedQueryImpl.executeAsync();

if (endToEndPolicyConfig != null && endToEndPolicyConfig.isEnabled()) {
return getFeedResponseFluxWithTimeout(
feedResponseFlux,
endToEndPolicyConfig,
null,
null,
diagnosticsClientContext);
}

return feedResponseFlux;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnosticsThresholds;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.implementation.CosmosChangeFeedRequestOptionsImpl;
Expand Down Expand Up @@ -116,6 +117,20 @@ public CosmosChangeFeedRequestOptions setReadConsistencyStrategy(ReadConsistency
return this;
}

/**
* Sets the {@link CosmosEndToEndOperationLatencyPolicyConfig} to be used for the request. If the config is already
* set on the client, then this will override the client level config for this request.
*
* @param cosmosEndToEndOperationLatencyPolicyConfig the {@link CosmosEndToEndOperationLatencyPolicyConfig}
* @return the CosmosChangeFeedRequestOptions
*/
public CosmosChangeFeedRequestOptions setCosmosEndToEndOperationLatencyPolicyConfig(
CosmosEndToEndOperationLatencyPolicyConfig cosmosEndToEndOperationLatencyPolicyConfig) {

this.actualRequestOptions.setCosmosEndToEndLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig);
return this;
}

/**
* Gets the maximum number of pages that will be prefetched from the backend asynchronously
* in the background. By pre-fetching these changes the throughput of processing the
Expand Down