@njnu-seafish
Contributor

Purpose of the pull request

close #17795

Brief change log

Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available

Verify this pull request

Successfully tested and verified in a real-world environment.

Pull Request Notice

If your pull request contains an incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md

* Maximum time allowed for a task to be successfully dispatched.
* Default: 5 minutes.
*/
private Duration dispatchTimeout = Duration.ofMinutes(5);
Member

We should make this configurable by users in the configuration file, and it should be possible to turn it off.

Contributor Author

> We should make this configurable by users in the configuration file, and it should be possible to turn it off.

Good idea!

@SbloodyS added the improvement label on Dec 16, 2025
@SbloodyS added this to the 3.4.0 milestone on Dec 16, 2025
* Maximum time allowed for a task to be successfully dispatched.
* Default: 5 minutes.
*/
private Duration dispatchTimeout = Duration.ofMinutes(5);
Member

Suggested change
- private Duration dispatchTimeout = Duration.ofMinutes(5);
+ private Duration maxTaskDispatchDuration;

dispatchTimeout might be confused with the timeout of a single RPC when dispatching a task. The default value should be null or a large duration, to stay compatible with the previous behavior.

You also need to add this to the documentation.

Contributor Author

> maxTaskDispatchDuration

ok

private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
    final int taskId = taskExecutionRunnable.getId();
    final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext();
    final long timeoutMs = this.dispatchTimeout.toMillis();
Member

It's better to store the milliseconds rather than the Duration; then you can avoid calling dispatchTimeout.toMillis() here.

Contributor Author

> It's better to store the milliseconds rather than the Duration; then you can avoid calling dispatchTimeout.toMillis() here.

Using a duration format like '5m' is more concise and readable. Most other configurations I’ve seen also avoid millisecond-level granularity.

Member

You can translate the duration into milliseconds in the constructor.
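
A minimal sketch of this suggestion, assuming the configuration keeps the human-readable Duration (for example '5m') and the dispatcher converts it once at construction time. The class DispatcherSketch and its method isDispatchTimedOut are hypothetical names for illustration, not the actual WorkerGroupDispatcher API, and the strict comparison at the boundary is an assumption.

```java
import java.time.Duration;

// Hypothetical stand-in for the dispatcher; not the real WorkerGroupDispatcher.
final class DispatcherSketch {

    // Converted once in the constructor, so the dispatch loop never calls toMillis().
    private final long maxTaskDispatchMillis;

    DispatcherSketch(Duration maxTaskDispatchDuration) {
        this.maxTaskDispatchMillis = maxTaskDispatchDuration.toMillis();
    }

    // True when the time already spent trying to dispatch exceeds the limit.
    boolean isDispatchTimedOut(long elapsedMillis) {
        return elapsedMillis > maxTaskDispatchMillis;
    }
}
```

This keeps the configuration readable while the hot path only compares two long values.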

}

private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
    final int taskId = taskExecutionRunnable.getId();
Member

Suggested change
- final int taskId = taskExecutionRunnable.getId();
+ final int taskExecutionRunnableId = taskExecutionRunnable.getId();

This avoids confusion with the task definition id.

Contributor Author

> taskExecutionRunnableId

Would this be better: final int taskInstanceId = taskExecutionRunnable.getId();

/**
 * Get the task instance id.
 *
 * Need to know the id might change since the task instance might be regenerated.
 */
default int getId() {
    return getTaskInstance().getId();
}

Member

It's better to use taskExecutionRunnableId, since the outer layer is unaware that this is the taskInstanceId.

* @param elapsed the time (in milliseconds) already spent attempting to dispatch the task
* @param timeoutMs the configured dispatch timeout threshold (in milliseconds)
*/
private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception,
Member

Please don't print the taskId and workflowId here; all ids should already be added by MDC. We only need to print the exception here, since it already contains the failure message.

Contributor Author

> Please don't print the taskId and workflowId here; all ids should already be added by MDC. We only need to print the exception here, since it already contains the failure message.

ok


- public WorkerGroupNotFoundException(String workerGroup) {
-     super("Cannot find worker group: " + workerGroup);
+ public WorkerGroupNotFoundException(String message) {
Member

Why make this change?

Contributor Author

> Why make this change?

Restored, and removed the task info.


public class WorkerNotFoundException extends TaskDispatchException {

public WorkerNotFoundException(String message) {
Member

Suggested change
- public WorkerNotFoundException(String message) {
+ public NoAvailableWorkerException(String workerGroup) {
+     super("Cannot find available worker under worker group: " + workerGroup);
+ }

Contributor Author

ok
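
A minimal sketch of the exception shape suggested above, assuming a checked base exception. The TaskDispatchException defined here is a simplified stand-in (the real class in the project may have a different superclass and constructors); only the NoAvailableWorkerException name and message come from the suggestion.

```java
// Simplified stand-in for the project's base dispatch exception (assumption).
class TaskDispatchException extends Exception {

    TaskDispatchException(String message) {
        super(message);
    }
}

// Takes the worker group name and builds the message itself,
// so callers do not have to assemble task details into the text.
class NoAvailableWorkerException extends TaskDispatchException {

    NoAvailableWorkerException(String workerGroup) {
        super("Cannot find available worker under worker group: " + workerGroup);
    }
}
```

Passing only the worker group keeps the message uniform and leaves task and workflow ids to the logging context.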

Comment on lines 50 to 51
final MasterConfig masterConfig = new MasterConfig();
dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getDispatchTimeout());
Member

You should add a test case that validates this behavior.

Contributor Author

> You should add a test case that validates this behavior.

ok, add a test for the dispatch timeout checker

Contributor Author

@njnu-seafish Jan 13, 2026

> You should add a test case that validates this behavior.

I've already validated and tested it in a real environment.
The LogicFakeTask type runs on the Master and does not involve WorkerGroup dispatching.
I'm tracking the fix in a separate issue: #17872

Comment on lines 129 to 132
/**
* Timestamp (ms) when the task was first enqueued for dispatch.
*/
private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis();
Member

Suggested change
- /**
-  * Timestamp (ms) when the task was first enqueued for dispatch.
-  */
- private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis();
+ private final long firstDispatchTime = System.currentTimeMillis();

This is more like the creation time of the taskExecutionContext.

Contributor Author

> This is more like the creation time of the taskExecutionContext.

ok

@ruanwenjun added the feature label on Dec 16, 2025
@SbloodyS changed the title from "[Improvement-17795][Master]Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available" to "[Improvement-17795][Master] Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available" on Dec 29, 2025
@njnu-seafish
Contributor Author

@SbloodyS When you have time, could you please help trigger the CI pipeline? I'm unable to run the integration tests locally. Thank you so much!

@SbloodyS
Member

> @SbloodyS When you have time, could you please help trigger the CI pipeline? I'm unable to run the integration tests locally. Thank you so much!

Done.

@njnu-seafish
Contributor Author

@ruanwenjun @SbloodyS Whenever you have time, I’d be grateful for your review. Thanks so much!

Comment on lines 103 to 112
TaskDispatchPolicy dispatchPolicy = masterConfig.getTaskDispatchPolicy();
if (dispatchPolicy != null && dispatchPolicy.isDispatchTimeoutFailedEnabled()) {
    if (dispatchPolicy.getMaxTaskDispatchDuration() == null) {
        errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
                "must be specified when dispatch timeout checker is enabled");
    } else if (dispatchPolicy.getMaxTaskDispatchDuration().toMillis() <= 0) {
        errors.rejectValue("dispatch-timeout-checker.max-task-dispatch-duration", null,
                "must be a positive duration (e.g., '2m', '5m', '30m')");
    }
}
Member

Can dispatchPolicy be null?

Contributor Author

> Can dispatchPolicy be null?

Indeed, it cannot be null:
private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy();
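
A minimal sketch of the validation rule discussed in this thread, written without Spring so it stands alone. TaskDispatchPolicySketch and TaskDispatchPolicyValidator are hypothetical names, and returning a list of messages is an assumption standing in for the errors.rejectValue calls in the hunk above. Because the policy field is initialized eagerly, the sketch omits the null check on the policy itself.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the policy object in MasterConfig.
final class TaskDispatchPolicySketch {
    boolean dispatchTimeoutEnabled = false;
    Duration maxTaskDispatchDuration; // may stay null while the check is disabled
}

final class TaskDispatchPolicyValidator {

    // Returns validation messages; an empty list means the policy is valid.
    static List<String> validate(TaskDispatchPolicySketch policy) {
        List<String> errors = new ArrayList<>();
        if (!policy.dispatchTimeoutEnabled) {
            return errors; // the duration is ignored when the check is off
        }
        Duration max = policy.maxTaskDispatchDuration;
        if (max == null) {
            errors.add("max-task-dispatch-duration must be specified when the dispatch timeout is enabled");
        } else if (max.toMillis() <= 0) {
            errors.add("max-task-dispatch-duration must be a positive duration (e.g., '2m', '5m', '30m')");
        }
        return errors;
    }
}
```

With the check disabled, a missing duration is accepted, which matches the goal of keeping the previous behavior by default.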

} catch (Exception ex) {
    if (taskDispatchPolicy.isDispatchTimeoutFailedEnabled()) {
        // If a dispatch timeout occurs, the task will not be put back into the queue.
        long timeoutMs = this.taskDispatchPolicy.getMaxTaskDispatchDuration().toMillis();
Member

Contributor Author

* Marks the specified task as fatally failed due to an unrecoverable dispatch error, such as a timeout.
* Once this method is called, the task is considered permanently failed and will not be retried.
*/
private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,
Member

Suggested change
- private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,
+ private void onDispatchTimeout(ITaskExecutionRunnable taskExecutionRunnable, Exception ex,

Contributor Author

> onDispatchTimeout

better

Comment on lines 138 to 158
if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) {
    log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex);
    final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
            .taskExecutionRunnable(taskExecutionRunnable)
            .endTime(new Date())
            .build();
    taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
} else if (ExceptionUtils.isNoAvailableWorkerException(ex)) {
    log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex);
    final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
            .taskExecutionRunnable(taskExecutionRunnable)
            .endTime(new Date())
            .build();
    taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
} else {
    log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex);
    final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
            .taskExecutionRunnable(taskExecutionRunnable)
            .endTime(new Date())
            .build();
    taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
Member

Suggested change
- if (ExceptionUtils.isWorkerGroupNotFoundException(ex)) {
-     log.error("Dispatch fail, taskName: {}, Worker group not found.", taskName, ex);
-     final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
-             .taskExecutionRunnable(taskExecutionRunnable)
-             .endTime(new Date())
-             .build();
-     taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
- } else if (ExceptionUtils.isNoAvailableWorkerException(ex)) {
-     log.error("Dispatch fail, taskName: {}, No available worker.", taskName, ex);
-     final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
-             .taskExecutionRunnable(taskExecutionRunnable)
-             .endTime(new Date())
-             .build();
-     taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
- } else {
-     log.error("Dispatch fail, taskName: {}, Unexpected dispatch error.", taskName, ex);
-     final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
-             .taskExecutionRunnable(taskExecutionRunnable)
-             .endTime(new Date())
-             .build();
-     taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
+ log.error("Task: {} dispatch timeout.", taskName, ex);
+ final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
+         .taskExecutionRunnable(taskExecutionRunnable)
+         .endTime(new Date())
+         .build();
+ taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);

Contributor Author

> log.error("Task: {} dispatch timeout.", taskName, ex);
> final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
>         .taskExecutionRunnable(taskExecutionRunnable)
>         .endTime(new Date())
>         .build();
> taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);

ok
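
The decision this thread converges on can be sketched as a single branch: once a dispatch attempt fails, compare the elapsed time since the first dispatch against the configured maximum, and either fail the task or put it back in the queue. This is a sketch under assumptions: DispatchTimeoutSketch, Outcome, and onDispatchError are hypothetical names, requeueing as the fallback reflects the previous behavior as described in the review, and the strict comparison at the boundary is a guess. In the real code the FAIL_TASK branch would publish a TaskFailedLifecycleEvent.

```java
import java.time.Duration;

// Hypothetical helper that isolates the timeout decision from the dispatcher.
final class DispatchTimeoutSketch {

    enum Outcome { REQUEUE, FAIL_TASK }

    static Outcome onDispatchError(long firstDispatchTimeMs, long nowMs,
                                   boolean dispatchTimeoutEnabled, Duration maxTaskDispatchDuration) {
        if (!dispatchTimeoutEnabled) {
            // Check disabled: keep retrying, as before the change.
            return Outcome.REQUEUE;
        }
        long elapsedMs = nowMs - firstDispatchTimeMs;
        // Timed out: the task is not put back into the queue.
        return elapsedMs > maxTaskDispatchDuration.toMillis() ? Outcome.FAIL_TASK : Outcome.REQUEUE;
    }
}
```

Because the cause of the failure (missing worker group, no available worker, or anything else) no longer changes the outcome, one log line and one event suffice, which is what the suggested change collapses the three branches into.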

# When enabled, tasks not dispatched within this duration are marked as failed.
task-dispatch-policy:
  dispatch-timeout-failed-enabled: false
  max-task-dispatch-duration: 5m
Member

Suggested change
- max-task-dispatch-duration: 5m
+ max-task-dispatch-duration: 1h

Member

We can remove max-task-dispatch-duration here, since it is not used when dispatch-timeout-failed-enabled is false; we only need to add it to the documentation.

Contributor Author

> 1h

ok

/**
* Indicates whether the dispatch timeout checking mechanism is enabled.
*/
private boolean dispatchTimeoutFailedEnabled = false;
Member

Suggested change
- private boolean dispatchTimeoutFailedEnabled = false;
+ private boolean dispatchTimeoutEnabled = false;

Contributor Author

> dispatchTimeoutEnabled

better


Labels

backend, feature, improvement, test

Development

Successfully merging this pull request may close these issues.

[Improvement][Master] Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available.

3 participants