-
Notifications
You must be signed in to change notification settings - Fork 53
Introduce WorkItemFilters into worker flow #616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This change adds WorkItemFilters into the grpc worker. This includes builder methods to specify them and the connection into the GetWorkItems flow inside the worker processor. Signed-off-by: Hal Spang <halspang@microsoft.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces WorkItemFilters functionality to the gRPC worker, enabling workers to specify which orchestrations, activities, and entities they can process. This prevents workers from receiving work items they cannot handle.
Changes:
- Added
DurableTaskWorkerWorkItemFiltersclass with filter structs for orchestrations, activities, and entities - Extended builder API with
UseWorkItemFilters()method supporting both explicit and auto-generated filters - Integrated filters into gRPC worker constructor and GetWorkItems request flow
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs | New public API class defining filter structures with support for versioning |
| src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs | Extension method for registering filters with auto-generation from registry |
| src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs | Extension to convert SDK filters to gRPC protobuf types |
| src/Worker/Grpc/GrpcDurableTaskWorker.cs | Constructor updated to accept optional workItemFilters parameter |
| src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs | Integration of filters into GetWorkItems request |
| src/Grpc/orchestrator_service.proto | Protobuf definitions for WorkItemFilters and related filter types |
| test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs | Comprehensive tests for filter registration and auto-generation scenarios |
src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs
Outdated
Show resolved
Hide resolved
src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs
Outdated
Show resolved
Hide resolved
src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs
Outdated
Show resolved
Hide resolved
src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs
Outdated
Show resolved
Hide resolved
| { | ||
| DurableTaskRegistry registry = provider.GetRequiredService<IOptionsMonitor<DurableTaskRegistry>>().Get(builder.Name); | ||
| DurableTaskWorkerOptions? options = provider.GetOptions<DurableTaskWorkerOptions>(builder.Name); | ||
| return new DurableTaskWorkerWorkItemFilters(registry, options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious what the difference is between the following two options:
- I build a worker without configuring filtering (basically, what people are doing today)
- I build a worker with
.UseWorkItemFilters()(no arguments)
Intuitively I wouldn't expect there to be a scenario where I'd use filters without any configuration, so I need a bit of help understanding the use case for option 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That auto-generates the filters for you based on what is registered in the DurableTaskRegistry. Would you prefer I had a different method like UseAutoGeneratedWorkItemFitlers()? I combined them into one because I didn't want someone to be able to specify filters and auto-gen at the same time.
| entityActions.Add(new EntityFilter | ||
| { | ||
| // Entity names are normalized to lowercase in the backend. | ||
| Name = entity.Key.ToString().ToLowerInvariant(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the backend is already normalizing these, do we need to lower-case them here? I worry a bit about operations like this since I've frequently seen them lead to subtle bugs. My personal preference is that we instead rely on case-insensitive string comparisons everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll actually make sure I move this logic to the backend only. Right now, the backend is actually what forces the normalization so we can at least keep it all in one place/use case-insensitive comparisons where appropriate.
Signed-off-by: Hal Spang <halspang@microsoft.com>
1e69535 to
787ecb2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
| } | ||
|
|
||
| /// <summary> | ||
| /// Struct specifying an orchestration filter. |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation says 'Struct' but these are declared as structs. However, the summary should use the standard format 'Represents an orchestration filter' or 'Specifies an orchestration filter' without explicitly mentioning 'Struct' since that's already clear from the type declaration itself.
| } | ||
|
|
||
| /// <summary> | ||
| /// Struct specifying an activity filter. |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation says 'Struct' but this is already clear from the type declaration. The summary should use the standard format 'Represents an activity filter' or 'Specifies an activity filter'.
| } | ||
|
|
||
| /// <summary> | ||
| /// Struct specifying an entity filter. |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation says 'Struct' but this is already clear from the type declaration. The summary should use the standard format 'Represents an entity filter' or 'Specifies an entity filter'.
| /// Struct specifying an entity filter. | |
| /// Specifies an entity filter. |
| builder.Services.AddSingleton(provider => | ||
| { | ||
| DurableTaskRegistry registry = provider.GetRequiredService<IOptionsMonitor<DurableTaskRegistry>>().Get(builder.Name); | ||
| DurableTaskWorkerOptions? options = provider.GetOptions<DurableTaskWorkerOptions>(builder.Name); | ||
| return DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, options); | ||
| }); |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The AddSingleton registration adds filters without keying them to the builder name, which could cause issues when multiple named workers are configured with different filters. The registration should be keyed to the builder name similar to how other named options are handled. Consider using AddKeyedSingleton or a similar pattern to ensure each worker gets its own filter instance. The test 'UseWorkItemFilters_NamedBuilders_HaveUniqueFilters' (line 147-167 in the test file) may be passing due to test implementation details rather than correct DI registration.
| foreach (var entityFilter in workItemFilter.Entities) | ||
| { | ||
| var grpcEntityFilter = new P.EntityFilter | ||
| { | ||
| Name = entityFilter.Name, | ||
| }; | ||
| grpcWorkItemFilters.Entities.Add(grpcEntityFilter); | ||
| } |
Copilot
AI
Jan 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This foreach loop immediately maps its iteration variable to another variable - consider mapping the sequence explicitly using '.Select(...)'.
Summary
What changed?
This change adds WorkItemFilters into the grpc worker. This includes builder methods to specify them and the connection into the GetWorkItems flow inside the worker processor.
Why is this change needed?
Every orchestration, activity, and entity would go to any worker regardless of their actual ability. This change allows workers to specify what they can work on and receive those changes.
Issues / work items
Project checklist
release_notes.mdAI-assisted code disclosure (required)
Was an AI tool used? (select one)
If AI was used:
AI verification (required if AI was used):
Testing
Automated tests
Manual validation (only if runtime/behavior changed)
Environment (OS, .NET version, components): Windows/Aspire, .NET, Custom test bed
Steps + observed results:
Evidence (optional):
I didn't take screenshots but I can go back if required.
Notes for reviewers