[FLINK-38825][streaming] Implement retry and timeout strategies for AsyncBatchWaitOperator #27359
What is the purpose of the change
This PR adds retry and timeout strategies to AsyncBatchWaitOperator to support AI/ML inference workloads that require:
These features are essential for production AI inference pipelines where:
Brief change log
New Classes
AsyncBatchRetryStrategy<OUT>
AsyncBatchRetryPredicate<OUT>
AsyncBatchTimeoutPolicy
AsyncBatchRetryStrategies
Retry Strategies Provided
NoRetryStrategy
FixedDelayRetryStrategy
ExponentialBackoffDelayRetryStrategy
Timeout Policies Provided
NO_TIMEOUT_POLICY
failOnTimeout(Duration)
allowPartialOnTimeout(Duration)
Operator Changes
AsyncBatchWaitOperator to support:
metrics (batchRetryCount, batchTimeoutCount)
API Changes
AsyncDataStream:
unorderedWaitBatch(DataStream, AsyncBatchFunction, int, long, AsyncBatchRetryStrategy, AsyncBatchTimeoutPolicy) - Full configuration
unorderedWaitBatchWithRetry(...) - Retry support only
unorderedWaitBatchWithTimeout(...) - Timeout support only
Example Usage
Fixed Delay Retry
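A minimal standalone sketch of fixed-delay retry semantics. It does not use the classes added by this PR: FixedDelayRetrySketch, callWithRetry, and its parameters are hypothetical names chosen for illustration, and the sketch assumes that maxAttempts counts the first call as well as the retries.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical illustration only; not the FixedDelayRetryStrategy from this PR. */
final class FixedDelayRetrySketch {

    /**
     * Calls the task up to maxAttempts times, sleeping delayMillis between attempts.
     * Only exceptions accepted by retryOn are retried; others are rethrown at once.
     */
    static <T> T callWithRetry(
            Callable<T> task,
            int maxAttempts,
            long delayMillis,
            Predicate<Throwable> retryOn) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                // Give up when attempts are exhausted or the exception is not retryable.
                if (attempt >= maxAttempts || !retryOn.test(e)) {
                    throw e;
                }
                // Same delay before every retry: this is what makes it "fixed delay".
                Thread.sleep(delayMillis);
            }
        }
    }
}
```

In this sketch a blocking sleep stands in for the delay; an operator would more plausibly schedule the retry on a timer instead of blocking a thread.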
Exponential Backoff Retry
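A small sketch of how an exponential backoff delay is commonly computed. The class name, method name, and parameters (initialDelayMillis, multiplier, maxDelayMillis) are assumptions made for illustration and may not match ExponentialBackoffDelayRetryStrategy in this PR.

```java
/** Hypothetical illustration only; not the strategy class from this PR. */
final class ExponentialBackoffSketch {

    /**
     * Returns the delay in milliseconds before the given retry (1-based):
     * initialDelayMillis * multiplier^(retry - 1), capped at maxDelayMillis.
     */
    static long delayMillis(
            int retry, long initialDelayMillis, double multiplier, long maxDelayMillis) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        double raw = initialDelayMillis * Math.pow(multiplier, retry - 1);
        // Cap before casting so that very large values cannot overflow the long.
        return (long) Math.min(raw, (double) maxDelayMillis);
    }
}
```

With an initial delay of 100 ms, a multiplier of 2.0, and a cap of 1000 ms, the first five retries wait 100, 200, 400, 800, and 1000 ms.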
Timeout Configuration
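A standalone sketch of the two timeout behaviors named in the change log, fail on timeout and allow partial results on timeout. It relies only on java.util.concurrent.CompletableFuture (Java 9 or later) and not on this PR's AsyncBatchTimeoutPolicy; the class and method names are hypothetical, and treating "partial" as "whatever results were collected so far" is an assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; not the AsyncBatchTimeoutPolicy from this PR. */
final class BatchTimeoutSketch {

    /** Fail behavior: the future fails with a TimeoutException if not done in time. */
    static <T> CompletableFuture<List<T>> withFailOnTimeout(
            CompletableFuture<List<T>> batch, long timeoutMillis) {
        return batch.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Allow-partial behavior: on timeout, complete with the results collected so far. */
    static <T> CompletableFuture<List<T>> withPartialOnTimeout(
            CompletableFuture<List<T>> batch, List<T> partialSoFar, long timeoutMillis) {
        return batch.completeOnTimeout(partialSoFar, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Both helpers leave a batch that completes before the deadline untouched, which corresponds to the normal-completion case.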
Combined Retry and Timeout
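When retry and timeout are combined, a timeout can fire while a retry is still pending. The sketch below shows one way a single AtomicBoolean can decide which path wins, in the spirit of the atomic state management described under Design Decisions. GuardedBatchHandlerSketch and its methods are hypothetical and are not the BatchResultHandler from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration only; not the BatchResultHandler from this PR. */
final class GuardedBatchHandlerSketch<T> {

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final CompletableFuture<T> result = new CompletableFuture<>();

    /** Normal completion path; returns false if another path already won. */
    boolean complete(T value) {
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        result.complete(value);
        return true;
    }

    /** Timeout path, for example invoked from a timer callback. */
    boolean timeout() {
        if (!completed.compareAndSet(false, true)) {
            return false;
        }
        result.completeExceptionally(new TimeoutException("batch timed out"));
        return true;
    }

    /** A pending retry checks this before invoking the async function again. */
    boolean mayRetry() {
        return !completed.get();
    }

    CompletableFuture<T> future() {
        return result;
    }
}
```

Because compareAndSet succeeds for exactly one caller, a late result arriving after the timeout is dropped, and a retry scheduled before the timeout sees mayRetry() return false and is skipped.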
Verifying this change
This change added tests and can be verified as follows:
Retry Tests
AsyncBatchRetryAndTimeoutTest#testRetryWithFixedDelay - Verifies fixed delay retry works correctly
AsyncBatchRetryAndTimeoutTest#testRetryWithExponentialBackoff - Verifies exponential backoff retry
AsyncBatchRetryAndTimeoutTest#testRetryOnResultPredicate - Verifies retry based on result predicate
AsyncBatchRetryAndTimeoutTest#testRetryExhausted - Verifies failure after max attempts exhausted
AsyncBatchRetryAndTimeoutTest#testNoRetryForNonMatchingException - Verifies non-matching exceptions are not retried
Timeout Tests
AsyncBatchRetryAndTimeoutTest#testTimeoutWithFailBehavior - Verifies fail-on-timeout behavior
AsyncBatchRetryAndTimeoutTest#testTimeoutWithAllowPartialBehavior - Verifies allow-partial-on-timeout behavior
AsyncBatchRetryAndTimeoutTest#testCompletionBeforeTimeout - Verifies normal completion before timeout
Combined Tests
AsyncBatchRetryAndTimeoutTest#testTimeoutCancelsRetry - Verifies timeout cancels pending retry
AsyncBatchRetryAndTimeoutTest#testRetrySucceedsBeforeTimeout - Verifies retry succeeds before timeout
Does this pull request potentially affect one of the following parts
@Public(Evolving): yes (new @PublicEvolving interfaces)
Documentation
Design Decisions
1. Strategy Pattern for Retry
We use the Strategy pattern to allow flexible retry configuration, following the existing AsyncRetryStrategy design in Flink.
2. Timeout via ProcessingTimeService
Timeout is implemented using Flink's ProcessingTimeService timers, ensuring integration with Flink's time handling.
3. Atomic State Management
The BatchResultHandler uses AtomicBoolean flags to prevent race conditions between timeout, retry, and normal completion.
4. Metrics Integration
Added batchRetryCount and batchTimeoutCount counters for monitoring retry and timeout events.
Future Work (TODOs in code)