-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-37399][runtime][source] Buffer watermarks for watermark alignment #27589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…eived This allows to prioritize processing of recovered records (when recovering from an unaligned checkpoint)
…inting interval The check doesn't make sense because checkpointing might be disabled before recovery; or there might be a manual checkpoint.
…g for a checkpoint
Befor this change, when watermark alignment is enabled, it can prevent backlogged jobs from using all available resources. Inadvertently watermark alignment configured with maxAllowedWatermarkDrift and updateInterval was de facto capping the backlog processing speed to maxAllowedWatermarkDrift (event time) / updateInterval (wall clock). For example when maxAllowedWatermarkDrift=30s and updateInterval=1s, backlog could not be processed faster than 30s (event time) / 1s (wall clock). In that case, if job had 1 day of records to process in the backlog (for example after 24h downtime), this backlog could not be processed more quickly than in 48 minutes, regardless of available resources and number of actual records. This change adds SamplingWatermarksRingBuffer that will hide the latency between SourceOperators and SourceCoordinator. For more information please look into the ticket and/or FLIP
| .booleanType() | ||
| .defaultValue(false) | ||
| .withDescription( | ||
| "Don't pull any data from sources until the first checkpoint is triggered. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we rephrase this to "Only pull data from sources after the first checkpoint is triggered.
| .withDescription( | ||
| "Don't pull any data from sources until the first checkpoint is triggered. " | ||
| + "This might be helpful in reducing recovery times in cases where " | ||
| + "recovered records from unaligned checkpoint compete with new incoming records for processing. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
from unaligned checkpoint -> from an unaligned checkpoint
| "Controls size of the ring buffer used to smooth out watermark alignment " | ||
| + "due to the inherent latency of the alignment process. Allowed watermarks " | ||
| + "are announced at the updateInterval and this means they are often out of date " | ||
| + "after the round trip. To address this problem, when pausing consumption of records, " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does round trip mean in this context?
After sentence means they are often out of date after the round trip , can we extend this sentence with which means that.....
| + "max allowed watermark is not checked against the latest value of the watermark in " | ||
| + "any given split/source, but against the oldest value in the ring buffer, that is " | ||
| + "updated at every updateInterval. This is the config option that controls " | ||
| + "the size of the ring buffer. The default buffer size is 3. Buffer sizes below 2 " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit :
Buffer sizes below 2 -> Buffer sizes of 1. If there is no use case for buffer size 1 - I wonder if we should validate to such that there is a minimum value of 2
|
|
||
| @Experimental | ||
| public static final ConfigOption<Integer> WATERMARK_ALIGNMENT_BUFFER_SIZE = | ||
| key("pipeline.watermark-alignment.buffer-size") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering about buffer-size, how about ring-buffer-capacity or if you like size then ring-buffer-size.
| @@ -136,10 +141,19 @@ private CheckpointCoordinatorConfiguration( | |||
| !isUnalignedCheckpointsEnabled || maxConcurrentCheckpoints <= 1, | |||
| "maxConcurrentCheckpoints can't be > 1 if UnalignedCheckpoints enabled"); | |||
|
|
|||
| // max "in between duration" can be one year - this is to prevent numeric overflows | |||
| if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : could you have this number as a constant?
| public static final ConfigOption<Integer> WATERMARK_ALIGNMENT_BUFFER_SIZE = | ||
| key("pipeline.watermark-alignment.buffer-size") | ||
| .intType() | ||
| .defaultValue(3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious to what value the user would know to set this to and what guidance we can give.
Is it feasible to dynamically workout how many watermarks we need to be aware of rather than hard code the number in a ring buffer.
Efrat19
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for these valuable contributions.
With more frequent checkpoints, ids can be duplicated in RestoreUpgradedJobITCase. This change adds a sipmle deduplication before the assertion.
rkhachatryan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I've updated RestoreUpgradedJobITCase to fix the failure.
Thanks for open-sourcing these changes!
|
@flinkbot run azure |
This change depends on #27440, please ignore couple of first commits from Roman.
What is the purpose of the change
Brief change log
please check individual commits
Verifying this change
PR adds new unit tests
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation