Skip to content

.NET: [Bug]: FanIn Aggregation_Executor Never work #3724

@strikene

Description

@strikene

Description

I have three concurrent executors, one for start and one for aggregation.
I looked at the example and found that the aggregator should accept List, but my aggregator was never called with HandleAsync when accepting List.
I don't know if I missed something, or if this is just by design in my usage.
I just want to simply run through the entire workflow that uses the actuators, but that doesn't mean I have to use the AI functions when using MAF. I also tried changing the aggregator's input to String.
The HandleAsync of such an aggregator will be called three times. Of course, I can add a condition to check the output of HandleAsync, for example
mes.Add(message);
if (mes.Count >= 3)
{
await context.SendMessageAsync("3", cancellationToken: cancellationToken).ConfigureAwait(false);
}
Even so, the workflow output still does not show 'workflowoutput‘
Similarly, I also made the actuator have a return value, for example <codeinfo, string>, but even so it doesn't work.
Once the aggregator accepts using List, HandleAsync still will not be triggered.

Code Sample

public sealed partial class Starter_Executor() : Executor<CodeInfo, CodeInfo>(nameof(Starter_Executor))
 {
         public override ValueTask<CodeInfo> HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
     {
         return ValueTask.FromResult(message);
     }
 }


 public sealed partial class Executor1(AgentBuilder agentBuilder) : Executor<CodeInfo>(nameof(Executor1))
 {
     [MessageHandler]
     public override async ValueTask HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
     {
         await context.SendMessageAsync("Summary", cancellationToken: cancellationToken).ConfigureAwait(false);
         // Broadcast the turn token to kick off the agents.
         await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken).ConfigureAwait(false);
         //return ValueTask.FromResult("1");
     }
 } 
public sealed partial class Executor2(AgentBuilder agentBuilder) : Executor<CodeInfo>(nameof(Executor2))
 {
     [MessageHandler]
     public override async ValueTask HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
     {
         await context.SendMessageAsync("Summary", cancellationToken: cancellationToken).ConfigureAwait(false);
         // Broadcast the turn token to kick off the agents.
         await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken).ConfigureAwait(false);
         //return ValueTask.FromResult("2");
     }
 }
public sealed partial class Executor3(AgentBuilder agentBuilder) : Executor<CodeInfo>(nameof(Executor3))
 {
     [MessageHandler]
     public override async ValueTask HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
     {
         await context.SendMessageAsync("Summary", cancellationToken: cancellationToken).ConfigureAwait(false);
         // Broadcast the turn token to kick off the agents.
         await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken).ConfigureAwait(false);
         //return ValueTask.FromResult("2");
     }
 }
public sealed partial class Aggregation_Executor() : Executor<List<String>>(nameof(Aggregation_Executor))
{
    private readonly JsonSerializerOptions sds = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };
    private  List<String> mes = [];
    [MessageHandler]
    public override async ValueTask HandleAsync(List<String> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        mes=(message);
        if (mes.Count >= 3)
        {
            await context.SendMessageAsync("3", cancellationToken: cancellationToken).ConfigureAwait(false);
        }
       
    }
}
WorkflowBuilder wfb = new WorkflowBuilder(starter)
    .AddFanOutEdge(starter, [Executor1, Executor2, Executor3], targetSelector: null)
    .AddFanInEdge([Executor1, Executor2, Executor3], Aggregation_Executor)
    .WithOutputFrom(Aggregation_Executor);
Workflow wf = wfb.Build();
await InProcessExecution.RunAsync(wf, inpiutobject)........

Error Messages / Stack Traces

Package Versions

Microsoft.Agents.AI.OpenAI:1.0.0-preview.260205.1 ,Microsoft.Agents.AI.Workflows:1.0.0-preview.260205.1,Microsoft.Extensions.AI:10.2.0

.NET Version

.NET 10

Additional Context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions