Conversation

@pnowojski
Contributor

@pnowojski pnowojski commented Feb 11, 2026

This change depends on #27440; please ignore the first couple of commits from Roman.

What is the purpose of the change

Before this change, enabling watermark alignment could prevent backlogged jobs
from using all available resources. Watermark alignment configured with
maxAllowedWatermarkDrift and updateInterval was inadvertently capping the backlog processing
speed at maxAllowedWatermarkDrift (event time) / updateInterval (wall clock). For example,
with maxAllowedWatermarkDrift=30s and updateInterval=1s, the backlog could not be processed
faster than 30s (event time) per 1s (wall clock). In that case, if a job had 1 day of records
to process in the backlog (for example after 24h of downtime), the backlog could not be
processed in less than 48 minutes, regardless of the available resources and the number
of actual records.
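
The 48-minute figure can be checked with a few lines of arithmetic. This is only an illustrative sketch: the class and method names are hypothetical and not part of Flink, and it assumes event time advances by at most maxAllowedWatermarkDrift per updateInterval.

```java
import java.time.Duration;

/** Illustrative arithmetic only; names are hypothetical and not part of Flink. */
final class BacklogCatchUpEstimate {

    /**
     * Lower bound on the wall-clock time needed to drain a backlog, assuming event time
     * can advance by at most maxAllowedWatermarkDrift per updateInterval.
     */
    static Duration minCatchUpTime(
            Duration backlog, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        long driftMs = maxAllowedWatermarkDrift.toMillis();
        // Number of update intervals needed to cover the backlog, rounded up.
        long intervals = (backlog.toMillis() + driftMs - 1) / driftMs;
        return updateInterval.multipliedBy(intervals);
    }

    public static void main(String[] args) {
        Duration result =
                minCatchUpTime(
                        Duration.ofDays(1), Duration.ofSeconds(30), Duration.ofSeconds(1));
        System.out.println(result.toMinutes() + " minutes"); // prints "48 minutes"
    }
}
```

With a 1 day backlog, 30s drift and 1s interval, this gives 2880 intervals of 1s each, i.e. 48 minutes.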

This change adds SamplingWatermarksRingBuffer that will hide the latency between
SourceOperators and SourceCoordinator. For more information please look into the ticket.
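
To make the idea concrete, here is a minimal sketch of a sampling ring buffer. It is not the PR's SamplingWatermarksRingBuffer: the class name, methods and pause rule are assumptions for illustration, based only on the config description in this PR (one sample per updateInterval, and the max allowed watermark compared against the oldest sample rather than the latest).

```java
/** Hypothetical sketch; not the actual SamplingWatermarksRingBuffer from this PR. */
final class WatermarkSampleBuffer {
    private final long[] samples;
    private int next;  // slot that the next sample will overwrite
    private int count; // number of valid samples stored so far

    WatermarkSampleBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.samples = new long[capacity];
    }

    /** Assumed to be called once per updateInterval with the current watermark. */
    void sample(long watermark) {
        samples[next] = watermark;
        next = (next + 1) % samples.length;
        if (count < samples.length) {
            count++;
        }
    }

    /** Oldest retained sample, or Long.MIN_VALUE if nothing has been sampled yet. */
    long oldest() {
        if (count == 0) {
            return Long.MIN_VALUE;
        }
        // Once full, the oldest sample sits in the slot about to be overwritten.
        return count < samples.length ? samples[0] : samples[next];
    }

    /** Pause only when even the oldest sample is ahead of the allowed watermark. */
    boolean shouldPause(long maxAllowedWatermark) {
        return count > 0 && oldest() > maxAllowedWatermark;
    }
}
```

With capacity 3 and samples 10, 20, 30, 40, the oldest retained value is 20, so a source whose latest watermark is 40 would not be paused for a max allowed watermark of 25 in this sketch.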

Brief change log

please check individual commits

Verifying this change

PR adds new unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

rkhachatryan and others added 14 commits January 23, 2026 08:59
…eived

This allows to prioritize processing of recovered records
(when recovering from an unaligned checkpoint)
…inting interval

The check doesn't make sense because checkpointing might be disabled
before recovery; or there might be a manual checkpoint.
Before this change, enabling watermark alignment could prevent backlogged jobs
from using all available resources. Watermark alignment configured with
maxAllowedWatermarkDrift and updateInterval was inadvertently capping the backlog processing
speed at maxAllowedWatermarkDrift (event time) / updateInterval (wall clock). For example,
with maxAllowedWatermarkDrift=30s and updateInterval=1s, the backlog could not be processed
faster than 30s (event time) per 1s (wall clock). In that case, if a job had 1 day of records
to process in the backlog (for example after 24h of downtime), the backlog could not be
processed in less than 48 minutes, regardless of the available resources and the number
of actual records.

This change adds SamplingWatermarksRingBuffer that will hide the latency between
SourceOperators and SourceCoordinator. For more information please look into the ticket
and/or FLIP
@flinkbot
Collaborator

flinkbot commented Feb 11, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

.booleanType()
.defaultValue(false)
.withDescription(
"Don't pull any data from sources until the first checkpoint is triggered. "
Contributor

nit: could we rephrase this to "Only pull data from sources after the first checkpoint is triggered."?

.withDescription(
"Don't pull any data from sources until the first checkpoint is triggered. "
+ "This might be helpful in reducing recovery times in cases where "
+ "recovered records from unaligned checkpoint compete with new incoming records for processing. "
Contributor

nit:
from unaligned checkpoint -> from an unaligned checkpoint

"Controls size of the ring buffer used to smooth out watermark alignment "
+ "due to the inherent latency of the alignment process. Allowed watermarks "
+ "are announced at the updateInterval and this means they are often out of date "
+ "after the round trip. To address this problem, when pausing consumption of records, "
Contributor

@davidradl davidradl Feb 11, 2026

What does "round trip" mean in this context?

After the sentence saying that they "are often out of date after the round trip", can we extend it with "which means that....."?

+ "max allowed watermark is not checked against the latest value of the watermark in "
+ "any given split/source, but against the oldest value in the ring buffer, that is "
+ "updated at every updateInterval. This is the config option that controls "
+ "the size of the ring buffer. The default buffer size is 3. Buffer sizes below 2 "
Contributor

@davidradl davidradl Feb 11, 2026

nit:
Buffer sizes below 2 -> Buffer sizes of 1. If there is no use case for buffer size 1, I wonder if we should validate so that there is a minimum value of 2.
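
The validation suggested above could be sketched as follows. This is only an illustration: the helper class, constant and method names are hypothetical, and it assumes that rejecting sizes below 2 with an exception is the desired behaviour, which the excerpt in this thread does not confirm.

```java
/** Hypothetical helper; not part of this PR or of Flink. */
final class BufferSizeValidation {
    /** Assumed minimum, following the reviewer's suggestion. */
    static final int MIN_BUFFER_SIZE = 2;

    /** Returns the given size if it is acceptable, otherwise throws. */
    static int checkBufferSize(int bufferSize) {
        if (bufferSize < MIN_BUFFER_SIZE) {
            throw new IllegalArgumentException(
                    "pipeline.watermark-alignment.buffer-size must be at least "
                            + MIN_BUFFER_SIZE
                            + ", but was "
                            + bufferSize);
        }
        return bufferSize;
    }
}
```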


@Experimental
public static final ConfigOption<Integer> WATERMARK_ALIGNMENT_BUFFER_SIZE =
key("pipeline.watermark-alignment.buffer-size")
Contributor

I am wondering about buffer-size: how about ring-buffer-capacity or, if you prefer size, ring-buffer-size?

@@ -136,10 +141,19 @@ private CheckpointCoordinatorConfiguration(
!isUnalignedCheckpointsEnabled || maxConcurrentCheckpoints <= 1,
"maxConcurrentCheckpoints can't be > 1 if UnalignedCheckpoints enabled");

// max "in between duration" can be one year - this is to prevent numeric overflows
if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
Contributor

nit : could you have this number as a constant?
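
One way to name the number, as a rough sketch: the class, constant and method names here are hypothetical, and only the comparison from the diff is reproduced since the body of the `if` is not shown in this excerpt.

```java
import java.time.Duration;

/** Hypothetical holder class; names are illustrative, not from the PR. */
final class CheckpointLimits {
    /** One year in milliseconds; the upper bound exists to prevent numeric overflows. */
    static final long MAX_PAUSE_BETWEEN_CHECKPOINTS_MS = Duration.ofDays(365).toMillis();

    /** Mirrors the comparison in the diff using the named constant. */
    static boolean exceedsMaxPause(long minPauseBetweenCheckpoints) {
        return minPauseBetweenCheckpoints > MAX_PAUSE_BETWEEN_CHECKPOINTS_MS;
    }
}
```

`Duration.ofDays(365).toMillis()` yields the same value as `365L * 24 * 60 * 60 * 1_000`, so the check behaves as before.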

public static final ConfigOption<Integer> WATERMARK_ALIGNMENT_BUFFER_SIZE =
key("pipeline.watermark-alignment.buffer-size")
.intType()
.defaultValue(3)
Contributor

@davidradl davidradl Feb 11, 2026

I am curious what value the user would know to set this to, and what guidance we can give.

Is it feasible to dynamically work out how many watermarks we need to be aware of, rather than hard-coding the number in a ring buffer?

@github-actions github-actions bot added the community-reviewed label (PR has been reviewed by the community.) Feb 12, 2026
Contributor

@Efrat19 Efrat19 left a comment

Thank you for these valuable contributions.

With more frequent checkpoints, ids can be duplicated in RestoreUpgradedJobITCase.
This change adds a simple deduplication before the assertion.
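
A deduplication step of this kind might look like the sketch below. It is not the code added to RestoreUpgradedJobITCase; the class and method names are hypothetical, and it assumes the ids are collected in a list whose first-seen order should be preserved.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Hypothetical test helper; not the actual change to RestoreUpgradedJobITCase. */
final class DedupBeforeAssert {

    /** Drops repeated values while keeping the first occurrence of each, in order. */
    static <T> List<T> deduplicate(List<T> values) {
        return new ArrayList<>(new LinkedHashSet<>(values));
    }
}
```

The assertion would then run against `deduplicate(ids)` instead of the raw list.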
Contributor

@rkhachatryan rkhachatryan left a comment

LGTM, I've updated RestoreUpgradedJobITCase to fix the failure.

Thanks for open-sourcing these changes!

@rkhachatryan
Contributor

@flinkbot run azure

Labels

community-reviewed PR has been reviewed by the community.

5 participants