68 changes: 45 additions & 23 deletions src/DurableTask.Core/TaskEntityDispatcher.cs
@@ -131,37 +131,48 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
if (workItem.Session == null)
{
// Legacy behavior
await this.OnProcessWorkItemAsync(workItem);
await this.OnProcessWorkItemAsync(workItem, null);
return;
}

var isExtendedSession = false;

var concurrencyLockAcquired = false;
var processCount = 0;
SchedulerState schedulerState = null;
try
{
while (true)
{
// While the work item contains messages that need to be processed, execute them.
if (workItem.NewMessages?.Count > 0)
{
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
if (isCompletedOrInterrupted)
// We only need to acquire the lock on the first execution within the extended session
if (!concurrencyLockAcquired)
{
concurrencyLockAcquired = this.concurrentSessionLock.Acquire();
}
workItem.IsExtendedSession = concurrencyLockAcquired;
// Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item.
// If we failed to acquire it, we will end the extended session after this execution.
schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState);

// The entity has been deleted, so we end the extended session.
if (this.EntityIsDeleted(schedulerState))
{
break;
}

// When extended sessions are enabled, the handler caches the entity state after the first execution of the extended session, so there
// is no need to retain a reference to it here.
// We set the local reference to null so that the entity state can be garbage collected while we wait for more messages to arrive.
schedulerState.EntityState = null;
Member

If the entity state is still kept in memory by the middleware, do we actually gain anything by setting this property to null?

Contributor Author

Yeah, in the sense that the entity state isn't kept in memory redundantly in multiple places (for example, with the isolated worker, we don't want it kept in memory in both the worker process and the host process).

Member

But your comment mentions that you have this state also cached in middleware. What middleware are you referring to? We have middleware that runs here in the host and also middleware that runs in the language worker, so this is a bit ambiguous to me.

Assuming you're talking about the middleware in the external worker process, is the intent to allow this state to be garbage collected a bit sooner, since we only need the state in the first loop iteration but not in subsequent iterations?

Contributor Author (sophiatev, Dec 18, 2025)

Oh sorry, I was being intentionally vague to avoid leaking implementation details in this repo. I mean specifically the language worker middleware, yes. Am I allowed to add that specificity here?

And yes, the intent is to allow GC to run on subsequent loop iterations seeing as we only need the state in the first iteration. That being said, it will be "regenerated" every time we commit the state to storage here, based on the result of the entity batch operation. But the intent is to allow it to be garbage collected after this point while we wait for the next batch of messages in the extended session.

Member

I think what you can say instead is that we send the entity state only once and expect the handler (which may be in another process) to do its own caching as appropriate. We then set the local reference here to null so that the garbage collector can reclaim it as soon as it detects it's not being used, rather than always waiting for the session loop to complete (which could theoretically take some time).

I think we have to mention that the entity might be running externally, otherwise setting this to null doesn't really make sense (which could lead someone to misunderstand the intent).
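
To make the suggestion above concrete, here is a minimal, self-contained sketch of the "send the state once, then drop the local reference" pattern. It is not the code in this PR: `SketchSchedulerState`, `SketchHandler`, and `SketchSessionLoop` are hypothetical stand-ins, and the handler is assumed to do its own caching, as it might in an external worker process.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the dispatcher's scheduler state (not the real DurableTask.Core type).
public sealed class SketchSchedulerState
{
    public string? EntityState { get; set; }
}

// Hypothetical handler that caches the entity state itself, as an out-of-process worker might.
public sealed class SketchHandler
{
    private string? cachedState;

    // Counts how often the caller actually sent state along with a batch.
    public int TimesStateReceived { get; private set; }

    public Task<string> ExecuteAsync(string operation, string? entityState, bool includeState)
    {
        if (includeState)
        {
            // First execution in the session: take ownership of the state.
            this.cachedState = entityState;
            this.TimesStateReceived++;
        }

        // Toy "operation": append the operation name to the cached state.
        this.cachedState = (this.cachedState ?? string.Empty) + operation;
        return Task.FromResult(this.cachedState);
    }
}

public static class SketchSessionLoop
{
    public static async Task<string?> RunAsync(
        SketchHandler handler, IEnumerable<string> batches, string? initialState)
    {
        var schedulerState = new SketchSchedulerState { EntityState = initialState };
        bool firstExecution = true;
        string? lastResult = initialState;

        foreach (string batch in batches)
        {
            // The state is only passed on the first iteration; later iterations pass null.
            lastResult = await handler.ExecuteAsync(batch, schedulerState.EntityState, firstExecution);
            firstExecution = false;

            // Drop the local reference so the (possibly large) string is eligible for
            // garbage collection while the loop waits for more messages.
            schedulerState.EntityState = null;
        }

        return lastResult;
    }
}
```

With this sketch, running three batches against one handler sends the state a single time, while the handler keeps applying operations to its own cached copy.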


processCount++;
}

// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
if (processCount > 0 && !isExtendedSession)
// If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item
if (processCount > 0 && !concurrencyLockAcquired)
{
isExtendedSession = this.concurrentSessionLock.Acquire();
if (!isExtendedSession)
{
break;
}
break;
}

Stopwatch timer = Stopwatch.StartNew();
@@ -179,7 +190,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
}
finally
{
if (isExtendedSession)
if (concurrencyLockAcquired)
{
this.concurrentSessionLock.Release();
}
@@ -208,7 +219,9 @@ internal class WorkItemEffects
/// Method to process a new work item
/// </summary>
/// <param name="workItem">The work item to process</param>
protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
/// <param name="schedulerState">If extended sessions are enabled, the scheduler state that is being cached across executions.
/// If they are not enabled, or if this is the first execution from within an extended session, this parameter is null.</param>
private async Task<SchedulerState> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, SchedulerState schedulerState)
{
OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState;

@@ -245,19 +258,20 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
}
else
{
bool firstExecutionIfExtendedSession = schedulerState == null;

// we start with processing all the requests and figuring out which ones to execute now
// results can depend on whether the entity is locked, what the maximum batch size is,
// and whether the messages arrived out of order

this.DetermineWork(workItem.OrchestrationRuntimeState,
out SchedulerState schedulerState,
ref schedulerState,
out Work workToDoNow);

if (workToDoNow.OperationCount > 0)
{
// execute the user-defined operations on this entity, via the middleware
var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState);
var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState, workItem.IsExtendedSession, firstExecutionIfExtendedSession);
var operationResults = result.Results!;

// if we encountered an error, record it as the result of the operations
@@ -417,7 +431,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
workItem.OrchestrationRuntimeState = runtimeState;
}

return true;
return schedulerState;
}

void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request)
@@ -445,7 +459,7 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState,

string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState)
{
if (this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended)
if (this.EntityIsDeleted(schedulerState))
{
// this entity scheduler is idle and the entity is deleted, so the instance and history can be removed from storage
// we convey this to the durability provider by issuing a continue-as-new with null input
@@ -460,10 +474,11 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState)

#region Preprocess to determine work

void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState schedulerState, out Work batch)
void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch)
{
string instanceId = runtimeState.OrchestrationInstance.InstanceId;
schedulerState = new SchedulerState();
bool deserializeState = schedulerState == null;
schedulerState ??= new();
batch = new Work();

Queue<RequestMessage> lockHolderMessages = null;
@@ -474,8 +489,9 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc
{
case EventType.ExecutionStarted:


if (runtimeState.Input != null)
// Only attempt to deserialize the scheduler state if we don't already have it in memory.
// This occurs on the first execution within an extended session, or when extended sessions are disabled.
if (runtimeState.Input != null && deserializeState)
{
try
{
@@ -624,6 +640,11 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc
}
}

bool EntityIsDeleted(SchedulerState schedulerState)
{
return schedulerState != null && this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended;
}

class Work
{
List<RequestMessage> operationBatch; // a (possibly empty) sequence of operations to be executed on the entity
@@ -931,7 +952,7 @@ internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRunt

#endregion

async Task<EntityBatchResult> ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState)
async Task<EntityBatchResult> ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState, bool isExtendedSession, bool includeEntityState)
{
var (operations, traceActivities) = workToDoNow.GetOperationRequestsAndTraceActivities(instance.InstanceId);
// the request object that will be passed to the worker
@@ -954,6 +975,7 @@ async Task<EntityBatchResult> ExecuteViaMiddlewareAsync(Work workToDoNow, Orches

var dispatchContext = new DispatchMiddlewareContext();
dispatchContext.SetProperty(request);
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = isExtendedSession, IncludeState = includeEntityState });

await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
{
4 changes: 2 additions & 2 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
@@ -780,7 +780,7 @@ async Task<OrchestrationExecutionCursor> ExecuteOrchestrationAsync(Orchestration
dispatchContext.SetProperty(workItem);
dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState));
dispatchContext.SetProperty(this.entityParameters);
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludePastEvents = true });
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludeState = true });

TaskOrchestrationExecutor? executor = null;

@@ -833,7 +833,7 @@ async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem)
dispatchContext.SetProperty(cursor.TaskOrchestration);
dispatchContext.SetProperty(cursor.RuntimeState);
dispatchContext.SetProperty(workItem);
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludePastEvents = false });
dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludeState = false });

cursor.LatestDecisions = Enumerable.Empty<OrchestratorAction>();
await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
7 changes: 4 additions & 3 deletions src/DurableTask.Core/WorkItemMetadata.cs
@@ -11,9 +11,10 @@ public class WorkItemMetadata
public bool IsExtendedSession { get; set; }

/// <summary>
/// Gets or sets whether or not to include past events in the orchestration history when executing the work item via middleware.
/// This assumes that the middleware is able to handle extended sessions and does not require history for replays.
/// Gets or sets whether or not to include instance state when executing the work item via middleware.
/// When false, this assumes that the middleware is able to handle extended sessions and has already cached
/// the instance state from a previous execution, so it does not need to be included again.
/// </summary>
public bool IncludePastEvents { get; set; }
public bool IncludeState { get; set; }
}
}
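
As a rough illustration of the contract described in the `IncludeState` doc comment, the sketch below shows how a handler might resolve the state to use for an execution. It is an assumption-laden sketch, not part of this change: `SketchWorkItemMetadata` only mirrors the two flags, and `SketchStateCache` and its `Resolve` method are hypothetical names. The eviction on non-extended executions is also an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

// Hypothetical mirror of the two flags on WorkItemMetadata (not the real type).
public sealed class SketchWorkItemMetadata
{
    public bool IsExtendedSession { get; set; }
    public bool IncludeState { get; set; }
}

// Hypothetical handler-side cache keyed by instance ID.
public sealed class SketchStateCache
{
    private readonly ConcurrentDictionary<string, string?> states =
        new ConcurrentDictionary<string, string?>();

    public string? Resolve(string instanceId, SketchWorkItemMetadata metadata, string? incomingState)
    {
        if (!metadata.IsExtendedSession)
        {
            // No extended session: use the state that was sent and keep nothing cached (assumed policy).
            this.states.TryRemove(instanceId, out _);
            return incomingState;
        }

        if (metadata.IncludeState)
        {
            // First execution of the extended session: cache the state that was sent.
            this.states[instanceId] = incomingState;
            return incomingState;
        }

        // Later executions: the caller omitted the state, so it must already be cached.
        if (this.states.TryGetValue(instanceId, out string? cached))
        {
            return cached;
        }

        throw new InvalidOperationException(
            "State was not included and no cached state exists for instance " + instanceId);
    }
}
```

Throwing when the state is neither included nor cached is one possible choice; a real handler could instead report the miss to the caller so that the state is sent again.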