-
Notifications
You must be signed in to change notification settings - Fork 320
Extended sessions for entities in .NET isolated #1265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fe43673
c21ab15
ca82e63
850aaa1
9bf8aa8
5a53c89
9dfea22
699ff9c
402e3de
81d06bc
1294b28
0ffeccd
4e4c7b0
3ec7f8f
02ec8ea
6dcef7e
0bce0b1
3fcca42
bb217e9
cc03f08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,37 +131,48 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) | |
| if (workItem.Session == null) | ||
| { | ||
| // Legacy behavior | ||
| await this.OnProcessWorkItemAsync(workItem); | ||
| await this.OnProcessWorkItemAsync(workItem, null); | ||
| return; | ||
| } | ||
|
|
||
| var isExtendedSession = false; | ||
|
|
||
| var concurrencyLockAcquired = false; | ||
| var processCount = 0; | ||
| SchedulerState schedulerState = null; | ||
| try | ||
| { | ||
| while (true) | ||
| { | ||
| // While the work item contains messages that need to be processed, execute them. | ||
| if (workItem.NewMessages?.Count > 0) | ||
| { | ||
| bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem); | ||
| if (isCompletedOrInterrupted) | ||
| // We only need to acquire the lock on the first execution within the extended session | ||
| if (!concurrencyLockAcquired) | ||
| { | ||
| concurrencyLockAcquired = this.concurrentSessionLock.Acquire(); | ||
| } | ||
| workItem.IsExtendedSession = concurrencyLockAcquired; | ||
| // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item. | ||
| // If we failed to acquire it, we will end the extended session after this execution. | ||
| schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState); | ||
|
|
||
| // The entity has been deleted, so we end the extended session. | ||
| if (this.EntityIsDeleted(schedulerState)) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| // When extended sessions are enabled, the handler caches the entity state after the first execution of the extended session, so there | ||
| // is no need to retain a reference to it here. | ||
| // We set the local reference to null so that the entity state can be garbage collected while we wait for more messages to arrive. | ||
| schedulerState.EntityState = null; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the entity state is still kept in memory by the middleware, do we actually gain anything by setting this property to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, in the sense that the entity state isn't kept in memory redundantly in multiple places (for example for isolated, we don't want it kept in-memory in both the worker process and the host process)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But your comment mentions that you have this state also cached in middleware. What middleware are you referring to? We have middleware that runs here in the host and also middleware that runs in the language worker, so this is a bit ambiguous to me. Assuming you're talking about the middleware in the external worker process, is the intent to allow this state to be garbage collector a bit sooner since we only need the state in the first loop iteration but not subsequent iterations?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh sorry, I was being intentionally vague to avoid leaking implementation details in this repo. I mean specifically the language worker middleware, yes. Am I allowed to add that specificity here? And yes, the intent is to allow GC to run on subsequent loop iterations seeing as we only need the state in the first iteration. That being said, it will be "regenerated" every time we commit the state to storage here, based on the result of the entity batch operation. But the intent is to allow it to be garbage collected after this point while we wait for the next batch of messages in the extended session.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think what you can say instead is that we only send the entity state just once and expect the handler (which may be in another process) to do their own caching as appropriate. We then set the local reference here to null so that the garbage collector can reclaim it as soon as it detects it's not being used rather than always waiting for the session loop to complete (which could theoretically take some time). I think we have to mention that the entity might be running externally, otherwise setting this to null doesn't really make sense (which could lead someone to misunderstand the intent).
sophiatev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| processCount++; | ||
| } | ||
|
|
||
| // Fetches beyond the first require getting an extended session lock, used to prevent starvation. | ||
| if (processCount > 0 && !isExtendedSession) | ||
| // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item | ||
| if (processCount > 0 && !concurrencyLockAcquired) | ||
| { | ||
| isExtendedSession = this.concurrentSessionLock.Acquire(); | ||
| if (!isExtendedSession) | ||
| { | ||
| break; | ||
| } | ||
| break; | ||
| } | ||
|
|
||
| Stopwatch timer = Stopwatch.StartNew(); | ||
|
|
@@ -179,7 +190,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) | |
| } | ||
| finally | ||
| { | ||
| if (isExtendedSession) | ||
| if (concurrencyLockAcquired) | ||
| { | ||
| this.concurrentSessionLock.Release(); | ||
| } | ||
|
|
@@ -208,7 +219,9 @@ internal class WorkItemEffects | |
| /// Method to process a new work item | ||
| /// </summary> | ||
| /// <param name="workItem">The work item to process</param> | ||
| protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem) | ||
| /// <param name="schedulerState">If extended sessions are enabled, the scheduler state that is being cached across executions. | ||
| /// If they are not enabled, or if this is the first execution from within an extended session, this parameter is null.</param> | ||
| private async Task<SchedulerState> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, SchedulerState schedulerState) | ||
cgillum marked this conversation as resolved.
Show resolved
Hide resolved
sophiatev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| { | ||
| OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; | ||
|
|
||
|
|
@@ -245,19 +258,20 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work | |
| } | ||
| else | ||
| { | ||
| bool firstExecutionIfExtendedSession = schedulerState == null; | ||
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
|
|
||
sophiatev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // we start with processing all the requests and figuring out which ones to execute now | ||
| // results can depend on whether the entity is locked, what the maximum batch size is, | ||
| // and whether the messages arrived out of order | ||
|
|
||
| this.DetermineWork(workItem.OrchestrationRuntimeState, | ||
| out SchedulerState schedulerState, | ||
| ref schedulerState, | ||
| out Work workToDoNow); | ||
|
|
||
| if (workToDoNow.OperationCount > 0) | ||
| { | ||
| // execute the user-defined operations on this entity, via the middleware | ||
| var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState); | ||
| var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState, workItem.IsExtendedSession, firstExecutionIfExtendedSession); | ||
sophiatev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| var operationResults = result.Results!; | ||
|
|
||
| // if we encountered an error, record it as the result of the operations | ||
|
|
@@ -417,7 +431,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( | |
| workItem.OrchestrationRuntimeState = runtimeState; | ||
| } | ||
|
|
||
| return true; | ||
| return schedulerState; | ||
| } | ||
|
|
||
| void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request) | ||
|
|
@@ -445,7 +459,7 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, | |
|
|
||
| string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) | ||
| { | ||
| if (this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended) | ||
| if (this.EntityIsDeleted(schedulerState)) | ||
| { | ||
| // this entity scheduler is idle and the entity is deleted, so the instance and history can be removed from storage | ||
| // we convey this to the durability provider by issuing a continue-as-new with null input | ||
|
|
@@ -460,10 +474,11 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) | |
|
|
||
| #region Preprocess to determine work | ||
|
|
||
| void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState schedulerState, out Work batch) | ||
| void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch) | ||
| { | ||
| string instanceId = runtimeState.OrchestrationInstance.InstanceId; | ||
| schedulerState = new SchedulerState(); | ||
| bool deserializeState = schedulerState == null; | ||
| schedulerState ??= new(); | ||
| batch = new Work(); | ||
|
|
||
| Queue<RequestMessage> lockHolderMessages = null; | ||
|
|
@@ -474,8 +489,9 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc | |
| { | ||
| case EventType.ExecutionStarted: | ||
|
|
||
|
|
||
| if (runtimeState.Input != null) | ||
| // Only attempt to deserialize the scheduler state if we don't already have it in memory. | ||
| // This occurs on the first execution within an extended session, or when extended sessions are disabled. | ||
| if (runtimeState.Input != null && deserializeState) | ||
| { | ||
| try | ||
| { | ||
|
|
@@ -624,6 +640,11 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc | |
| } | ||
| } | ||
|
|
||
| bool EntityIsDeleted(SchedulerState schedulerState) | ||
| { | ||
| return schedulerState != null && this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended; | ||
| } | ||
|
|
||
| class Work | ||
| { | ||
| List<RequestMessage> operationBatch; // a (possibly empty) sequence of operations to be executed on the entity | ||
|
|
@@ -931,7 +952,7 @@ internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRunt | |
|
|
||
| #endregion | ||
|
|
||
| async Task<EntityBatchResult> ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState) | ||
| async Task<EntityBatchResult> ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState, bool isExtendedSession, bool includeEntityState) | ||
| { | ||
| var (operations, traceActivities) = workToDoNow.GetOperationRequestsAndTraceActivities(instance.InstanceId); | ||
| // the request object that will be passed to the worker | ||
|
|
@@ -954,6 +975,7 @@ async Task<EntityBatchResult> ExecuteViaMiddlewareAsync(Work workToDoNow, Orches | |
|
|
||
| var dispatchContext = new DispatchMiddlewareContext(); | ||
| dispatchContext.SetProperty(request); | ||
| dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = isExtendedSession, IncludeState = includeEntityState }); | ||
|
|
||
| await this.dispatchPipeline.RunAsync(dispatchContext, async _ => | ||
| { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.