-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout #49276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tvaron3
wants to merge
6
commits into
Azure:main
Choose a base branch
from
tvaron3:tvaron3/spark-allow-empty-pages
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
2e43e14
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end ti…
tvaron3 8616057
Add PR link to CHANGELOG entries
tvaron3 08f690b
Restore noChanges short-circuit in isFullyDrained when emptyPagesAllo…
tvaron3 48a9e99
Address pass-3 review: defense-in-depth on NO_RETRY + DRY cleanup
tvaron3 0304dfe
Address pass-4 review: lock contract on data-page non-termination
tvaron3 34d3da4
Shrink bridge accessor surface; reclassify CHANGELOG entry
tvaron3 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
108 changes: 108 additions & 0 deletions
108
...m/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos.implementation; | ||
|
|
||
| import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode; | ||
| import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal; | ||
| import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; | ||
| import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; | ||
| import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; | ||
| import com.azure.cosmos.models.ModelBridgeInternal; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| /** | ||
| * Unit tests for the paged-flux pull continuation path on | ||
| * {@link CosmosChangeFeedRequestOptions#withCosmosPagedFluxOptions(CosmosPagedFluxOptions)} (package-visible via | ||
| * {@link ModelBridgeInternal#getEffectiveChangeFeedRequestOptions(CosmosChangeFeedRequestOptions, CosmosPagedFluxOptions)}). | ||
| * | ||
| * <p>That method silently builds a brand-new {@code CosmosChangeFeedRequestOptionsImpl} when the caller supplies a | ||
| * continuation token via {@link CosmosPagedFluxOptions}, so any field NOT explicitly copied is dropped. These tests | ||
| * lock in the propagation of fields whose loss would silently break a feature. | ||
| */ | ||
| public class CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest { | ||
|
|
||
| @Test(groups = { "unit" }) | ||
| public void emptyPagesAllowed_isPropagated_whenContinuationTokenSupplied() { | ||
| // GIVEN a CosmosChangeFeedRequestOptions with emptyPagesAllowed=true (the value the Spark connector sets) | ||
| CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions | ||
| .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); | ||
| ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper | ||
| .getCosmosChangeFeedRequestOptionsAccessor() | ||
| .getImpl(src) | ||
| .setEmptyPagesAllowed(true); | ||
|
|
||
| // AND a continuation token supplied via the paged-flux pull mechanism | ||
| CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); | ||
| pagedFluxOptions.setRequestContinuation(buildContinuationToken()); | ||
|
|
||
| // WHEN computing the effective options | ||
| CosmosChangeFeedRequestOptions effective = ModelBridgeInternal | ||
| .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); | ||
|
|
||
| // THEN emptyPagesAllowed must be preserved on the freshly-built impl | ||
| assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper | ||
| .getCosmosChangeFeedRequestOptionsAccessor() | ||
| .getImpl(effective) | ||
| .isEmptyPagesAllowed()) | ||
| .describedAs("emptyPagesAllowed must survive the paged-flux pull continuation rebuild") | ||
| .isTrue(); | ||
| } | ||
|
|
||
| @Test(groups = { "unit" }) | ||
| public void emptyPagesAllowedFalse_isPropagated_whenContinuationTokenSupplied() { | ||
| // The default value should also round-trip cleanly (sanity check that we're not just hard-coding true). | ||
| CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions | ||
| .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); | ||
|
|
||
| CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); | ||
| pagedFluxOptions.setRequestContinuation(buildContinuationToken()); | ||
|
|
||
| CosmosChangeFeedRequestOptions effective = ModelBridgeInternal | ||
| .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); | ||
|
|
||
| assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper | ||
| .getCosmosChangeFeedRequestOptionsAccessor() | ||
| .getImpl(effective) | ||
| .isEmptyPagesAllowed()) | ||
| .describedAs("emptyPagesAllowed default (false) must survive the paged-flux pull continuation rebuild") | ||
| .isFalse(); | ||
| } | ||
|
|
||
| @Test(groups = { "unit" }) | ||
| public void emptyPagesAllowed_isPreserved_whenNoContinuationTokenSupplied() { | ||
| // No continuation → withCosmosPagedFluxOptions returns `this` unchanged. | ||
| CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions | ||
| .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); | ||
| ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper | ||
| .getCosmosChangeFeedRequestOptionsAccessor() | ||
| .getImpl(src) | ||
| .setEmptyPagesAllowed(true); | ||
|
|
||
| CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); | ||
| pagedFluxOptions.setMaxItemCount(50); | ||
|
|
||
| CosmosChangeFeedRequestOptions effective = ModelBridgeInternal | ||
| .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); | ||
|
|
||
| assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper | ||
| .getCosmosChangeFeedRequestOptionsAccessor() | ||
| .getImpl(effective) | ||
| .isEmptyPagesAllowed()) | ||
| .isTrue(); | ||
| } | ||
|
|
||
| private static String buildContinuationToken() { | ||
| // Build a real ChangeFeedState so we can serialize a valid (base64-encoded) continuation token. | ||
| // We use the state's own toString() which round-trips through createForProcessingFromContinuation. | ||
| ChangeFeedStateV1 state = new ChangeFeedStateV1( | ||
| "someContainerRid", | ||
| FeedRangeEpkImpl.forFullRange(), | ||
| ChangeFeedMode.INCREMENTAL, | ||
| ChangeFeedStartFromInternal.createFromBeginning(), | ||
| null); | ||
| return state.toString(); | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟢 Suggestion — Consistency: Bridge accessor pattern divergence
emptyPagesAllowedis set viagetImpl(options).setEmptyPagesAllowed(true), which leaks theCosmosChangeFeedRequestOptionsImpltype at the call site. Sibling internal fields on the same options class use dedicated named bridge methods — e.g.,endLSNat line 214 usessetEndLSN(options, ...)throughCosmosChangeFeedRequestOptionsAccessor.If
CosmosChangeFeedRequestOptionsImplis ever refactored (method rename, type change), thisgetImpl()call site breaks without a compile error, whereas named bridge methods localize the breakage to the accessor implementation.Suggested action: Consider adding
setEmptyPagesAllowed/isEmptyPagesAllowedmethods toCosmosChangeFeedRequestOptionsAccessor(matching the query path pattern whereCosmosQueryRequestOptionsAccessor.setAllowEmptyPagesexists), and using them here instead ofgetImpl().