Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ message GetWorkItemsRequest {
int32 maxConcurrentEntityWorkItems = 3;

repeated WorkerCapability capabilities = 10;
WorkItemFilters workItemFilters = 11;
}

enum WorkerCapability {
Expand All @@ -844,6 +845,26 @@ enum WorkerCapability {
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}

message WorkItemFilters {
repeated OrchestrationFilter orchestrations = 1;
repeated ActivityFilter activities = 2;
repeated EntityFilter entities = 3;
}

message OrchestrationFilter {
string name = 1;
repeated string versions = 2;
}

message ActivityFilter {
string name = 1;
repeated string versions = 2;
}

message EntityFilter {
string name = 1;
}

message WorkItem {
oneof request {
OrchestratorRequest orchestratorRequest = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
where TTarget : DurableTaskWorker
where TOptions : DurableTaskWorkerOptions
{
builder.UseBuildTarget(typeof(TTarget));

Check warning on line 81 in src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Prefer the generic overload 'Microsoft.DurableTask.Worker.DurableTaskWorkerBuilderExtensions.UseBuildTarget<TTarget>(Microsoft.DurableTask.Worker.IDurableTaskWorkerBuilder)' instead of 'Microsoft.DurableTask.Worker.DurableTaskWorkerBuilderExtensions.UseBuildTarget(Microsoft.DurableTask.Worker.IDurableTaskWorkerBuilder, System.Type)' (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2263)
builder.Services.AddOptions<TOptions>(builder.Name)
.PostConfigure<IOptionsMonitor<DurableTaskWorkerOptions>>((options, baseOptions) =>
{
Expand Down Expand Up @@ -137,4 +137,32 @@
builder.Services.AddSingleton(filter);
return builder;
}

/// <summary>
/// Adds <see cref="DurableTaskWorkerWorkItemFilters"/> to the specified <see cref="IDurableTaskWorkerBuilder"/>.
/// </summary>
/// <param name="builder">The builder to set the builder target for.</param>
/// <param name="workItemFilters">The instance of a <see cref="DurableTaskWorkerWorkItemFilters"/> to use.</param>
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
/// <remarks>If this is called without specified filters, the filters will be constructed from the registered orchestrations, activities, and entities.</remarks>
public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters = null)
{
Check.NotNull(builder);
if (workItemFilters != null)
{
builder.Services.AddSingleton(workItemFilters);
}
else
{
// Auto-generate the filters from registered orchestrations, activities, and entities.
builder.Services.AddSingleton(provider =>
{
DurableTaskRegistry registry = provider.GetRequiredService<IOptionsMonitor<DurableTaskRegistry>>().Get(builder.Name);
DurableTaskWorkerOptions? options = provider.GetOptions<DurableTaskWorkerOptions>(builder.Name);
return DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, options);
});
Comment on lines +158 to +163
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AddSingleton registration adds filters without keying them to the builder name, which could cause issues when multiple named workers are configured with different filters. The registration should be keyed to the builder name similar to how other named options are handled. Consider using AddKeyedSingleton or a similar pattern to ensure each worker gets its own filter instance. The test 'UseWorkItemFilters_NamedBuilders_HaveUniqueFilters' (line 147-167 in the test file) may be passing due to test implementation details rather than correct DI registration.

Copilot uses AI. Check for mistakes.
}

return builder;
}
}
99 changes: 99 additions & 0 deletions src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Worker;

/// <summary>
/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend
/// and only work items matching the filters will be processed by the worker. If no filters are provided,
/// the worker will process all work items.
/// </summary>
public class DurableTaskWorkerWorkItemFilters
{
/// <summary>
/// Gets or initializes the orchestration filters.
/// </summary>
public IReadOnlyList<OrchestrationFilter> Orchestrations { get; init; } = [];

/// <summary>
/// Gets or initializes the activity filters.
/// </summary>
public IReadOnlyList<ActivityFilter> Activities { get; init; } = [];

/// <summary>
/// Gets or initializes the entity filters.
/// </summary>
public IReadOnlyList<EntityFilter> Entities { get; init; } = [];

/// <summary>
/// Creates a new instance of the <see cref="DurableTaskWorkerWorkItemFilters"/> class.
/// </summary>
/// <param name="registry"><see cref="DurableTaskRegistry"/> to construct the filter from.</param>
/// <param name="workerOptions"><see cref="DurableTaskWorkerOptions"/> that optionally provides versioning information.</param>
/// <returns>A new instance of <see cref="DurableTaskWorkerWorkItemFilters"/> constructed from the provided registry.</returns>
internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions)
{
// TODO: Support multiple versions per orchestration/activity. For now, grab the worker version from the options.
return new DurableTaskWorkerWorkItemFilters
{
Orchestrations = registry.Orchestrators.Select(orchestration => new OrchestrationFilter
{
Name = orchestration.Key,
Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [],
}).ToList(),
Activities = registry.Activities.Select(activity => new ActivityFilter
{
Name = activity.Key,
Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [],
}).ToList(),
Entities = registry.Entities.Select(entity => new EntityFilter
{
// Entity names are normalized to lowercase in the backend.
Name = entity.Key.ToString().ToLowerInvariant(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the backend is already normalizing these, do we need to lower-case them here? I worry a bit about operations like this since I've frequently seen them lead to subtle bugs. My personal preference is that we instead rely on case-insensitive string comparisons everywhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll actually make sure I move this logic to the backend only. Right now, the backend is actually what forces the normalization so we can at least keep it all in one place/use case-insensitive comparisons where appropriate.

}).ToList(),
};
}

/// <summary>
/// Struct specifying an orchestration filter.
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation says 'Struct' but these are declared as structs. However, the summary should use the standard format 'Represents an orchestration filter' or 'Specifies an orchestration filter' without explicitly mentioning 'Struct' since that's already clear from the type declaration itself.

Copilot uses AI. Check for mistakes.
/// </summary>
public struct OrchestrationFilter
{
/// <summary>
/// Gets or initializes the name of the orchestration to filter.
/// </summary>
public string Name { get; init; }

/// <summary>
/// Gets or initializes the versions of the orchestration to filter.
/// </summary>
public IReadOnlyList<string> Versions { get; init; }
}

/// <summary>
/// Struct specifying an activity filter.
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation says 'Struct' but this is already clear from the type declaration. The summary should use the standard format 'Represents an activity filter' or 'Specifies an activity filter'.

Copilot uses AI. Check for mistakes.
/// </summary>
public struct ActivityFilter
{
/// <summary>
/// Gets or initializes the name of the activity to filter.
/// </summary>
public string Name { get; init; }

/// <summary>
/// Gets or initializes the versions of the activity to filter.
/// </summary>
public IReadOnlyList<string> Versions { get; init; }
}

/// <summary>
/// Struct specifying an entity filter.
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation says 'Struct' but this is already clear from the type declaration. The summary should use the standard format 'Represents an entity filter' or 'Specifies an entity filter'.

Suggested change
/// Struct specifying an entity filter.
/// Specifies an entity filter.

Copilot uses AI. Check for mistakes.
/// </summary>
public struct EntityFilter
{
/// <summary>
/// Gets or initializes the name of the entity to filter.
/// </summary>
public string Name { get; init; }
}
}
2 changes: 2 additions & 0 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.DurableTask.Abstractions;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Tracing;
using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -255,6 +256,7 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
MaxConcurrentEntityWorkItems =
workerOptions.Concurrency.MaximumConcurrentEntityWorkItems,
Capabilities = { this.worker.grpcOptions.Capabilities },
WorkItemFilters = this.worker.workItemFilters?.ToGrpcWorkItemFilters(),
},
cancellationToken: cancellation);
}
Expand Down
6 changes: 5 additions & 1 deletion src/Worker/Grpc/GrpcDurableTaskWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
readonly ILoggerFactory loggerFactory;
readonly ILogger logger;
readonly IOrchestrationFilter? orchestrationFilter;
readonly DurableTaskWorkerWorkItemFilters? workItemFilters;

/// <summary>
/// Initializes a new instance of the <see cref="GrpcDurableTaskWorker" /> class.
Expand All @@ -30,6 +31,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
/// <param name="loggerFactory">The logger.</param>
/// <param name="orchestrationFilter">The optional <see cref="IOrchestrationFilter"/> used to filter orchestration execution.</param>
/// <param name="exceptionPropertiesProvider">The custom exception properties provider that help build failure details.</param>
/// <param name="workItemFilters">The optional <see cref="DurableTaskWorkerWorkItemFilters"/> used to filter work items in the backend.</param>
public GrpcDurableTaskWorker(
string name,
IDurableTaskFactory factory,
Expand All @@ -38,7 +40,8 @@ public GrpcDurableTaskWorker(
IServiceProvider services,
ILoggerFactory loggerFactory,
IOrchestrationFilter? orchestrationFilter = null,
IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
IExceptionPropertiesProvider? exceptionPropertiesProvider = null,
DurableTaskWorkerWorkItemFilters? workItemFilters = null)
: base(name, factory)
{
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
Expand All @@ -48,6 +51,7 @@ public GrpcDurableTaskWorker(
this.logger = CreateLogger(loggerFactory, this.workerOptions);
this.orchestrationFilter = orchestrationFilter;
this.ExceptionPropertiesProvider = exceptionPropertiesProvider;
this.workItemFilters = workItemFilters;
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask.Worker.Grpc.Internal;

/// <summary>
/// Extension for <see cref="DurableTaskWorkerWorkItemFilters"/> to convert to gRPC types.
/// </summary>
public static class DurableTaskWorkerWorkItemFiltersExtension
{
/// <summary>
/// Converts a <see cref="DurableTaskWorkerWorkItemFilters"/> to a gRPC <see cref="P.WorkItemFilters"/>.
/// </summary>
/// <param name="workItemFilter">The <see cref="DurableTaskWorkerWorkItemFilters"/> to convert.</param>
/// <returns>A gRPC <see cref="P.WorkItemFilters"/>.</returns>
public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter)
{
Check.NotNull(workItemFilter);
var grpcWorkItemFilters = new P.WorkItemFilters();
foreach (var orchestrationFilter in workItemFilter.Orchestrations)
{
var grpcOrchestrationFilter = new P.OrchestrationFilter
{
Name = orchestrationFilter.Name,
};
grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions);
grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter);
}

foreach (var activityFilter in workItemFilter.Activities)
{
var grpcActivityFilter = new P.ActivityFilter
{
Name = activityFilter.Name,
};
grpcActivityFilter.Versions.AddRange(activityFilter.Versions);
grpcWorkItemFilters.Activities.Add(grpcActivityFilter);
}

foreach (var entityFilter in workItemFilter.Entities)
{
var grpcEntityFilter = new P.EntityFilter
{
Name = entityFilter.Name,
};
grpcWorkItemFilters.Entities.Add(grpcEntityFilter);
}
Comment on lines +42 to +49
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach loop immediately maps its iteration variable to another variable - consider mapping the sequence explicitly using '.Select(...)'.

Copilot uses AI. Check for mistakes.

return grpcWorkItemFilters;
}
}
Loading
Loading