Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@ def _build_workflow(self, agent: AgentProtocol) -> Workflow:
request_info_executor = AgentRequestInfoExecutor(id="agent_request_info_executor")

return (
WorkflowBuilder()
WorkflowBuilder(start_executor=agent_executor)
# Create a loop between agent executor and request info executor
.add_edge(agent_executor, request_info_executor)
.add_edge(request_info_executor, agent_executor)
.set_start_executor(agent_executor)
.build()
)

Expand Down
4 changes: 2 additions & 2 deletions python/packages/core/agent_framework/_workflows/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class Workflow(DictConvertible):
Checkpointing can be configured at build time or runtime:

Build-time (via WorkflowBuilder):
workflow = WorkflowBuilder().with_checkpointing(storage).build()
workflow = WorkflowBuilder(checkpoint_storage=storage).build()

Runtime (via run/run_stream parameters):
result = await workflow.run(message, checkpoint_storage=runtime_storage)
Expand Down Expand Up @@ -430,7 +430,7 @@ async def _execute_with_message_or_checkpoint(
if not has_checkpointing and checkpoint_storage is None:
raise ValueError(
"Cannot restore from checkpoint: either provide checkpoint_storage parameter "
"or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)."
"or build workflow with WorkflowBuilder(checkpoint_storage=checkpoint_storage)."
)

await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage)
Expand Down
209 changes: 32 additions & 177 deletions python/packages/core/agent_framework/_workflows/_workflow_builder.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions python/packages/core/tests/workflow/test_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def test_agent_executor_checkpoint_stores_and_restores_state() -> None:
executor = AgentExecutor(initial_agent, agent_thread=initial_thread)

# Build workflow with checkpointing enabled
wf = SequentialBuilder().participants([executor]).with_checkpointing(storage).build()
wf = SequentialBuilder(checkpoint_storage=storage).participants([executor]).build()

# Run the workflow with a user message
first_run_output: AgentExecutorResponse | None = None
Expand Down Expand Up @@ -122,7 +122,7 @@ async def test_agent_executor_checkpoint_stores_and_restores_state() -> None:
assert restored_agent.call_count == 0

# Build new workflow with the restored executor
wf_resume = SequentialBuilder().participants([restored_executor]).with_checkpointing(storage).build()
wf_resume = SequentialBuilder(checkpoint_storage=storage).participants([restored_executor]).build()

# Resume from checkpoint
resumed_output: AgentExecutorResponse | None = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def test_agent_executor_emits_tool_calls_in_streaming_mode() -> None:
agent = _ToolCallingAgent(id="tool_agent", name="ToolAgent")
agent_exec = AgentExecutor(agent, id="tool_exec")

workflow = WorkflowBuilder().set_start_executor(agent_exec).build()
workflow = WorkflowBuilder(start_executor=agent_exec).build()

# Act: run in streaming mode
events: list[WorkflowOutputEvent] = []
Expand Down Expand Up @@ -231,11 +231,7 @@ async def test_agent_executor_tool_call_with_approval() -> None:
)

workflow = (
WorkflowBuilder()
.set_start_executor(agent)
.add_edge(agent, test_executor)
.with_output_from([test_executor])
.build()
WorkflowBuilder(start_executor=agent, output_executors=[test_executor]).add_edge(agent, test_executor).build()
)

# Act
Expand Down Expand Up @@ -268,7 +264,7 @@ async def test_agent_executor_tool_call_with_approval_streaming() -> None:
tools=[mock_tool_requiring_approval],
)

workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build()
workflow = WorkflowBuilder(start_executor=agent).add_edge(agent, test_executor).build()

# Act
request_info_events: list[RequestInfoEvent] = []
Expand Down Expand Up @@ -306,11 +302,7 @@ async def test_agent_executor_parallel_tool_call_with_approval() -> None:
)

workflow = (
WorkflowBuilder()
.set_start_executor(agent)
.add_edge(agent, test_executor)
.with_output_from([test_executor])
.build()
WorkflowBuilder(start_executor=agent, output_executors=[test_executor]).add_edge(agent, test_executor).build()
)

# Act
Expand Down Expand Up @@ -345,7 +337,7 @@ async def test_agent_executor_parallel_tool_call_with_approval_streaming() -> No
tools=[mock_tool_requiring_approval],
)

workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build()
workflow = WorkflowBuilder(start_executor=agent).add_edge(agent, test_executor).build()

# Act
request_info_events: list[RequestInfoEvent] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ def build_workflow(storage: InMemoryCheckpointStorage, finish_id: str = "finish"
start = StartExecutor(id="start")
finish = FinishExecutor(id=finish_id)

builder = WorkflowBuilder(max_iterations=3).set_start_executor(start).add_edge(start, finish)
builder = builder.with_checkpointing(checkpoint_storage=storage)
builder = WorkflowBuilder(max_iterations=3, start_executor=start, checkpoint_storage=storage).add_edge(
start, finish
)
return builder.build()


Expand Down
10 changes: 5 additions & 5 deletions python/packages/core/tests/workflow/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def handle(self, text: str, ctx: WorkflowContext) -> None:
upper = UpperCaseExecutor(id="upper")
collector = CollectorExecutor(id="collector")

workflow = WorkflowBuilder().add_edge(upper, collector).set_start_executor(upper).build()
workflow = WorkflowBuilder(start_executor=upper).add_edge(upper, collector).build()

events = await workflow.run("hello world")
invoked_events = [e for e in events if isinstance(e, ExecutorInvokedEvent)]
Expand Down Expand Up @@ -191,7 +191,7 @@ async def handle(self, text: str, ctx: WorkflowContext) -> None:
sender = MultiSenderExecutor(id="sender")
collector = CollectorExecutor(id="collector")

workflow = WorkflowBuilder().add_edge(sender, collector).set_start_executor(sender).build()
workflow = WorkflowBuilder(start_executor=sender).add_edge(sender, collector).build()

events = await workflow.run("hello")
completed_events = [e for e in events if isinstance(e, ExecutorCompletedEvent)]
Expand Down Expand Up @@ -220,7 +220,7 @@ async def handle(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(text.upper())

executor = YieldOnlyExecutor(id="yielder")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

events = await workflow.run("test")
completed_events = [e for e in events if isinstance(e, ExecutorCompletedEvent)]
Expand Down Expand Up @@ -263,7 +263,7 @@ async def handle(self, response: Response, ctx: WorkflowContext) -> None:
processor = ProcessorExecutor(id="processor")
collector = CollectorExecutor(id="collector")

workflow = WorkflowBuilder().add_edge(processor, collector).set_start_executor(processor).build()
workflow = WorkflowBuilder(start_executor=processor).add_edge(processor, collector).build()

input_request = Request(query="hello", limit=3)
events = await workflow.run(input_request)
Expand Down Expand Up @@ -542,7 +542,7 @@ async def mutator(messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMes
# Verify mutation happened
assert len(messages) == original_len + 1

workflow = WorkflowBuilder().set_start_executor(mutator).build()
workflow = WorkflowBuilder(start_executor=mutator).build()

# Run with a single user message
input_messages = [ChatMessage("user", ["hello"])]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,7 @@ async def test_agent_executor_populates_full_conversation_non_streaming() -> Non
agent_exec = AgentExecutor(agent, id="agent1-exec")
capturer = _CaptureFullConversation(id="capture")

wf = (
WorkflowBuilder()
.set_start_executor(agent_exec)
.add_edge(agent_exec, capturer)
.with_output_from([capturer])
.build()
)
wf = WorkflowBuilder(start_executor=agent_exec, output_executors=[capturer]).add_edge(agent_exec, capturer).build()

# Act: use run() instead of run_stream() to test non-streaming mode
result = await wf.run("hello world")
Expand Down
4 changes: 2 additions & 2 deletions python/packages/core/tests/workflow/test_function_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ async def reverse_text(text: str, ctx: WorkflowContext[Any, str]) -> None:
assert reverse_spec["output_types"] == [Any] # First parameter is Any
assert reverse_spec["workflow_output_types"] == [str] # Second parameter is str

workflow = WorkflowBuilder().add_edge(to_upper, reverse_text).set_start_executor(to_upper).build()
workflow = WorkflowBuilder(start_executor=to_upper).add_edge(to_upper, reverse_text).build()

# Run workflow
events = await workflow.run("hello world")
Expand Down Expand Up @@ -345,7 +345,7 @@ async def double_value(value: int):

# Since single-parameter functions can't send messages,
# they're typically used as terminal nodes or for side effects
WorkflowBuilder().set_start_executor(double_value).build()
WorkflowBuilder(start_executor=double_value).build()

# For testing purposes, we can check that the handler is registered correctly
assert double_value.can_handle(Message(data=5, source_id="mock"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TestRequestInfoAndResponse:
async def test_approval_workflow(self):
"""Test end-to-end workflow with approval request."""
executor = ApprovalRequiredExecutor(id="approval_executor")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

# First run the workflow until it emits a request
request_info_event: RequestInfoEvent | None = None
Expand All @@ -204,7 +204,7 @@ async def test_approval_workflow(self):
async def test_calculation_workflow(self):
"""Test end-to-end workflow with calculation request."""
executor = CalculationExecutor(id="calc_executor")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

# First run the workflow until it emits a calculation request
request_info_event: RequestInfoEvent | None = None
Expand All @@ -231,7 +231,7 @@ async def test_calculation_workflow(self):
async def test_multiple_requests_workflow(self):
"""Test workflow with multiple concurrent requests."""
executor = MultiRequestExecutor(id="multi_executor")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

# Collect all request events by running the full stream
request_events: list[RequestInfoEvent] = []
Expand Down Expand Up @@ -265,7 +265,7 @@ async def test_multiple_requests_workflow(self):
async def test_denied_approval_workflow(self):
"""Test workflow when approval is denied."""
executor = ApprovalRequiredExecutor(id="approval_executor")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

# First run the workflow until it emits a request
request_info_event: RequestInfoEvent | None = None
Expand All @@ -288,7 +288,7 @@ async def test_denied_approval_workflow(self):
async def test_workflow_state_with_pending_requests(self):
"""Test workflow state when waiting for responses."""
executor = ApprovalRequiredExecutor(id="approval_executor")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

# Run workflow until idle with pending requests
request_info_event: RequestInfoEvent | None = None
Expand All @@ -313,7 +313,7 @@ async def test_workflow_state_with_pending_requests(self):
async def test_invalid_calculation_input(self):
"""Test workflow handling of invalid calculation input."""
executor = CalculationExecutor(id="calc_executor")
workflow = WorkflowBuilder().set_start_executor(executor).build()
workflow = WorkflowBuilder(start_executor=executor).build()

# Send invalid input (no numbers)
completed = False
Expand All @@ -335,7 +335,7 @@ async def test_checkpoint_with_pending_request_info_events(self):

# Create workflow with checkpointing enabled
executor = ApprovalRequiredExecutor(id="approval_executor")
workflow = WorkflowBuilder().set_start_executor(executor).with_checkpointing(storage).build()
workflow = WorkflowBuilder(start_executor=executor, checkpoint_storage=storage).build()

# Step 1: Run workflow to completion to ensure checkpoints are created
request_info_event: RequestInfoEvent | None = None
Expand Down Expand Up @@ -373,7 +373,7 @@ async def test_checkpoint_with_pending_request_info_events(self):

# Step 4: Create a fresh workflow and restore from checkpoint
new_executor = ApprovalRequiredExecutor(id="approval_executor")
restored_workflow = WorkflowBuilder().set_start_executor(new_executor).with_checkpointing(storage).build()
restored_workflow = WorkflowBuilder(start_executor=new_executor, checkpoint_storage=storage).build()

# Step 5: Resume from checkpoint and verify the request can be continued
completed = False
Expand Down
31 changes: 13 additions & 18 deletions python/packages/core/tests/workflow/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,27 +413,23 @@ def test_nested_workflow_executor_serialization(self) -> None:
"""
# Create innermost workflow
inner_executor = SampleExecutor(id="inner-exec")
inner_workflow = WorkflowBuilder().set_start_executor(inner_executor).set_max_iterations(10).build()
inner_workflow = WorkflowBuilder(max_iterations=10, start_executor=inner_executor).build()

# Create middle workflow with WorkflowExecutor
inner_workflow_executor = WorkflowExecutor(workflow=inner_workflow, id="inner-workflow-exec")
middle_executor = SampleExecutor(id="middle-exec")
middle_workflow = (
WorkflowBuilder()
.set_start_executor(middle_executor)
WorkflowBuilder(max_iterations=20, start_executor=middle_executor)
.add_edge(middle_executor, inner_workflow_executor)
.set_max_iterations(20)
.build()
)

# Create outer workflow with nested WorkflowExecutor
middle_workflow_executor = WorkflowExecutor(workflow=middle_workflow, id="middle-workflow-exec")
outer_executor = SampleExecutor(id="outer-exec")
outer_workflow = (
WorkflowBuilder()
.set_start_executor(outer_executor)
WorkflowBuilder(max_iterations=30, start_executor=outer_executor)
.add_edge(outer_executor, middle_workflow_executor)
.set_max_iterations(30)
.build()
)

Expand Down Expand Up @@ -543,7 +539,7 @@ def test_workflow_serialization(self) -> None:
executor1 = SampleExecutor(id="executor1")
executor2 = SampleExecutor(id="executor2")

workflow = WorkflowBuilder().add_edge(executor1, executor2).set_start_executor(executor1).build()
workflow = WorkflowBuilder(start_executor=executor1).add_edge(executor1, executor2).build()

# Test model_dump
data = workflow.to_dict()
Expand Down Expand Up @@ -616,7 +612,7 @@ def test_workflow_serialization_excludes_non_serializable_fields(self) -> None:
executor1 = SampleExecutor(id="executor1")
executor2 = SampleExecutor(id="executor2")

workflow = WorkflowBuilder().add_edge(executor1, executor2).set_start_executor(executor1).build()
workflow = WorkflowBuilder(start_executor=executor1).add_edge(executor1, executor2).build()

# Test model_dump - should not include private runtime objects
data = workflow.to_dict()
Expand All @@ -629,11 +625,11 @@ def test_workflow_serialization_excludes_non_serializable_fields(self) -> None:
def test_workflow_name_description_serialization(self) -> None:
"""Test that workflow name and description are serialized correctly."""
# Test 1: With name and description
workflow1 = (
WorkflowBuilder(name="Test Pipeline", description="Test workflow description")
.set_start_executor(SampleExecutor(id="e1"))
.build()
)
workflow1 = WorkflowBuilder(
name="Test Pipeline",
description="Test workflow description",
start_executor=SampleExecutor(id="e1"),
).build()

assert workflow1.name == "Test Pipeline"
assert workflow1.description == "Test workflow description"
Expand All @@ -649,7 +645,7 @@ def test_workflow_name_description_serialization(self) -> None:
assert parsed1["description"] == "Test workflow description"

# Test 2: Without name and description (defaults)
workflow2 = WorkflowBuilder().set_start_executor(SampleExecutor(id="e2")).build()
workflow2 = WorkflowBuilder(start_executor=SampleExecutor(id="e2")).build()

assert workflow2.name is None
assert workflow2.description is None
Expand All @@ -659,7 +655,7 @@ def test_workflow_name_description_serialization(self) -> None:
assert "description" not in data2

# Test 3: With only name (no description)
workflow3 = WorkflowBuilder(name="Named Only").set_start_executor(SampleExecutor(id="e3")).build()
workflow3 = WorkflowBuilder(name="Named Only", start_executor=SampleExecutor(id="e3")).build()

assert workflow3.name == "Named Only"
assert workflow3.description is None
Expand Down Expand Up @@ -706,8 +702,7 @@ def test_comprehensive_edge_groups_workflow_serialization() -> None:

# Build workflow with all three edge group types
workflow = (
WorkflowBuilder()
.set_start_executor(router)
WorkflowBuilder(start_executor=router)
# 1. SwitchCaseEdgeGroup: Conditional routing
.add_switch_case_edge_group(
router,
Expand Down
Loading
Loading