cosmos: Enable endToEndTimeout for queryDocumentChangeFeed operation#48144
mbhaskar wants to merge 2 commits into
- Add endToEndOperationLatencyPolicyConfig field to CosmosChangeFeedRequestOptionsImpl and implement the previously stubbed getCosmosEndToEndLatencyPolicyConfig() method
- Add public setCosmosEndToEndOperationLatencyPolicyConfig() API to CosmosChangeFeedRequestOptions mirroring CosmosQueryRequestOptions
- Add getChangeFeedResponseFluxWithTimeout() helper in RxDocumentClientImpl for wrapping change feed flux with e2e timeout logic
- Apply e2e timeout in queryDocumentChangeFeed() by extracting the config from request options and wrapping the flux when enabled
- Add queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout test to EndToEndTimeOutValidationTests using READ_FEED_ITEM fault injection

Fixes Azure#40507

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
This PR enables end-to-end timeout support for change feed query operations by implementing the previously stubbed getCosmosEndToEndLatencyPolicyConfig() method in CosmosChangeFeedRequestOptionsImpl and wiring the timeout logic into the queryDocumentChangeFeed() method in RxDocumentClientImpl.
Changes:
- Added endToEndOperationLatencyPolicyConfig field and public setter to enable end-to-end timeout configuration on change feed requests
- Implemented timeout wrapping for change feed response flux using a new helper method that mirrors the pattern used for query operations
- Added test coverage for timeout behavior using fault injection to verify OperationCancelledException with CLIENT_OPERATION_TIMEOUT substatus
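
To make the "timeout wrapping" idea concrete, here is a minimal sketch that uses only the JDK. It is not the SDK's code: the PR wraps a Reactor flux inside RxDocumentClientImpl, whereas this sketch swaps in CompletableFuture.orTimeout to show the same shape. The names E2eTimeoutSketch, withEndToEndTimeout, and SketchOperationCancelledException are hypothetical stand-ins.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the SDK's OperationCancelledException.
class SketchOperationCancelledException extends RuntimeException {
    SketchOperationCancelledException(String message, Throwable cause) {
        super(message, cause);
    }
}

class E2eTimeoutSketch {
    // Returns the operation untouched when no timeout is configured (null).
    // Otherwise the operation fails with a cancellation error once the deadline passes.
    // Note: orTimeout completes the passed-in future itself, not a copy of it.
    static <T> CompletableFuture<T> withEndToEndTimeout(CompletableFuture<T> operation, Duration timeout) {
        if (timeout == null) {
            return operation;
        }
        return operation
            .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
            .handle((value, error) -> {
                if (error == null) {
                    return value;
                }
                // Unwrap a CompletionException if an upstream stage added one.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
                if (cause instanceof TimeoutException) {
                    // Translate the raw timeout into the cancellation error callers expect.
                    throw new SketchOperationCancelledException(
                        "end-to-end timeout of " + timeout + " exceeded", cause);
                }
                throw new CompletionException(cause);
            });
    }

    public static void main(String[] args) {
        // A future that never completes plays the role of a stalled change feed page.
        CompletableFuture<String> stalled = new CompletableFuture<>();
        try {
            withEndToEndTimeout(stalled, Duration.ofMillis(50)).join();
        } catch (CompletionException e) {
            System.out.println("cancelled: " + (e.getCause() instanceof SketchOperationCancelledException));
        }
    }
}
```

The null check mirrors the "wrap only when enabled" behavior described above: with no configured timeout, the caller gets the original operation back unchanged.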
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| CosmosChangeFeedRequestOptions.java | Added public API method setCosmosEndToEndOperationLatencyPolicyConfig() to allow configuring end-to-end timeout policy on a per-request basis |
| CosmosChangeFeedRequestOptionsImpl.java | Added private field, getter, setter, and copy constructor support for endToEndOperationLatencyPolicyConfig |
| RxDocumentClientImpl.java | Added getChangeFeedResponseFluxWithTimeout() helper method and integrated it into queryDocumentChangeFeed() to apply timeout wrapping when enabled |
| EndToEndTimeOutValidationTests.java | Added queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout() test using READ_FEED_ITEM fault injection to verify timeout behavior |
@sdkReviewAgent-2

⏳ PR Review Agent - Starting review...
    .getImpl(requestOptions);

    CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig =
        this.getEffectiveEndToEndOperationLatencyPolicyConfig(
🟡 Recommendation — Behavioral Change: PPAF/client-level e2e timeout now applies to Change Feed Processor
With getEffectiveEndToEndOperationLatencyPolicyConfig resolving the effective policy, change feed operations are now subject to:
- Client-level cosmosEndToEndOperationLatencyPolicyConfig (if set on the builder)
- PPAF-enforced defaults (ppafEnforcedE2ELatencyPolicyConfigForReads), since OperationType.ReadFeed returns true for isReadOnlyOperation()
Previously, CosmosChangeFeedRequestOptionsImpl.getCosmosEndToEndLatencyPolicyConfig() returned null, so queryDocumentChangeFeed never applied timeout. Now, even when the user doesn't explicitly set a timeout on CosmosChangeFeedRequestOptions, the effective config resolution can produce a non-null timeout.
Why this matters: The Change Feed Processor (CFP) creates its own CosmosChangeFeedRequestOptions without setting e2e timeout. PartitionedByIdCollectionRequestOptionsFactory explicitly disables e2e timeout for lease operations (CosmosItemRequestOptions, CosmosQueryRequestOptions) but does not create a disabled config for the data-path change feed queries. If a user configures client-level e2e timeout (e.g., 5s for point reads) and also uses the Change Feed Processor, CFP's change feed queries could now receive unexpected OperationCancelledException.
Suggested action: Consider whether CFP should explicitly set a disabled/null e2e config on its change feed options, or document this behavioral change so users can adjust their client-level config accordingly.
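
The concern can be illustrated with a small JDK-only model of the fallback chain. This is a sketch under stated assumptions, not the body of getEffectiveEndToEndOperationLatencyPolicyConfig: the precedence order (request-level, then client-level, then the PPAF read default) is assumed from the description above, and Policy, resolve, and timeoutApplies are hypothetical names. The durations are arbitrary.

```java
import java.time.Duration;

class EffectiveE2ePolicySketch {
    // Hypothetical, simplified stand-in for CosmosEndToEndOperationLatencyPolicyConfig.
    static final class Policy {
        final boolean enabled;
        final Duration timeout;

        Policy(boolean enabled, Duration timeout) {
            this.enabled = enabled;
            this.timeout = timeout;
        }

        // An explicit "off" switch, as opposed to null, which means "not set".
        static Policy disabled() {
            return new Policy(false, Duration.ZERO);
        }
    }

    // Assumed precedence: request-level, then client-level, then the PPAF default for reads.
    static Policy resolve(Policy requestLevel, Policy clientLevel, Policy ppafReadDefault, boolean readOnlyOperation) {
        if (requestLevel != null) {
            return requestLevel;
        }
        if (clientLevel != null) {
            return clientLevel;
        }
        return readOnlyOperation ? ppafReadDefault : null;
    }

    static boolean timeoutApplies(Policy effective) {
        return effective != null && effective.enabled;
    }

    public static void main(String[] args) {
        Policy clientLevel = new Policy(true, Duration.ofSeconds(5));

        // CFP-style options that never set a request-level policy inherit the client-level one.
        System.out.println(timeoutApplies(resolve(null, clientLevel, null, true)));

        // Explicitly disabling the policy on the request opts the operation out.
        System.out.println(timeoutApplies(resolve(Policy.disabled(), clientLevel, null, true)));
    }
}
```

Under this model, the first suggested action amounts to CFP passing an explicitly disabled policy rather than leaving the request-level value unset.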
- Add CHANGELOG entry for end-to-end timeout on queryChangeFeed.
- Propagate endToEndOperationLatencyPolicyConfig in CosmosChangeFeedRequestOptionsImpl.override(CosmosRequestOptions) so CosmosOperationPolicy-set timeouts are honored for change feed operations (mirrors CosmosQueryRequestOptionsBase.override()).
- Generalize getFeedResponseFluxWithTimeout to accept nullable CosmosQueryRequestOptions / isQueryCancelledOnTimeout and delete the duplicate getChangeFeedResponseFluxWithTimeout helper. Guard applyExceptionToMergedDiagnosticsForQuery against null requestOptions.
- Add queryChangeFeedWithEndToEndTimeoutPolicyAndAvailabilityStrategyShouldTimeout test to cover change feed with a ThresholdBasedAvailabilityStrategy.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
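
The override() propagation in the second bullet can be pictured with a small JDK-only sketch. It assumes the merge rule is "a non-null value set by the operation policy replaces the current one, otherwise the current value is kept". That rule is an assumption about how the query options behave, not something this PR states. Options and overrideOption are hypothetical, trimmed-down names that carry only the timeout.

```java
import java.time.Duration;

class ChangeFeedOverrideSketch {
    // Hypothetical, trimmed-down request options holding only the e2e timeout.
    static final class Options {
        private Duration endToEndTimeout; // null means "not set"

        Options setEndToEndTimeout(Duration timeout) {
            this.endToEndTimeout = timeout;
            return this;
        }

        Duration getEndToEndTimeout() {
            return endToEndTimeout;
        }

        // Assumed merge rule: values set by the operation policy win when present.
        void override(Options fromPolicy) {
            this.endToEndTimeout = overrideOption(fromPolicy.endToEndTimeout, this.endToEndTimeout);
        }
    }

    static <T> T overrideOption(T source, T current) {
        return source != null ? source : current;
    }

    public static void main(String[] args) {
        Options changeFeedOptions = new Options(); // caller set no timeout
        Options fromPolicy = new Options().setEndToEndTimeout(Duration.ofSeconds(2));

        changeFeedOptions.override(fromPolicy);
        System.out.println(changeFeedOptions.getEndToEndTimeout());
    }
}
```

If override() skipped this field, a timeout set through an operation policy would be dropped silently for change feed requests, which is the gap the bullet describes closing.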
Description
Fixes #40507
endToEndTimeout is supported for read and query operations via CosmosEndToEndOperationLatencyPolicyConfig, but the queryDocumentChangeFeed operation previously ignored this configuration entirely.
Changes
- CosmosChangeFeedRequestOptionsImpl: Added endToEndOperationLatencyPolicyConfig field and implemented the previously stubbed getCosmosEndToEndLatencyPolicyConfig() getter. Added corresponding setter.
- CosmosChangeFeedRequestOptions (public API): Added setCosmosEndToEndOperationLatencyPolicyConfig() method, mirroring the same API on CosmosQueryRequestOptions.
- RxDocumentClientImpl: Added getChangeFeedResponseFluxWithTimeout() helper and wired it into queryDocumentChangeFeed(), extracting the e2e policy config from request options and wrapping the change feed flux with timeout when enabled.
- EndToEndTimeOutValidationTests: Added queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout() test using READ_FEED_ITEM fault injection, verifying that OperationCancelledException with CLIENT_OPERATION_TIMEOUT substatus is thrown.