[FLINK-38939][runtime] Pause sources until the 1st checkpoint to prioritize processing recovered records #27440
Conversation
<td><h5>pipeline.sources.pause-until-first-checkpoint</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Don't pull any data from sources until the first checkpoint is triggered. This might be helpful in reducing recovery times. Incompatible 0 value for execution.checkpointing.interval-during-backlog</td>
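For context, here is a sketch of how a user might set this option together with the backlog checkpointing interval in a flat Flink configuration file (the keys come from the table above; the interval value is illustrative, and the layout is an assumption, not part of this PR):

```yaml
# Pause sources until the first checkpoint is triggered (documented default: true)
pipeline.sources.pause-until-first-checkpoint: true

# Per the validation added in this PR, must not be 0 while the option above is enabled
execution.checkpointing.interval-during-backlog: 30 s
```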
I suggest expanding on "This might be helpful in reducing recovery times." to be specific about the scenarios in which this would help. Are there scenarios where it would not be helpful? If there are no downsides to this change, then do we need a config option?
I am not sure what "Incompatible 0 value for execution.checkpointing.interval-during-backlog" means.
        && checkpointConfig.isPauseSourcesUntilFirstCheckpoint()) {
    throw new IllegalArgumentException(
            "Pausing sources until first checkpoint is incompatible with disabling checkpoints during backlog processing. "
                    + "Please consult "
nit: "Please consult ...." -> "Please review and choose whether you require "
        + CheckpointingOptions.PAUSE_SOURCES_UNTIL_FIRST_CHECKPOINT.key()
        + " or "
        + CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG.key());
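The incompatibility check under discussion can be sketched as a self-contained helper (hypothetical class, method, and hard-coded option keys; the real check uses `CheckpointingOptions` constants inside Flink's checkpoint configuration):

```java
// Simplified, stand-alone sketch of the validation discussed above.
// Class name, method signature, and hard-coded keys are illustrative only.
public class PauseSourcesValidation {
    static final String PAUSE_KEY = "pipeline.sources.pause-until-first-checkpoint";
    static final String BACKLOG_INTERVAL_KEY = "execution.checkpointing.interval-during-backlog";

    /**
     * Rejects the combination of pausing sources until the first checkpoint
     * with a backlog checkpointing interval of 0 (i.e. checkpoints disabled
     * during backlog processing): the paused sources would wait for a
     * checkpoint that never comes.
     */
    static void validate(boolean pauseSourcesUntilFirstCheckpoint, long intervalDuringBacklogMs) {
        if (intervalDuringBacklogMs == 0 && pauseSourcesUntilFirstCheckpoint) {
            throw new IllegalArgumentException(
                    "Pausing sources until first checkpoint is incompatible with disabling "
                            + "checkpoints during backlog processing. "
                            + "Please review and choose whether you require "
                            + PAUSE_KEY
                            + " or "
                            + BACKLOG_INTERVAL_KEY);
        }
    }

    public static void main(String[] args) {
        validate(false, 0);      // ok: sources are not paused
        validate(true, 30_000);  // ok: backlog checkpointing is enabled
        try {
            validate(true, 0);   // incompatible combination
            throw new AssertionError("expected IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```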
1996fanrui
left a comment
Hey @rkhachatryan, thanks for the PR. I have several questions about this approach.
Pause sources until the 1st checkpoint to prioritize processing recovered records
Don't pull any data from sources until the first checkpoint is triggered.
If so, the source does not work even if all recovered buffers are consumed, right?
Let me check my understanding of the existing issue and the current approach:
- A task is switched from INITIALIZATION to RUNNING once all of its recovered input and output buffers are consumed.
- The recovered buffers of some input channels are fully consumed and new buffers are arriving on them, while the recovered buffers of the remaining channels are not yet fully consumed.
Issue: if a task starts consuming new buffers before all recovered buffers are consumed, it switches to RUNNING later.
IIUC, the purpose of pausing sources is to avoid generating new buffers. Is that correct?
If so, I don't think it works perfectly, since new buffers can still be generated from the recovered buffers of an upstream task. Of course, pausing sources does prevent new buffers from outside of Flink during recovery.
Blocking channels whose recovered buffers are fully consumed may be more fine-grained than pausing sources: it lets a task consume recovered buffers before new buffers, while upstream tasks and sources are unblocked as early as possible.
Also, FLIP-547 part 4.6 will introduce a fine-grained blocking mechanism. Is pausing sources still needed if that mechanism will be introduced in the near future?
Looking forward to your opinion, thanks!
Yes @1996fanrui, you're right. The purpose of this change is to prevent new input records from delaying the switch of the downstream tasks to RUNNING. In a sense, this is a lightweight alternative to FLIP-547.
1996fanrui
left a comment
Thanks @rkhachatryan for the comment!
Treating this approach as a lightweight alternative for now makes sense. I only left one comment, please take a look when you have time, thanks!
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
1996fanrui
left a comment
LGTM assuming CI is green
…eived This allows prioritizing the processing of recovered records (when recovering from an unaligned checkpoint)
…inting interval The check doesn't make sense because checkpointing might be disabled before recovery, or there might be a manual checkpoint.
private JobAllocationsInformation getJobAllocationsInformationFromGraphAndState(
        @Nullable final ExecutionGraph previousExecutionGraph) {

    CompletedCheckpoint latestCompletedCheckpoint = null;
    if (jobGraph.isCheckpointingEnabled()) {
        latestCompletedCheckpoint = completedCheckpointStore.getLatestCheckpoint();
    }
After minimizing the delay for automatic checkpoints, LocalRecoveryTest#testStateSizeIsConsideredForLocalRecoveryOnRestart started failing because of a race condition between manual and automatic checkpoints.
So I disabled automatic checkpoints in the test and removed the if (jobGraph.isCheckpointingEnabled()) check (in prod code).
This should be fine: completedCheckpointStore should never be null.
Besides the test, I think checkpointing can be disabled before recovering the job, and this branch should still be executed.
Oh, it can be DeactivatedCheckpointCompletedCheckpointStore 🤔
Thanks @rkhachatryan for looking into it. May I ask whether there is any risk for production jobs?
…g for a checkpoint
public long getInitialTriggeringDelay() {
    return pauseSourcesUntilFirstCheckpoint
            ? ThreadLocalRandom.current()
                    .nextLong(minPauseBetweenCheckpoints, minPauseBetweenCheckpoints * 2 + 1)
nit: maybe add a comment or a named variable to explain this math?
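To illustrate the math being questioned: nextLong(min, min * 2 + 1) draws uniformly from [minPause, 2 * minPause], since nextLong's upper bound is exclusive. A self-contained sketch (hypothetical class and parameter names; the fallback defaultDelay stands in for the else branch that the diff above cuts off):

```java
import java.util.concurrent.ThreadLocalRandom;

// Stand-alone sketch of the logic in getInitialTriggeringDelay above.
public class InitialDelaySketch {
    /**
     * When sources are paused until the first checkpoint, the first trigger is
     * delayed by a random amount drawn uniformly from
     * [minPauseBetweenCheckpoints, 2 * minPauseBetweenCheckpoints];
     * the "+ 1" compensates for nextLong's exclusive upper bound.
     */
    static long initialTriggeringDelay(
            boolean pauseSourcesUntilFirstCheckpoint,
            long minPauseBetweenCheckpoints,
            long defaultDelay) {
        return pauseSourcesUntilFirstCheckpoint
                ? ThreadLocalRandom.current()
                        .nextLong(minPauseBetweenCheckpoints, minPauseBetweenCheckpoints * 2 + 1)
                : defaultDelay;
    }

    public static void main(String[] args) {
        long minPause = 1_000; // ms
        for (int i = 0; i < 10_000; i++) {
            long d = initialTriggeringDelay(true, minPause, 0);
            if (d < minPause || d > 2 * minPause) {
                throw new AssertionError("delay out of range: " + d);
            }
        }
        System.out.println("all sampled delays within [1000, 2000] ms");
    }
}
```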
private JobGraph createJobGraph(ExecutionMode mode) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
    // todo: review the test and timings
nit: leftover "todo" comment.
Efrat19
left a comment
Thank you for these valuable contributions
    return Arrays.asList(true, false);
}

@Parameter public boolean pauseSourcesUntilCheckpoint;
nit: the property declaration should precede the method.
Thanks for the reviews!