Skip to content

Conversation

@featzhang
Copy link
Member

What is the purpose of the change

This PR adds retry and timeout strategies to AsyncBatchWaitOperator to support AI/ML inference workloads that require:

  • Retry on transient failures - Automatically retry failed batch async operations with configurable backoff strategies
  • Timeout handling - Prevent indefinite waiting for async operations with configurable timeout behavior (fail or allow partial results)

These features are essential for production AI inference pipelines where:

  • External model serving endpoints may experience transient failures
  • Network issues or service degradation require graceful retry mechanisms
  • Hard timeout limits are needed to prevent unbounded latency

Brief change log

New Classes

Class Description
AsyncBatchRetryStrategy<OUT> Interface defining retry strategy for batch operations
AsyncBatchRetryPredicate<OUT> Interface for predicates that determine when to retry
AsyncBatchTimeoutPolicy Configuration for batch-level timeout behavior
AsyncBatchRetryStrategies Utility class with built-in retry strategies

Retry Strategies Provided

Strategy Description
NoRetryStrategy Never retries (default behavior)
FixedDelayRetryStrategy Retries with fixed delay between attempts
ExponentialBackoffDelayRetryStrategy Retries with exponentially increasing delays

Timeout Policies Provided

Policy Description
NO_TIMEOUT_POLICY Timeout disabled (default)
failOnTimeout(Duration) Fail the operator when timeout occurs
allowPartialOnTimeout(Duration) Allow partial results when timeout occurs

Operator Changes

  • Modified AsyncBatchWaitOperator to support:
    • Retry logic based on exception or result predicates
    • Timeout tracking with processing time timers
    • Metrics for retry and timeout events (batchRetryCount, batchTimeoutCount)

API Changes

  • Added new overloads to AsyncDataStream:
    • unorderedWaitBatch(DataStream, AsyncBatchFunction, int, long, AsyncBatchRetryStrategy, AsyncBatchTimeoutPolicy) - Full configuration
    • unorderedWaitBatchWithRetry(...) - Retry support only
    • unorderedWaitBatchWithTimeout(...) - Timeout support only

Example Usage

Fixed Delay Retry

// Retry up to 3 times with 100ms delay on IOException
AsyncBatchRetryStrategy<String> retryStrategy = 
    new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 100L)
        .ifException(e -> e instanceof IOException)
        .build();

AsyncDataStream.unorderedWaitBatchWithRetry(
    inputStream,
    batchFunction,
    maxBatchSize,
    batchTimeoutMs,
    retryStrategy
);

Exponential Backoff Retry

// Retry up to 5 times: initial 100ms, max 10s, multiplier 2.0
AsyncBatchRetryStrategy<String> retryStrategy = 
    new AsyncBatchRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<String>(
            5, 100L, 10000L, 2.0)
        .ifException(e -> e instanceof TimeoutException)
        .build();

Timeout Configuration

// Fail after 5 seconds
AsyncBatchTimeoutPolicy timeoutPolicy = 
    AsyncBatchTimeoutPolicy.failOnTimeout(Duration.ofSeconds(5));

// Or allow partial results after 10 seconds
AsyncBatchTimeoutPolicy timeoutPolicy = 
    AsyncBatchTimeoutPolicy.allowPartialOnTimeout(Duration.ofSeconds(10));

AsyncDataStream.unorderedWaitBatchWithTimeout(
    inputStream,
    batchFunction,
    maxBatchSize,
    batchTimeoutMs,
    timeoutPolicy
);

Combined Retry and Timeout

AsyncDataStream.unorderedWaitBatch(
    inputStream,
    batchFunction,
    maxBatchSize,
    batchTimeoutMs,
    retryStrategy,
    timeoutPolicy
);

Verifying this change

This change added tests and can be verified as follows:

Retry Tests

  • AsyncBatchRetryAndTimeoutTest#testRetryWithFixedDelay - Verifies fixed delay retry works correctly
  • AsyncBatchRetryAndTimeoutTest#testRetryWithExponentialBackoff - Verifies exponential backoff retry
  • AsyncBatchRetryAndTimeoutTest#testRetryOnResultPredicate - Verifies retry based on result predicate
  • AsyncBatchRetryAndTimeoutTest#testRetryExhausted - Verifies failure after max attempts exhausted
  • AsyncBatchRetryAndTimeoutTest#testNoRetryForNonMatchingException - Verifies non-matching exceptions are not retried

Timeout Tests

  • AsyncBatchRetryAndTimeoutTest#testTimeoutWithFailBehavior - Verifies fail-on-timeout behavior
  • AsyncBatchRetryAndTimeoutTest#testTimeoutWithAllowPartialBehavior - Verifies allow-partial-on-timeout behavior
  • AsyncBatchRetryAndTimeoutTest#testCompletionBeforeTimeout - Verifies normal completion before timeout

Combined Tests

  • AsyncBatchRetryAndTimeoutTest#testTimeoutCancelsRetry - Verifies timeout cancels pending retry
  • AsyncBatchRetryAndTimeoutTest#testRetrySucceedsBeforeTimeout - Verifies retry succeeds before timeout

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (new @PublicEvolving interfaces)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (retry/timeout tracking in batch processing)
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

Design Decisions

1. Strategy Pattern for Retry

We use the Strategy pattern to allow flexible retry configuration, following the existing AsyncRetryStrategy design in Flink.

2. Timeout via ProcessingTimeService

Timeout is implemented using Flink's ProcessingTimeService timers, ensuring integration with Flink's time handling.

3. Atomic State Management

The BatchResultHandler uses AtomicBoolean flags to prevent race conditions between timeout, retry, and normal completion.

4. Metrics Integration

Added batchRetryCount and batchTimeoutCount counters for monitoring retry and timeout events.

Future Work (TODOs in code)

  • Event-time based batching support
  • Ordered batch operations with retry/timeout
  • Circuit breaker pattern for repeated failures
  • Retry with different batch splitting strategies

@flinkbot
Copy link
Collaborator

flinkbot commented Dec 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants