-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Description
Description
I have five executors: one starter, three that run concurrently, and one for aggregation.
I looked at the example and found that the aggregator there accepts a List<T>, but when my aggregator accepts List<String>, its HandleAsync is never called.
I don't know if I missed something, or if this is just by design in my usage.
I simply want to run a complete workflow built from executors; using MAF should not mean that I have to use its AI features. I also tried changing the aggregator's input type to String.
With a String input, the aggregator's HandleAsync is called three times. Of course, I can add a condition inside HandleAsync to decide when to emit the output, for example:
// Fragment from inside the aggregator's HandleAsync: record the message that just arrived.
mes.Add(message);
// Once at least three messages have been collected, send the string "3" onward.
if (mes.Count >= 3)
{
await context.SendMessageAsync("3", cancellationToken: cancellationToken).ConfigureAwait(false);
}
Even so, the workflow output still does not show 'workflowoutput'.
Similarly, I also gave the executors a return value, for example Executor<CodeInfo, string>, but that does not work either.
As soon as the aggregator accepts List<String>, its HandleAsync is no longer triggered.
Code Sample
/// <summary>
/// Pass-through executor: takes a <see cref="CodeInfo"/> message and returns the very same
/// instance as its handler result.
/// </summary>
public sealed partial class Starter_Executor() : Executor<CodeInfo, CodeInfo>(nameof(Starter_Executor))
{
    /// <summary>Returns <paramref name="message"/> unchanged, wrapped in an already-completed task.</summary>
    /// <param name="message">The incoming message; handed back as-is.</param>
    /// <param name="context">Workflow context; not used by this handler.</param>
    /// <param name="cancellationToken">Cancellation token; not observed because no asynchronous work is done.</param>
    /// <returns>A completed <see cref="ValueTask{TResult}"/> holding <paramref name="message"/>.</returns>
    public override ValueTask<CodeInfo> HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
        => new ValueTask<CodeInfo>(message);
}
/// <summary>
/// Executor that answers every <see cref="CodeInfo"/> message by sending the text "Summary"
/// followed by a <see cref="TurnToken"/>.
/// </summary>
/// <param name="agentBuilder">Accepted by the constructor; not referenced by the code in this class.</param>
public sealed partial class Executor1(AgentBuilder agentBuilder) : Executor<CodeInfo>(nameof(Executor1))
{
    /// <summary>Sends the summary text and then a turn token through <paramref name="context"/>.</summary>
    /// <param name="message">The incoming message; its content is not inspected.</param>
    /// <param name="context">Workflow context used to send both outgoing messages.</param>
    /// <param name="cancellationToken">Forwarded to both send operations.</param>
    [MessageHandler]
    public override async ValueTask HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        const string summary = "Summary";
        await context.SendMessageAsync(summary, cancellationToken: cancellationToken).ConfigureAwait(false);

        // The turn token follows the text; per the original note it is broadcast to kick off the agents.
        var turnToken = new TurnToken(emitEvents: true);
        await context.SendMessageAsync(turnToken, cancellationToken: cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Executor that answers every <see cref="CodeInfo"/> message by sending the text "Summary"
/// followed by a <see cref="TurnToken"/>.
/// </summary>
/// <param name="agentBuilder">Accepted by the constructor; not referenced by the code in this class.</param>
public sealed partial class Executor2(AgentBuilder agentBuilder) : Executor<CodeInfo>(nameof(Executor2))
{
    /// <summary>Sends the summary text and then a turn token through <paramref name="context"/>.</summary>
    /// <param name="message">The incoming message; its content is not inspected.</param>
    /// <param name="context">Workflow context used to send both outgoing messages.</param>
    /// <param name="cancellationToken">Forwarded to both send operations.</param>
    [MessageHandler]
    public override async ValueTask HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        const string summary = "Summary";
        await context.SendMessageAsync(summary, cancellationToken: cancellationToken).ConfigureAwait(false);

        // The turn token follows the text; per the original note it is broadcast to kick off the agents.
        var turnToken = new TurnToken(emitEvents: true);
        await context.SendMessageAsync(turnToken, cancellationToken: cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Executor that answers every <see cref="CodeInfo"/> message by sending the text "Summary"
/// followed by a <see cref="TurnToken"/>.
/// </summary>
/// <param name="agentBuilder">Accepted by the constructor; not referenced by the code in this class.</param>
public sealed partial class Executor3(AgentBuilder agentBuilder) : Executor<CodeInfo>(nameof(Executor3))
{
    /// <summary>Sends the summary text and then a turn token through <paramref name="context"/>.</summary>
    /// <param name="message">The incoming message; its content is not inspected.</param>
    /// <param name="context">Workflow context used to send both outgoing messages.</param>
    /// <param name="cancellationToken">Forwarded to both send operations.</param>
    [MessageHandler]
    public override async ValueTask HandleAsync(CodeInfo message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        const string summary = "Summary";
        await context.SendMessageAsync(summary, cancellationToken: cancellationToken).ConfigureAwait(false);

        // The turn token follows the text; per the original note it is broadcast to kick off the agents.
        var turnToken = new TurnToken(emitEvents: true);
        await context.SendMessageAsync(turnToken, cancellationToken: cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Aggregating executor: collects the strings delivered to it across invocations and, once
/// enough have arrived, yields them as a single workflow output.
/// </summary>
/// <remarks>
/// NOTE(review): the handler is typed for <c>List&lt;String&gt;</c> messages. The accompanying
/// report observes that it is never invoked while the upstream executors send plain strings,
/// so senders presumably have to emit a <c>List&lt;String&gt;</c> for this handler to run —
/// confirm against the framework's message-routing rules.
/// </remarks>
public sealed partial class Aggregation_Executor() : Executor<List<String>>(nameof(Aggregation_Executor))
{
    /// <summary>Number of collected items required before an output is produced.</summary>
    private const int ExpectedMessageCount = 3;

    // Camel-case serializer options; not referenced by the code in this block.
    private readonly JsonSerializerOptions sds = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    // Items gathered so far; survives between handler invocations.
    private List<String> mes = [];

    /// <summary>
    /// Appends the delivered items to the running collection and, when at least
    /// <see cref="ExpectedMessageCount"/> items are present, yields them (newline-joined)
    /// as workflow output and resets the collection.
    /// </summary>
    /// <param name="message">The batch of strings delivered in this invocation.</param>
    /// <param name="context">Workflow context used to yield the aggregated output.</param>
    /// <param name="cancellationToken">Forwarded to the yield operation.</param>
    [MessageHandler]
    public override async ValueTask HandleAsync(List<String> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Append instead of overwriting: replacing the list would discard everything
        // received in earlier invocations, so the threshold could only be met by a single batch.
        mes.AddRange(message);

        if (mes.Count < ExpectedMessageCount)
        {
            return;
        }

        // Snapshot and reset before awaiting so a later delivery starts a fresh batch
        // rather than re-emitting stale items.
        string aggregated = string.Join(Environment.NewLine, mes);
        mes.Clear();

        // SendMessageAsync only routes to downstream executors; the workflow output that
        // WithOutputFrom(...) surfaces has to be produced with YieldOutputAsync.
        // NOTE(review): some framework versions may require the yielded type to be declared
        // on the executor/handler — confirm for the package version in use.
        await context.YieldOutputAsync(aggregated, cancellationToken).ConfigureAwait(false);
    }
}
// Assemble the workflow graph, starting from `starter`.
// NOTE(review): Executor1/Executor2/Executor3/Aggregation_Executor are written here where
// executor instances are expected — presumably placeholders for the actual instances; confirm.
WorkflowBuilder wfb = new WorkflowBuilder(starter)
// Fan out from the starter to the three executors; no target selector is supplied.
.AddFanOutEdge(starter, [Executor1, Executor2, Executor3], targetSelector: null)
// Fan in from the same three executors to the aggregation executor.
.AddFanInEdge([Executor1, Executor2, Executor3], Aggregation_Executor)
// Register the aggregation executor as the source of the workflow's output.
.WithOutputFrom(Aggregation_Executor);
// Build the workflow from the configured builder.
Workflow wf = wfb.Build();
await InProcessExecution.RunAsync(wf, inpiutobject)........
Error Messages / Stack Traces
Package Versions
Microsoft.Agents.AI.OpenAI:1.0.0-preview.260205.1 ,Microsoft.Agents.AI.Workflows:1.0.0-preview.260205.1,Microsoft.Extensions.AI:10.2.0
.NET Version
.NET 10
Additional Context
No response