Description
The TaskHubWorker works very well in the beginning, but after a few hours none of the newly submitted orchestrations starts; these orchestrations either take several hours to start or never start (I'm not completely sure which). I have observed this unexpected behavior several times but can't figure it out.
We use Azure Storage as the backend (Microsoft.Azure.DurableTask.AzureStorage 2.1.0) and host the TaskHubWorker in Kubernetes pods as a .NET Core hosted worker.
The Azure Storage setup is as follows; logging is turned on and exported to Application Insights:
```csharp
var provider = new StorageAccountClientProvider(
    options.AzureStorage!.AccountName!,
    new DefaultAzureCredential());

AzureStorageOrchestrationServiceSettings azureStorageSettings =
    new AzureStorageOrchestrationServiceSettings
    {
        TaskHubName = options.TaskHubName,
        StorageAccountClientProvider = provider,
        PartitionCount = 8,
        WorkItemQueueVisibilityTimeout = TimeSpan.FromSeconds(120),
        ControlQueueVisibilityTimeout = TimeSpan.FromSeconds(120),
        MaxConcurrentTaskActivityWorkItems = 100, // Adjust accordingly
        MaxConcurrentTaskOrchestrationWorkItems = 150,
        LoggerFactory = this._loggerFactory
    };

AzureStorageOrchestrationService azureStorageOrchestrationService =
    new AzureStorageOrchestrationService(azureStorageSettings);
await azureStorageOrchestrationService.CreateIfNotExistsAsync();

TaskHubWorker worker = new(
    azureStorageOrchestrationService,
    new GenericObjectManager<TaskOrchestration>(),
    new GenericObjectManager<TaskActivity>(),
    this._loggerFactory);
await worker.StartAsync();
```

Orchestration instances are submitted with randomly generated instance ids (see the sketch right after this paragraph). The simplified version of our orchestration follows after the sketch.
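Here is a minimal sketch of how we submit instances; the TaskHubClient wiring shown is illustrative of typical DurableTask.Core usage rather than our exact code:

```csharp
// Illustrative sketch only -- the client setup is an assumption, not our exact code.
// AzureStorageOrchestrationService implements IOrchestrationServiceClient, so the same
// service instance configured above can back a TaskHubClient.
TaskHubClient client = new TaskHubClient(azureStorageOrchestrationService);

// Each run gets a randomly generated instance id.
OrchestrationInstance instance = await client.CreateOrchestrationInstanceAsync(
    typeof(SampleOrchestration),
    Guid.NewGuid().ToString("N"),
    new SampleOrchestrationInput { DispatchTimeUtc = DateTime.UtcNow });
```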
```csharp
public class SampleOrchestration : TaskOrchestration<bool, SampleOrchestrationInput>
{
    private readonly ILogger<SampleOrchestration> _logger;

    public SampleOrchestration(ILogger<SampleOrchestration> logger)
    {
        this._logger = logger;
    }

    protected override async Task<bool> Execute(OrchestrationContext context, SampleOrchestrationInput orchestrationInput)
    {
        // Create a timer that will fire when the overall deadline is reached.
        // This timer must be canceled before this orchestration execution completes.
        using (CancellationTokenSource deadlineCancellationTokenSource = new CancellationTokenSource())
        {
            Task deadlineTimer = context.CreateTimer(
                orchestrationInput.DispatchTimeUtc + TimeSpan.FromHours(8),
                "OrchestrationDeadline",
                deadlineCancellationTokenSource.Token);
            try
            {
                DateTime nextPoll = context.CurrentUtcDateTime.AddSeconds(60);
                await context.CreateTimer(nextPoll, "Scan Job polling delay timer");

                PollingActivityInput pollingActivityInput = new PollingActivityInput
                {
                };

                RetryOptions getJobStatusRetryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 3)
                {
                    BackoffCoefficient = 2.0,
                    MaxRetryInterval = TimeSpan.FromSeconds(60),
                    Handle = exception =>
                    {
                        if (exception is TaskFailedException tfe)
                        {
                            return tfe.FailureDetails?.IsCausedBy<TaskCanceledException>() == true
                                || tfe.FailureDetails?.IsCausedBy<ActivityTransientException>() == true;
                        }

                        return false;
                    }
                };

                Task<PollingActivityOutput>? pollingTask =
                    context.ScheduleWithRetry<PollingActivityOutput>(typeof(PollingActivity), getJobStatusRetryOptions, pollingActivityInput);

                Task winner = await Task.WhenAny(pollingTask, deadlineTimer);
                if (winner == deadlineTimer)
                {
                    this._logger.LogInformation("Orchestration exceeded the deadline");
                    OrchestrationTimeoutException ote = new OrchestrationTimeoutException();
                    throw new OrchestrationException("Orchestration deadline exceeded", ote)
                    {
                        FailureDetails = new FailureDetails(ote)
                    };
                }

                var pollingTaskResult = await pollingTask;
                switch (pollingTaskResult.Result)
                {
                    case Completed:
                    {
                        this._logger.LogInformation("Orchestration completed");
                        return true;
                    }
                    case Running:
                    {
                        this._logger.LogInformation("Job is running");
                        context.ContinueAsNew(new SampleOrchestrationInput()
                        {
                            DispatchTimeUtc = orchestrationInput.DispatchTimeUtc
                        });
                        return true;
                    }
                }

                return true;
            }
            catch (OrchestrationException e)
            {
                this._logger.LogError(e, "Task Activity throws exception");
                // exception handling logic
                return false;
            }
            finally
            {
                deadlineCancellationTokenSource.Cancel(); // MUST NOT use CancelAsync(), it won't work
            }
        }
    }
}
```

I was running a throughput test and completed around 20,000 orchestrations without any glitch, but when I continued the throughput test the next day, none of the orchestrations would start.
From the logs I can see that the orchestration is in the Pending state, but searching for the same instance id turns up no other log entries. When I found that new orchestrations were stuck (not starting), there was a huge amount of unexpected log entries, and the logs were full of [TimerFired#0] events when I started another round of the throughput test and found the orchestrations were not starting.
Sorry, I can't pull the data from the Azure Storage account; I don't have access to the storage account content myself.
I'm not sure whether the following facts contribute to this issue:
- I'm pretty sure [TimerFired#0] is the deadlineTimer in the sample orchestration code; could it be the cause of the stuck orchestrations?
- We have 2 pods (not stateful) on Kubernetes, and we redeploy these 2 pods every 3 hours; could it be that the dead pods were still holding the leases? (The hosted-worker shutdown path is sketched after this list.)
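For completeness, the worker is hosted roughly like the sketch below; this is a simplified, illustrative shape (the class name and wiring are not our exact code), showing that we start the TaskHubWorker when the pod comes up and stop it when the host shuts down:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.Extensions.Hosting;

// Illustrative sketch of the hosted worker lifecycle -- not our exact code.
// The intent is that TaskHubWorker.StopAsync runs when a pod is being torn down,
// so the worker can shut down cleanly before the pod is killed.
public class DurableTaskWorkerService : BackgroundService
{
    private readonly TaskHubWorker _worker;

    public DurableTaskWorkerService(TaskHubWorker worker) => this._worker = worker;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await this._worker.StartAsync();

        try
        {
            // Keep the background service alive until the host signals shutdown.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Host is shutting down.
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        // Graceful stop: let in-flight work items finish before the pod exits.
        await this._worker.StopAsync(false);
        await base.StopAsync(cancellationToken);
    }
}
```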
The project mentioned above is a Microsoft internal project. Thank you in advance for your help; it would really save my life.