-
Notifications
You must be signed in to change notification settings - Fork 792
Implemented FiringMode.Serial as a separate worker task #640
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Implemented FiringMode.Serial as a separate worker task #640
Conversation
|
@gao-artur What do you think about this implementation? This should fix the issue raised by you on the previous PR in regards to locking the initial caller to Fire. However exception handling becomes a bit iffy. |
|
Too many gotcha's. IMO, in the current form, this is just not usable. The user will have to write a significant amount of code to make this mode work as expected, which defeats the original proposal's idea of simplifying concurrent usage. Additionally, you have added a new |
Would be curios to see your implementation because I am really unsure what you are going for. Like what does concretely "as expected" mean to you? In the original issue you proposed a solution with a SemaphoreSlim which deadlocks on recursive Fire calls. |
|
Yes, public class MyStateMachine: IAsyncDisposable
{
private readonly StateMachine<State, Trigger> _machine = new StateMachine<State, Trigger>(State.Stopped, FiringMode.Queued);
private readonly BlockingPriorityQueue<FiringTask, int> _firingQueue = new();
private readonly CancellationTokenSource _queueWorkerCanceler = new();
private readonly Task _queueWorkerTask;
public MyStateMachine()
{
_queueWorkerTask = Task.Run(async () => await QueueWorker(_queueWorkerCanceler.Token));
}
public async Task StartAsync(CancellationToken token)
{
await WaitForExecutionAsync(
() => _machine.FireAsync(_startTrigger, token),
_startTrigger.Trigger);
}
public async ValueTask DisposeAsync()
{
_queueWorkerCanceler.Cancel();
await _queueWorkerTask;
}
private async Task WaitForExecutionAsync(Func<Task> task, Trigger trigger)
{
var item = new FiringTask
{
Task = task,
Trigger = trigger,
TaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)
};
await _firingQueue.Enqueue(item, trigger.Priority);
await item.TaskCompletionSource.Task;
}
private async Task QueueWorker(CancellationToken token)
{
try
{
while (!token.IsCancellationRequested)
{
var item = await _firingQueue.Dequeue(token);
try
{
await item.Task();
item.TaskCompletionSource.SetResult();
}
catch (OperationCanceledException e)
{
item.TaskCompletionSource.SetCanceled(e.CancellationToken);
}
catch (Exception e)
{
item.TaskCompletionSource.SetException(e);
}
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Queue worker cancelled");
}
catch (Exception e)
{
_logger.LogError(e, "Queue worker failed");
}
}
}The goal of the public async Task StartAsync(CancellationToken token)
{
await _machine.FireAsync(_startTrigger, token);
}Priority queue support is a different story. |
|
What I'm seeing is something that has a baked in _startTrigger trigger. I assume this is is part of a very specific SM you built and you mean something generic would actually expose parameters allowing you to pass in the trigger + args. I understand that is probably a snippet you stripped from a specific implementation so I won't pick on the details, but I think what I'm seeing would still deadlock on recursive calls on WaitForExecutionAsync (because the current Firing task would be stuck on waiting the next firing task to complete, but that can't complete because the QueueWorker is waiting for the current task to finish). Other than that, I don't see how this is much different from this PR (aside from having the queue locking details moved to a probably custom class called BlockingPriorityQueue). I very intentionally made it so that the default Fire method always returns early and implemented the separate |
|
Another reason why I chose to make Fire/FireAsync return right after enqueueing rather than after the trigger has been processed (which is what FireAndWaitAsync does) was because I find it to be consistent with how In the sense that |
… in SerialModeThreadSafetyFixture.
That's correct.
Recursive calls are handled by the _eventQueue, not by the
Honestly, I didn't review your implementation, just looked at a list of gotchas in the description.
Yes, I understood that. But this decision introduces an additional gotcha that complicates the usage of the mode. I also understand that it was done to be consistent with the |
Through which mechanism do they end up on the _eventQueue? If all calls to Fire pass through QueueWorker, then I don't see how it won't deadlock because Are you mixing calls to WaitForExecutionAsync with direct calls to the State Machine's FireAsync method, like calling WaitForExecutionAsync from the outside of the SM and only direct calls to FireAsync from within the SM? If so, I don't see how you would integrate that into the existing Stateless API without creating new methods specific for the Serial mode.
I agree that that part is confusing, but to be honest I don't see how they could've done it any other way. Because at the end of the day it comes to tradeoffs, and you fundamentally can't make FireAsync await for the actual execution of the queued event, unless you are willing to accept the tradeoff that users will always deadlock in recursive calls unless they do fire and forget.
I mean, honestly the only 2 gotcha I see are:
It's true that FireAndWaitAsync only works as expected on Immediate and Serial modes, and on Queued mode it's a lie because it either returns too late or too early. But introducing the TaskCompletionSource pattern to Queued mode too would've required adding some overhead and I wasn't sure if the maintainers would like that (as I got the vibe that they don't want to affect the performance in existing modes, otherwise we could've just directly added the locks to the Queued mode).
Same here. I'm curious about your opinion because you made some valid points so far. However I think making a perfect design is pretty much impossible because trade-offs need to be made and ultimately it's going to be up to the maintainers to decide which path would make the most sense for this particular library (if they're going to decide on going forward with Serial mode at all, that is). |
DropUnprocessedEventsOnErrorInSerialModefrom the previous PR.await FireAndWaitAsync(trigger). Warning, if you call and awaitFireAndWaitAsyncinside a state transition event, you risk deadlocking (because the current transition now waits for a future transition to be completed).await GetSerialEventsWorkerTask().await FireAndWaitAsync(trigger)).CancelPendingTriggers().Resolves #527