@sophiatev commented Nov 17, 2025

This PR introduces the extended sessions feature for entities in .NET isolated. Most of the framework was already in place; only a few tweaks to the TaskEntityDispatcher were needed to ensure that metadata related to the extended session is retained by the host process while the session is active (and also sent to the worker process via the WorkItemMetadata object). The other changes simply make the WorkItemMetadata fields more generic, since this object is now used for both orchestration and entity execution.

Note: I had to change the TaskEntityDispatcher.OnProcessWorkItemAsync method from protected to private since it now accepts a SchedulerState parameter, which is "less visible than the accessibility of the method". Alternatively, I could have changed the visibility of the SchedulerState class. Is this change acceptable?
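
For context, the compiler error quoted above (CS0051, inconsistent accessibility) can be reproduced with a small sketch. The types below are hypothetical stand-ins, not the real DurableTask.Core classes, and the sketch assumes the dispatcher class is public while the state type is internal. Besides `private`, the `private protected` modifier (C# 7.2 and later) would probably also satisfy the compiler, because it limits access to derived types in the same assembly.

```csharp
using System;

// Hypothetical stand-in for an internal state type such as SchedulerState.
internal sealed class InternalStateSketch
{
    public string EntityState { get; set; }
}

// Hypothetical stand-in for a public dispatcher class.
public class DispatcherSketch
{
    // Would not compile (CS0051): a protected member of a public class is visible
    // to derived types in other assemblies, which cannot see the internal parameter type.
    // protected InternalStateSketch Process(InternalStateSketch state) => state;

    // Option taken in this PR: make the method private.
    private InternalStateSketch ProcessPrivate(InternalStateSketch state) => state;

    // Alternative (assumption, not part of the PR): private protected keeps the
    // method available to derived types inside the same assembly only.
    private protected virtual InternalStateSketch ProcessDerived(InternalStateSketch state)
        => this.ProcessPrivate(state);

    // Public entry point so the sketch can be exercised from outside the class.
    public bool RoundTrips(string payload)
    {
        var state = new InternalStateSketch { EntityState = payload };
        return ReferenceEquals(this.ProcessDerived(state), state);
    }
}
```

Whether derived dispatchers exist that relied on the protected method is not stated in this thread, so the choice between the two options is left open here.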


// We do this to avoid keeping the entity state in memory between executions within the extended session.
// The middleware will keep the entity state cached, so it does not need it to fulfill requests within the extended session.
schedulerState.EntityState = null;
Member

If the entity state is still kept in memory by the middleware, do we actually gain anything by setting this property to null?

Contributor Author

Yeah, in the sense that the entity state isn't kept in memory redundantly in multiple places (for example for isolated, we don't want it kept in-memory in both the worker process and the host process)

Member

But your comment mentions that you have this state also cached in middleware. What middleware are you referring to? We have middleware that runs here in the host and also middleware that runs in the language worker, so this is a bit ambiguous to me.

Assuming you're talking about the middleware in the external worker process, is the intent to allow this state to be garbage collected a bit sooner, since we only need the state in the first loop iteration but not in subsequent iterations?

Contributor Author

@sophiatev sophiatev Dec 18, 2025

Oh sorry, I was being intentionally vague to avoid leaking implementation details in this repo. I mean specifically the language worker middleware, yes. Am I allowed to add that specificity here?

And yes, the intent is to allow GC to run on subsequent loop iterations seeing as we only need the state in the first iteration. That being said, it will be "regenerated" every time we commit the state to storage here, based on the result of the entity batch operation. But the intent is to allow it to be garbage collected after this point while we wait for the next batch of messages in the extended session.

Member

I think what you can say instead is that we send the entity state only once and expect the handler (which may be in another process) to do its own caching as appropriate. We then set the local reference here to null so that the garbage collector can reclaim it as soon as it detects it's no longer in use, rather than always waiting for the session loop to complete (which could theoretically take some time).

I think we have to mention that the entity might be running externally, otherwise setting this to null doesn't really make sense (which could lead someone to misunderstand the intent).
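
The pattern described in this thread (send the state once, let the handler cache it, then drop the host's reference) can be sketched roughly as follows. Every type and member here is a hypothetical stand-in rather than the actual dispatcher or middleware API, and the real code also regenerates the state when committing to storage, which this sketch reduces to a local variable.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the scheduler state held by the host process.
sealed class SchedulerStateSketch
{
    public string EntityState { get; set; }
}

// Hypothetical handler (possibly in another process) that caches the state it is sent.
sealed class CachingHandlerSketch
{
    string cachedState;

    // Counts how many times the host actually transmitted state.
    public int StatesReceived { get; private set; }

    // Applies one batch (modelled as a string append) and returns the new state.
    public string Execute(string stateOrNull, string operation)
    {
        if (stateOrNull != null)
        {
            this.cachedState = stateOrNull;
            this.StatesReceived++;
        }

        this.cachedState += operation;
        return this.cachedState;
    }
}

static class SessionSketch
{
    public static (string FinalState, int StatesSent) Run(string initial, IEnumerable<string> batches)
    {
        var handler = new CachingHandlerSketch();
        var schedulerState = new SchedulerStateSketch { EntityState = initial };
        string committed = initial;

        foreach (string batch in batches)
        {
            // EntityState is non-null only on the first iteration, so it is sent once.
            committed = handler.Execute(schedulerState.EntityState, batch);

            // (The real dispatcher would commit the resulting state to storage here.)

            // Drop the host-side reference so the garbage collector may reclaim the
            // state while the loop waits for the next batch; the handler keeps its copy.
            schedulerState.EntityState = null;
        }

        return (committed, handler.StatesReceived);
    }
}
```

In this model, running the session over several batches transmits the state to the handler only on the first one, which is the behavior the suggested comment wording tries to convey.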

Copilot AI left a comment

Pull request overview

This PR introduces extended sessions support for entities in the .NET isolated worker model. Extended sessions allow entities to process multiple batches of operations without re-fetching state from storage between batches, improving performance by leveraging middleware caching. The implementation mirrors the existing extended sessions pattern used for orchestrations.

Key changes:

  • Modified TaskEntityDispatcher to cache SchedulerState across executions within an extended session, only deserializing it once on the first execution
  • Renamed WorkItemMetadata.IncludePastEvents to IncludeState to make the property more generic and applicable to both orchestrations and entities
  • Changed OnProcessWorkItemAsync from protected to private to accept the internal SchedulerState parameter needed for session state caching

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| src/DurableTask.Core/WorkItemMetadata.cs | Renamed IncludePastEvents to IncludeState to generalize the property for both orchestration history and entity state, with updated documentation |
| src/DurableTask.Core/TaskOrchestrationDispatcher.cs | Updated to use the renamed IncludeState property instead of IncludePastEvents |
| src/DurableTask.Core/TaskEntityDispatcher.cs | Added extended session support by caching SchedulerState between executions, setting WorkItemMetadata for middleware, and handling session termination when entities are deleted |

Comments suppressed due to low confidence (2)

src/DurableTask.Core/TaskEntityDispatcher.cs:196

  • The new extended sessions functionality for entities lacks test coverage. Consider adding tests to verify: 1) Entity state is correctly passed to middleware on first execution and not on subsequent executions within an extended session, 2) The concurrency lock is properly acquired and released, 3) Extended sessions end when an entity is deleted, 4) EntityState is correctly nulled out between executions to save memory while the middleware caches it, 5) WorkItemMetadata.IsExtendedSession and WorkItemMetadata.IncludeState are set correctly throughout the extended session lifecycle.
        async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
        {
            try
            {
                if (workItem.Session == null)
                {
                    // Legacy behavior
                    await this.OnProcessWorkItemAsync(workItem, null);
                    return;
                }

                var concurrencyLockAcquired = false;
                var processCount = 0;
                SchedulerState schedulerState = null;
                try
                {
                    while (true)
                    {
                        // While the work item contains messages that need to be processed, execute them.
                        if (workItem.NewMessages?.Count > 0)
                        {
                            // We only need to acquire the lock on the first execution within the extended session
                            if (!concurrencyLockAcquired)
                            {
                                concurrencyLockAcquired = this.concurrentSessionLock.Acquire();
                            }
                            workItem.IsExtendedSession = concurrencyLockAcquired;
                            // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item.
                            // If we failed to acquire it, we will end the extended session after this execution.
                            schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState);

                            // The entity has been deleted, so we end the extended session.
                            if (this.EntityIsDeleted(schedulerState))
                            {
                                break;
                            }

                            // We do this to avoid keeping the entity state in memory between executions within the extended session.
                            // The middleware will keep the entity state cached, so it does not need it to fulfill requests within the extended session.
                            schedulerState.EntityState = null;

                            processCount++;
                        }

                        // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item
                        if (processCount > 0 && !concurrencyLockAcquired)
                        {
                            break;
                        }

                        Stopwatch timer = Stopwatch.StartNew();

                        // Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
                        // until either new messages are available or until a provider-specific timeout has expired.
                        workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
                        if (workItem.NewMessages == null)
                        {
                            break;
                        }

                        workItem.OrchestrationRuntimeState.NewEvents.Clear();
                    }
                }
                finally
                {
                    if (concurrencyLockAcquired)
                    {
                        this.concurrentSessionLock.Release();
                    }
                }

src/DurableTask.Core/TaskEntityDispatcher.cs:471

  • Both branches of this 'if' statement return - consider using '?' to express intent better.
            if (this.EntityIsDeleted(schedulerState))
            {
                // this entity scheduler is idle and the entity is deleted, so the instance and history can be removed from storage
                // we convey this to the durability provider by issuing a continue-as-new with null input
                return null;
            }
            else
            {
                // we persist the state of the entity scheduler and entity
                return JsonConvert.SerializeObject(schedulerState, typeof(SchedulerState), Serializer.InternalSerializerSettings);
            }
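
For illustration, the conditional-expression form hinted at by the suggestion above might look like the sketch below. The `isDeleted` flag and `serialize` delegate are hypothetical stand-ins for `this.EntityIsDeleted(schedulerState)` and the `JsonConvert.SerializeObject(...)` call, so that the snippet stays self-contained.

```csharp
using System;

static class SerializeSketch
{
    // Returns null when the entity is deleted (signalling that the instance and
    // history can be removed from storage); otherwise returns the serialized state.
    public static string SerializeOrNull(bool isDeleted, Func<string> serialize)
    {
        return isDeleted ? null : serialize();
    }
}
```

The trade-off is that the if/else form leaves room for the per-branch comments explaining the continue-as-new convention, which may be one reason to keep the original shape.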

Copilot AI review requested due to automatic review settings December 17, 2025 19:24
Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

src/DurableTask.Core/TaskEntityDispatcher.cs:196

  • The extended session functionality for entities is a significant feature addition, but there don't appear to be any tests covering this new behavior. Consider adding tests that verify: 1) scheduler state is properly cached across executions within an extended session, 2) the WorkItemMetadata is correctly set with IsExtendedSession and IncludeState flags, 3) entity state is properly nulled after the first execution, and 4) the extended session ends correctly when the concurrency lock cannot be acquired or when an entity is deleted.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings December 17, 2025 22:46
Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

src/DurableTask.Core/TaskEntityDispatcher.cs:205

  • The extended sessions feature for entities lacks test coverage. While there are comprehensive tests for orchestrations with extended sessions, no tests were added to verify that the entity extended session behavior works correctly. Consider adding tests that verify:
  1. Entities can execute multiple operations within an extended session
  2. The scheduler state is properly cached across executions
  3. Entity state is not unnecessarily re-transmitted to middleware after the first execution
  4. The session terminates correctly when the concurrency lock cannot be acquired
  5. The session terminates correctly when an entity is deleted
        async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
        {
            try
            {
                if (workItem.Session == null)
                {
                    // Legacy behavior
                    await this.OnProcessWorkItemAsync(workItem, null);
                    return;
                }

                var concurrencyLockAcquired = false;
                var processCount = 0;
                SchedulerState schedulerState = null;
                try
                {
                    while (true)
                    {
                        // While the work item contains messages that need to be processed, execute them.
                        if (workItem.NewMessages?.Count > 0)
                        {
                            // We only need to acquire the lock on the first execution within the extended session
                            if (!concurrencyLockAcquired)
                            {
                                concurrencyLockAcquired = this.concurrentSessionLock.Acquire();
                            }
                            workItem.IsExtendedSession = concurrencyLockAcquired;
                            // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item.
                            // If we failed to acquire it, we will end the extended session after this execution.
                            schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState);

                            // The entity has been deleted, so we end the extended session.
                            if (this.EntityIsDeleted(schedulerState))
                            {
                                break;
                            }

                            // We do this to avoid keeping the entity state in memory between executions within the extended session.
                            // The middleware will keep the entity state cached, so the scheduler state does not need to retain the entity state to fulfill requests within the extended session.
                            schedulerState.EntityState = null;

                            processCount++;
                        }

                        // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item
                        if (processCount > 0 && !concurrencyLockAcquired)
                        {
                            break;
                        }

                        Stopwatch timer = Stopwatch.StartNew();

                        // Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
                        // until either new messages are available or until a provider-specific timeout has expired.
                        workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
                        if (workItem.NewMessages == null)
                        {
                            break;
                        }

                        workItem.OrchestrationRuntimeState.NewEvents.Clear();
                    }
                }
                finally
                {
                    if (concurrencyLockAcquired)
                    {
                        this.concurrentSessionLock.Release();
                    }
                }
            }
            catch (SessionAbortedException e)
            {
                // Either the orchestration or the orchestration service explicitly abandoned the session.
                OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
                this.logHelper.OrchestrationAborted(instance, e.Message);
                await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
            }
        }
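
As a starting point for the tests suggested above, the loop's termination rules can be exercised against a simplified model. This is only a sketch under stated assumptions: `FakeSessionLock`, `SessionLoopModel`, and the `execute` delegate are all hypothetical and merely mirror the control flow of the method shown here; they do not use the real dispatcher, work item, or orchestration service types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical fake standing in for the dispatcher's concurrent session lock.
sealed class FakeSessionLock
{
    readonly bool available;

    public FakeSessionLock(bool available) => this.available = available;

    // Number of outstanding acquisitions; should be 0 once a session ends.
    public int Held { get; private set; }

    public bool Acquire()
    {
        if (!this.available)
        {
            return false;
        }

        this.Held++;
        return true;
    }

    public void Release() => this.Held--;
}

static class SessionLoopModel
{
    // Builds `count` single-message batches.
    public static Queue<string[]> Batches(int count)
    {
        var queue = new Queue<string[]>();
        for (int i = 0; i < count; i++)
        {
            queue.Enqueue(new[] { "op" + i });
        }

        return queue;
    }

    // Mirrors the session loop's control flow and returns how many batches ran.
    // `execute` returns true when the batch deleted the entity.
    public static int Run(FakeSessionLock sessionLock, Queue<string[]> pending, Func<string[], bool> execute)
    {
        bool lockAcquired = false;
        int processCount = 0;
        int executions = 0;
        string[] newMessages = pending.Count > 0 ? pending.Dequeue() : null;
        try
        {
            while (true)
            {
                if (newMessages?.Length > 0)
                {
                    if (!lockAcquired)
                    {
                        lockAcquired = sessionLock.Acquire();
                    }

                    executions++;
                    if (execute(newMessages))
                    {
                        break; // entity deleted: end the session
                    }

                    processCount++;
                }

                if (processCount > 0 && !lockAcquired)
                {
                    break; // no lock: stop after the first execution
                }

                // Stand-in for fetching the next batch; null models a timeout.
                newMessages = pending.Count > 0 ? pending.Dequeue() : null;
                if (newMessages == null)
                {
                    break;
                }
            }
        }
        finally
        {
            if (lockAcquired)
            {
                sessionLock.Release();
            }
        }

        return executions;
    }
}
```

A test could then check, for example, that all queued batches run when the lock is available, that only one batch runs when it is not, that a deleting batch ends the session, and that `Held` returns to zero afterwards.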



Copilot AI review requested due to automatic review settings December 18, 2025 19:07
Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Copilot AI review requested due to automatic review settings December 18, 2025 20:16
Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

src/DurableTask.Core/TaskEntityDispatcher.cs:196

  • The extended sessions logic for entities lacks test coverage. The changes in this method handle critical scenarios such as:
  • Acquiring and managing the concurrency lock
  • Maintaining scheduler state across multiple executions
  • Handling entity deletion during an extended session
  • Nullifying entity state to avoid memory retention

Consider adding tests to verify:

  1. Extended sessions work correctly when the concurrency lock is successfully acquired
  2. The session terminates after one execution if the lock cannot be acquired
  3. Scheduler state is properly preserved across executions within a session
  4. Entity state is correctly nullified after each execution
  5. The session terminates when an entity is deleted

Tests for extended sessions exist for orchestrations (e.g., in AzureStorageScenarioTests), but there don't appear to be equivalent tests for entities.


Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings December 18, 2025 20:23
Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.

