-
Notifications
You must be signed in to change notification settings - Fork 53
Introduce WorkItemFilters into worker flow #616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,99 @@ | ||||||
| // Copyright (c) Microsoft Corporation. | ||||||
| // Licensed under the MIT License. | ||||||
|
|
||||||
| namespace Microsoft.DurableTask.Worker; | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend | ||||||
| /// and only work items matching the filters will be processed by the worker. If no filters are provided, | ||||||
| /// the worker will process all work items. | ||||||
| /// </summary> | ||||||
| public class DurableTaskWorkerWorkItemFilters | ||||||
| { | ||||||
| /// <summary> | ||||||
| /// Gets or initializes the orchestration filters. | ||||||
| /// </summary> | ||||||
| public IReadOnlyList<OrchestrationFilter> Orchestrations { get; init; } = []; | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Gets or initializes the activity filters. | ||||||
| /// </summary> | ||||||
| public IReadOnlyList<ActivityFilter> Activities { get; init; } = []; | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Gets or initializes the entity filters. | ||||||
| /// </summary> | ||||||
| public IReadOnlyList<EntityFilter> Entities { get; init; } = []; | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Creates a new instance of the <see cref="DurableTaskWorkerWorkItemFilters"/> class. | ||||||
| /// </summary> | ||||||
| /// <param name="registry"><see cref="DurableTaskRegistry"/> to construct the filter from.</param> | ||||||
| /// <param name="workerOptions"><see cref="DurableTaskWorkerOptions"/> that optionally provides versioning information.</param> | ||||||
| /// <returns>A new instance of <see cref="DurableTaskWorkerWorkItemFilters"/> constructed from the provided registry.</returns> | ||||||
| internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions) | ||||||
| { | ||||||
| // TODO: Support multiple versions per orchestration/activity. For now, grab the worker version from the options. | ||||||
| return new DurableTaskWorkerWorkItemFilters | ||||||
| { | ||||||
| Orchestrations = registry.Orchestrators.Select(orchestration => new OrchestrationFilter | ||||||
| { | ||||||
| Name = orchestration.Key, | ||||||
| Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [], | ||||||
| }).ToList(), | ||||||
| Activities = registry.Activities.Select(activity => new ActivityFilter | ||||||
| { | ||||||
| Name = activity.Key, | ||||||
| Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [], | ||||||
| }).ToList(), | ||||||
| Entities = registry.Entities.Select(entity => new EntityFilter | ||||||
| { | ||||||
| // Entity names are normalized to lowercase in the backend. | ||||||
| Name = entity.Key.ToString().ToLowerInvariant(), | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the backend is already normalizing these, do we need to lower-case them here? I worry a bit about operations like this since I've frequently seen them lead to subtle bugs. My personal preference is that we instead rely on case-insensitive string comparisons everywhere.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll actually make sure I move this logic to the backend only. Right now, the backend is actually what forces the normalization so we can at least keep it all in one place/use case-insensitive comparisons where appropriate. |
||||||
| }).ToList(), | ||||||
| }; | ||||||
| } | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Struct specifying an orchestration filter. | ||||||
|
||||||
| /// </summary> | ||||||
| public struct OrchestrationFilter | ||||||
| { | ||||||
| /// <summary> | ||||||
| /// Gets or initializes the name of the orchestration to filter. | ||||||
| /// </summary> | ||||||
| public string Name { get; init; } | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Gets or initializes the versions of the orchestration to filter. | ||||||
| /// </summary> | ||||||
| public IReadOnlyList<string> Versions { get; init; } | ||||||
| } | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Struct specifying an activity filter. | ||||||
|
||||||
| /// </summary> | ||||||
| public struct ActivityFilter | ||||||
| { | ||||||
| /// <summary> | ||||||
| /// Gets or initializes the name of the activity to filter. | ||||||
| /// </summary> | ||||||
| public string Name { get; init; } | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Gets or initializes the versions of the activity to filter. | ||||||
| /// </summary> | ||||||
| public IReadOnlyList<string> Versions { get; init; } | ||||||
| } | ||||||
|
|
||||||
| /// <summary> | ||||||
| /// Struct specifying an entity filter. | ||||||
|
||||||
| /// Struct specifying an entity filter. | |
| /// Specifies an entity filter. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using P = Microsoft.DurableTask.Protobuf; | ||
|
|
||
| namespace Microsoft.DurableTask.Worker.Grpc.Internal; | ||
|
|
||
| /// <summary> | ||
| /// Extension for <see cref="DurableTaskWorkerWorkItemFilters"/> to convert to gRPC types. | ||
| /// </summary> | ||
| public static class DurableTaskWorkerWorkItemFiltersExtension | ||
| { | ||
| /// <summary> | ||
| /// Converts a <see cref="DurableTaskWorkerWorkItemFilters"/> to a gRPC <see cref="P.WorkItemFilters"/>. | ||
| /// </summary> | ||
| /// <param name="workItemFilter">The <see cref="DurableTaskWorkerWorkItemFilters"/> to convert.</param> | ||
| /// <returns>A gRPC <see cref="P.WorkItemFilters"/>.</returns> | ||
| public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter) | ||
| { | ||
halspang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Check.NotNull(workItemFilter); | ||
| var grpcWorkItemFilters = new P.WorkItemFilters(); | ||
| foreach (var orchestrationFilter in workItemFilter.Orchestrations) | ||
| { | ||
| var grpcOrchestrationFilter = new P.OrchestrationFilter | ||
| { | ||
| Name = orchestrationFilter.Name, | ||
| }; | ||
| grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions); | ||
| grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter); | ||
| } | ||
|
|
||
| foreach (var activityFilter in workItemFilter.Activities) | ||
| { | ||
| var grpcActivityFilter = new P.ActivityFilter | ||
| { | ||
| Name = activityFilter.Name, | ||
| }; | ||
| grpcActivityFilter.Versions.AddRange(activityFilter.Versions); | ||
| grpcWorkItemFilters.Activities.Add(grpcActivityFilter); | ||
| } | ||
|
|
||
| foreach (var entityFilter in workItemFilter.Entities) | ||
| { | ||
| var grpcEntityFilter = new P.EntityFilter | ||
| { | ||
| Name = entityFilter.Name, | ||
| }; | ||
| grpcWorkItemFilters.Entities.Add(grpcEntityFilter); | ||
| } | ||
|
Comment on lines
+42
to
+49
|
||
|
|
||
| return grpcWorkItemFilters; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
AddSingletonregistration adds filters without keying them to the builder name, which could cause issues when multiple named workers are configured with different filters. The registration should be keyed to the builder name similar to how other named options are handled. Consider usingAddKeyedSingletonor a similar pattern to ensure each worker gets its own filter instance. The test 'UseWorkItemFilters_NamedBuilders_HaveUniqueFilters' (line 147-167 in the test file) may be passing due to test implementation details rather than correct DI registration.