Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
525 changes: 364 additions & 161 deletions python/packages/core/agent_framework/_workflows/_workflow_builder.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ async def start_executor(messages: list[ChatMessage], ctx: WorkflowContext[Agent
# Build workflow: start -> agent1 (no output) -> agent2 (output_response=True)
workflow = (
WorkflowBuilder()
.register_executor(lambda: start_executor, "start")
.register_executors({"start": lambda: start_executor})
.register_agent(lambda: MockAgent("agent1", "Agent1 output - should NOT appear"), "agent1")
.register_agent(lambda: MockAgent("agent2", "Agent2 output - SHOULD appear"), "agent2")
.set_start_executor("start")
Expand Down Expand Up @@ -728,7 +728,7 @@ async def start_executor(messages: list[ChatMessage], ctx: WorkflowContext[Agent
# Build workflow with single agent
workflow = (
WorkflowBuilder()
.register_executor(lambda: start_executor, "start")
.register_executors({"start": lambda: start_executor})
.register_agent(lambda: MockAgent("agent", "Unique response text"), "agent")
.set_start_executor("start")
.add_edge("start", "agent")
Expand Down
142 changes: 108 additions & 34 deletions python/packages/core/tests/workflow/test_workflow_builder.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have tests for:

  • mixed sync+async factories with build_async()?
  • coroutine .close() actually called on sync build with async factory?
  • build_async() when async factory raises exception?
  • multiple register_executors() calls with non-overlapping names?
  • whitespace-only names (like " ") failing as rejected?

Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,62 @@ def test_add_agent_duplicate_id_raises_error():
builder.set_start_executor(agent1).add_edge(agent1, agent2).build()


# Tests for build() with async executor factories


async def test_build_async_with_async_executor_factories():
"""Test that build_async() works with async executor factories."""

async def create_executor_a() -> MockExecutor:
return MockExecutor(id="executor_a")

async def create_executor_b() -> MockExecutor:
return MockExecutor(id="executor_b")

# Build workflow with async executor factories
workflow = await (
WorkflowBuilder()
.register_executors({
"ExecutorA": create_executor_a,
"ExecutorB": create_executor_b,
})
.set_start_executor("ExecutorA")
.add_edge("ExecutorA", "ExecutorB")
.build_async()
)

assert workflow.start_executor_id == "executor_a"
assert len(workflow.executors) == 2
assert "executor_a" in workflow.executors
assert "executor_b" in workflow.executors


def test_build_raises_error_with_async_executor_factories():
"""Test that build() raises ValueError when async executor factories are detected."""

async def create_async_executor() -> MockExecutor:
return MockExecutor(id="executor_async")

builder = (
WorkflowBuilder()
.register_executors({"AsyncExecutor": create_async_executor})
.set_start_executor("AsyncExecutor")
)

# Attempting to build synchronously with async factories should raise an error
with pytest.raises(ValueError, match="Async executor factories were detected."):
builder.build()


# Tests for new executor registration patterns


def test_register_executor_basic():
def test_register_executors_basic():
"""Test basic executor registration with lazy initialization."""
builder = WorkflowBuilder()

# Register an executor factory - ID must match the registered name
result = builder.register_executor(lambda: MockExecutor(id="TestExecutor"), name="TestExecutor")
result = builder.register_executors({"TestExecutor": lambda: MockExecutor(id="TestExecutor")})

# Verify that register returns the builder for chaining
assert result is builder
Expand All @@ -154,15 +201,18 @@ def test_register_executor_basic():


def test_register_multiple_executors():
"""Test registering multiple executors and connecting them with edges."""
"""Test registering multiple executors with a mapping of factories and connecting them with edges."""
builder = WorkflowBuilder()

# Register multiple executors - IDs must match registered names
builder.register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorA")
builder.register_executor(lambda: MockExecutor(id="ExecutorB"), name="ExecutorB")
builder.register_executor(lambda: MockExecutor(id="ExecutorC"), name="ExecutorC")
# IDs must match registered names
result = builder.register_executors({
"ExecutorA": lambda: MockExecutor(id="ExecutorA"),
"ExecutorB": lambda: MockExecutor(id="ExecutorB"),
"ExecutorC": lambda: MockExecutor(id="ExecutorC"),
})

assert result is builder

# Build workflow with edges using registered names
workflow = (
builder
.set_start_executor("ExecutorA")
Expand All @@ -171,13 +221,26 @@ def test_register_multiple_executors():
.build()
)

# Verify all executors are present
assert "ExecutorA" in workflow.executors
assert "ExecutorB" in workflow.executors
assert "ExecutorC" in workflow.executors
assert workflow.start_executor_id == "ExecutorA"


def test_register_executors_rejects_empty_inputs():
"""Test that empty executor mappings and entries are rejected."""
builder = WorkflowBuilder()

with pytest.raises(ValueError, match="cannot be empty"):
builder.register_executors({})

with pytest.raises(ValueError, match="name cannot be empty"):
builder.register_executors({"": lambda: MockExecutor(id="ExecutorA")})

with pytest.raises(TypeError, match="must be callable"):
builder.register_executors({"ExecutorA": None}) # type: ignore[arg-type]


def test_register_with_multiple_names():
"""Test registering the same factory function under multiple names."""
builder = WorkflowBuilder()
Expand All @@ -190,7 +253,7 @@ def make_executor():
counter["val"] += 1
return MockExecutor(id="ExecutorA" if counter["val"] == 1 else "ExecutorB")

builder.register_executor(make_executor, name=["ExecutorA", "ExecutorB"])
builder.register_executors({"ExecutorA": make_executor, "ExecutorB": make_executor})

# Set up workflow
workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build()
Expand All @@ -206,20 +269,19 @@ def test_register_duplicate_name_raises_error():
builder = WorkflowBuilder()

# Register first executor
builder.register_executor(lambda: MockExecutor(id="executor_1"), name="MyExecutor")
builder.register_executors({"MyExecutor": lambda: MockExecutor(id="executor_1")})

# Registering second executor with same name should raise ValueError
with pytest.raises(ValueError, match="already registered"):
builder.register_executor(lambda: MockExecutor(id="executor_2"), name="MyExecutor")
builder.register_executors({"MyExecutor": lambda: MockExecutor(id="executor_2")})


def test_register_duplicate_id_raises_error():
"""Test that registering duplicate id raises an error."""
builder = WorkflowBuilder()

# Register first executor
builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor1")
builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor2")
builder = WorkflowBuilder().register_executors({
"MyExecutor1": lambda: MockExecutor(id="executor"),
"MyExecutor2": lambda: MockExecutor(id="executor"),
})
builder.set_start_executor("MyExecutor1")

# Registering second executor with same ID should raise ValueError
Expand Down Expand Up @@ -281,8 +343,10 @@ def test_register_and_add_edge_with_strings():
builder = WorkflowBuilder()

# Register executors
builder.register_executor(lambda: MockExecutor(id="source"), name="Source")
builder.register_executor(lambda: MockExecutor(id="target"), name="Target")
builder.register_executors({
"Source": lambda: MockExecutor(id="source"),
"Target": lambda: MockExecutor(id="target"),
})

# Add edge using string names
workflow = builder.set_start_executor("Source").add_edge("Source", "Target").build()
Expand Down Expand Up @@ -316,9 +380,11 @@ def test_register_with_fan_out_edges():
builder = WorkflowBuilder()

# Register executors - IDs must match registered names
builder.register_executor(lambda: MockExecutor(id="Source"), name="Source")
builder.register_executor(lambda: MockExecutor(id="Target1"), name="Target1")
builder.register_executor(lambda: MockExecutor(id="Target2"), name="Target2")
builder.register_executors({
"Source": lambda: MockExecutor(id="Source"),
"Target1": lambda: MockExecutor(id="Target1"),
"Target2": lambda: MockExecutor(id="Target2"),
})

# Add fan-out edges using registered names
workflow = builder.set_start_executor("Source").add_fan_out_edges("Source", ["Target1", "Target2"]).build()
Expand All @@ -334,9 +400,11 @@ def test_register_with_fan_in_edges():
builder = WorkflowBuilder()

# Register executors - IDs must match registered names
builder.register_executor(lambda: MockExecutor(id="Source1"), name="Source1")
builder.register_executor(lambda: MockExecutor(id="Source2"), name="Source2")
builder.register_executor(lambda: MockAggregator(id="Aggregator"), name="Aggregator")
builder.register_executors({
"Source1": lambda: MockExecutor(id="Source1"),
"Source2": lambda: MockExecutor(id="Source2"),
"Aggregator": lambda: MockAggregator(id="Aggregator"),
})

# Add fan-in edges using registered names
# Both Source1 and Source2 need to be reachable, so connect Source1 to Source2
Expand All @@ -359,9 +427,11 @@ def test_register_with_chain():
builder = WorkflowBuilder()

# Register executors - IDs must match registered names
builder.register_executor(lambda: MockExecutor(id="Step1"), name="Step1")
builder.register_executor(lambda: MockExecutor(id="Step2"), name="Step2")
builder.register_executor(lambda: MockExecutor(id="Step3"), name="Step3")
builder.register_executors({
"Step1": lambda: MockExecutor(id="Step1"),
"Step2": lambda: MockExecutor(id="Step2"),
"Step3": lambda: MockExecutor(id="Step3"),
})

# Add chain using registered names
workflow = builder.add_chain(["Step1", "Step2", "Step3"]).set_start_executor("Step1").build()
Expand All @@ -383,7 +453,7 @@ def factory():
return MockExecutor(id="Test")

builder = WorkflowBuilder()
builder.register_executor(factory, name="Test")
builder.register_executors({"Test": factory})

# Factory should not be called yet
assert call_count == 0
Expand All @@ -410,7 +480,7 @@ def test_mixing_eager_and_lazy_initialization_error():
eager_executor = MockExecutor(id="eager")

# Register a lazy executor
builder.register_executor(lambda: MockExecutor(id="Lazy"), name="Lazy")
builder.register_executors({"Lazy": lambda: MockExecutor(id="Lazy")})

# Mixing eager and lazy should raise an error during add_edge
with pytest.raises(
Expand All @@ -431,8 +501,10 @@ def condition_func(msg: MockMessage) -> bool:
return msg.data > 0

# Register executors - IDs must match registered names
builder.register_executor(lambda: MockExecutor(id="Source"), name="Source")
builder.register_executor(lambda: MockExecutor(id="Target"), name="Target")
builder.register_executors({
"Source": lambda: MockExecutor(id="Source"),
"Target": lambda: MockExecutor(id="Target"),
})

# Add edge with condition
workflow = builder.set_start_executor("Source").add_edge("Source", "Target", condition=condition_func).build()
Expand Down Expand Up @@ -513,8 +585,10 @@ def test_with_output_from_with_registered_names():
"""Test with_output_from with registered factory names (strings)."""
workflow = (
WorkflowBuilder()
.register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorAFactory")
.register_executor(lambda: MockExecutor(id="ExecutorB"), name="ExecutorBFactory")
.register_executors({
"ExecutorAFactory": lambda: MockExecutor(id="ExecutorA"),
"ExecutorBFactory": lambda: MockExecutor(id="ExecutorB"),
})
.set_start_executor("ExecutorAFactory")
.add_edge("ExecutorAFactory", "ExecutorBFactory")
.with_output_from(["ExecutorBFactory"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@

import asyncio

from agent_framework import (
AgentResponseUpdate,
ChatAgent,
Executor,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
executor,
handler,
)
from agent_framework import (AgentResponseUpdate, ChatAgent, Executor,
WorkflowBuilder, WorkflowContext,
WorkflowOutputEvent, executor, handler)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

Expand Down Expand Up @@ -68,15 +61,19 @@ def create_agent() -> ChatAgent:
async def main():
"""Build and run a simple 2-step workflow using the fluent builder API."""
# Build the workflow using a fluent pattern:
# 1) register_executor(factory, name) registers an executor factory
# 1) register_executors({name: factory}) registers executor factories
# 2) register_agent(factory, name) registers an agent factory
# 3) add_chain([node_names]) adds a sequence of nodes to the workflow
# 4) set_start_executor(node) declares the entry point
# 5) build() finalizes and returns an immutable Workflow object
workflow = (
WorkflowBuilder()
.register_executor(lambda: UpperCase(id="upper_case_executor"), name="UpperCase")
.register_executor(lambda: reverse_text, name="ReverseText")
.register_executors(
{
"UpperCase": lambda: UpperCase(id="upper_case_executor"),
"ReverseText": lambda: reverse_text,
}
)
.register_agent(create_agent, name="DecoderAgent")
.add_chain(["UpperCase", "ReverseText", "DecoderAgent"])
.set_start_executor("UpperCase")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ async def main() -> None:
WorkflowBuilder()
.register_agent(factory_func=lambda: writer, name="writer", agent_thread=shared_thread)
.register_agent(factory_func=lambda: reviewer, name="reviewer", agent_thread=shared_thread)
.register_executor(
factory_func=lambda: intercept_agent_response,
name="intercept_agent_response",
)
.register_executors({"intercept_agent_response": lambda: intercept_agent_response})
.add_chain(["writer", "intercept_agent_response", "reviewer"])
.set_start_executor("writer")
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@

import asyncio

from agent_framework import (
ChatAgent,
ChatMessage,
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
)
from agent_framework import (ChatAgent, ChatMessage, Executor, WorkflowBuilder,
WorkflowContext, handler)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ async def main() -> None:
# and escalation paths for human review.
agent = (
WorkflowBuilder()
.register_executor(
lambda: Worker(
id="sub-worker",
chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()),
),
name="worker",
)
.register_executor(
lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"),
name="reviewer",
.register_executors(
{
"worker": lambda: Worker(
id="sub-worker",
chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()),
),
"reviewer": lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"),
}
)
.add_edge("worker", "reviewer") # Worker sends requests to Reviewer
.add_edge("reviewer", "worker") # Reviewer sends feedback to Worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,11 @@ async def main() -> None:
print("Building workflow with Worker ↔ Reviewer cycle...")
agent = (
WorkflowBuilder()
.register_executor(
lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")),
name="worker",
)
.register_executor(
lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")),
name="reviewer",
.register_executors(
{
"worker": lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")),
"reviewer": lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")),
}
)
.add_edge("worker", "reviewer") # Worker sends responses to Reviewer
.add_edge("reviewer", "worker") # Reviewer provides feedback to Worker
Expand Down
Loading