-
Notifications
You must be signed in to change notification settings - Fork 5k
[Improvement-17795][Master] Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available #17796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
| * Maximum time allowed for a task to be successfully dispatched. | ||
| * Default: 5 minutes. | ||
| */ | ||
| private Duration dispatchTimeout = Duration.ofMinutes(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make it configurable by users in the configuration file and it can be turned off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make it configurable by users in the configuration file and it can be turned off.
good idea!
| * Maximum time allowed for a task to be successfully dispatched. | ||
| * Default: 5 minutes. | ||
| */ | ||
| private Duration dispatchTimeout = Duration.ofMinutes(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private Duration dispatchTimeout = Duration.ofMinutes(5); | |
| private Duration maxTaskDispatchDuration; |
dispatchTimeout might confuse with single RPC timeout when dispatch task, and the default value should be null or a large duration, should compatibility with the previous behavior.
And you need to add this in document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxTaskDispatchDuration
ok
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||
| final int taskId = taskExecutionRunnable.getId(); | ||
| final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext(); | ||
| final long timeoutMs = this.dispatchTimeout.toMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to store the mills rather than store duration, then you can avoid execute dispatchTimeout.toMillis() here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to store the mills rather than store duration, then you can avoid execute
dispatchTimeout.toMillis()here.
Using a duration format like '5m' is more concise and readable. Most other configurations I’ve seen also avoid millisecond-level granularity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can translate the duration into milliseconds in the constructor.
| } | ||
|
|
||
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||
| final int taskId = taskExecutionRunnable.getId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| final int taskId = taskExecutionRunnable.getId(); | |
| final int taskExecutionRunnableId = taskExecutionRunnable.getId(); |
Avoid confuse with task definition id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskExecutionRunnableId
final int taskInstanceId = taskExecutionRunnable.getId(); is it better?
/**
* Get the task instance id.
*
Need to know the id might change since the task instance might be regenerated.
*/
default int getId() {
return getTaskInstance().getId();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use taskExecutionRunnableId , since the outer layer is unaware that this is the taskInstanceId.
| * @param elapsed the time (in milliseconds) already spent attempting to dispatch the task | ||
| * @param timeoutMs the configured dispatch timeout threshold (in milliseconds) | ||
| */ | ||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't print the taskId and workflowId here, all ids should already be added by MDC. We should only need to print the exception here, the exception already contains failure message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't print the
taskIdandworkflowIdhere, all ids should already be added by MDC. We should only need to print the exception here, theexceptionalready contains failure message.
ok
|
|
||
| public WorkerGroupNotFoundException(String workerGroup) { | ||
| super("Cannot find worker group: " + workerGroup); | ||
| public WorkerGroupNotFoundException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do this change?
restore, remove the task info
|
|
||
| public class WorkerNotFoundException extends TaskDispatchException { | ||
|
|
||
| public WorkerNotFoundException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public WorkerNotFoundException(String message) { | |
| public NoAvailableWorkerException(String workerGroup) { | |
| super("Cannot find available worker under worker group: " + workerGroup); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| final MasterConfig masterConfig = new MasterConfig(); | ||
| dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getDispatchTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add a validated test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add a validated test case.
ok, add test for dispatch timeout checker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add a validated test case.
I've already validated and tested it in a real environment.
The LogicFakeTask type runs on the Master and does not involve WorkerGroup dispatching.
I track the fix in a separate issue: #17872
| /** | ||
| * Timestamp (ms) when the task was first enqueued for dispatch. | ||
| */ | ||
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /** | |
| * Timestamp (ms) when the task was first enqueued for dispatch. | |
| */ | |
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); | |
| private final long firstDispatchTime = System.currentTimeMillis(); |
This is more like the creation time of the taskExecutionContext.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like the creation time of the
taskExecutionContext.
ok
877db06 to
d7bf4fa
Compare
|
@SbloodyS When you have time, could you please help trigger the CI pipeline? I'm unable to run the integration tests locally. Thank you so much! |
Done. |
|
@ruanwenjun @SbloodyS Whenever you have time, I’d be grateful for your review. Thanks so much! |
| TaskDispatchPolicy dispatchPolicy = masterConfig.getTaskDispatchPolicy(); | ||
| if (dispatchPolicy != null && dispatchPolicy.isDispatchTimeoutFailedEnabled()) { | ||
| if (dispatchPolicy.getMaxTaskDispatchDuration() == null) { | ||
| errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null, | ||
| "must be specified when dispatch timeout checker is enabled"); | ||
| } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) { | ||
| errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null, | ||
| "must be a positive duration (e.g., '2m', '5m', '30m')"); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can dispatchPolicy be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can dispatchPolicy be null?
It indeed cannot be null.
private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
| } catch (Exception ex) { | ||
| if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) { | ||
| // If a dispatch timeout occurs, the task will not be put back into the queue. | ||
| long timeoutMs = this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * Marks the specified task as fatally failed due to an unrecoverable dispatch error,such as timeout | ||
| * Once this method is called, the task is considered permanently failed and will not be retried. | ||
| */ | ||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, | |
| private void onDispatchTimeout(ITaskExecutionRunnable taskExecutionRunnable, Exception ex, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onDispatchTimeout
better
| if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) { | ||
| log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex); | ||
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | ||
| .taskExecutionRunnable(taskExecutionRunnable) | ||
| .endTime(new Date()) | ||
| .build(); | ||
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | ||
| } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) { | ||
| log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex); | ||
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | ||
| .taskExecutionRunnable(taskExecutionRunnable) | ||
| .endTime(new Date()) | ||
| .build(); | ||
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | ||
| } else { | ||
| log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex); | ||
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | ||
| .taskExecutionRunnable(taskExecutionRunnable) | ||
| .endTime(new Date()) | ||
| .build(); | ||
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) { | |
| log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) { | |
| log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| } else { | |
| log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); | |
| log.error("Task: {} dispatch timeout.", taskName, ex); | |
| final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder() | |
| .taskExecutionRunnable(taskExecutionRunnable) | |
| .endTime(new Date()) | |
| .build(); | |
| taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error("Task: {} dispatch timeout.", taskName, ex);
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.endTime(new Date())
.build();
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
ok
| # When enabled, tasks not dispatched within this duration are marked as failed. | ||
| task-dispatch-policy: | ||
| dispatch-timeout-failed-enabled: false | ||
| max-task-dispatch-duration: 5m |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| max-task-dispatch-duration: 5m | |
| max-task-dispatch-duration: 1h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove the max-task-dispatch-duration here, since these is not used when dispatch-timeout-failed-enabled is false, only need to add these on the doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1h
ok
| /** | ||
| * Indicates whether the dispatch timeout checking mechanism is enabled. | ||
| */ | ||
| private boolean dispatchTimeoutFailedEnabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private boolean dispatchTimeoutFailedEnabled = false; | |
| private boolean dispatchTimeoutEnabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dispatchTimeoutEnabled
better
|



Purpose of the pull request
close #17795
Brief change log
Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available
Verify this pull request
Successfully tested and verified in a real-world environment.
Pull Request Notice
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md