[BREAKING] Python: Add bulk executor registration method to WorkflowBuilder#3565
[BREAKING] Python: Add bulk executor registration method to WorkflowBuilder#3565holtvogt wants to merge 6 commits intomicrosoft:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds a bulk executor registration method to the Python WorkflowBuilder class to reduce boilerplate when registering multiple executors. The change addresses issue #3551 by providing a dict-based API as an alternative to multiple individual register_executor() calls.
Changes:
- Added
register_executors()method that accepts adict[str, Callable[[], Executor]]for bulk registration - Added unit test
test_register_executors_bulk()to verify the new method works correctly and supports method chaining
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
python/packages/core/agent_framework/_workflows/_workflow_builder.py |
Added register_executors() method with proper validation, documentation, and examples following existing patterns |
python/packages/core/tests/workflow/test_workflow_builder.py |
Added basic test for bulk registration functionality and chaining behavior |
|
|
||
| return self | ||
|
|
||
| def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: |
There was a problem hiding this comment.
We can replace the original register_executor API with this new one.
There was a problem hiding this comment.
@TaoChenOSU Okay! Do we plan to add deprecation notes for the old approach, or are we considering a big bang removal of register_executor() entirely? The latter would be a larger change, since it would require touching more files.
There was a problem hiding this comment.
We can add a deprecation note to the old one.
There was a problem hiding this comment.
I'd also simply prefer us to remove the old one. We're trying to get the API as clean as possible, as quickly as possible. Breaking changes are acceptable right now.
|
Hi, thank you for your contribution! This is actually something we wanted to improve too! |
|
|
||
| return self | ||
|
|
||
| def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: |
There was a problem hiding this comment.
Can we also add async callable support, too? Currently when one uses the AzureAIClient, it's expected that creating an agent is an asynchronous process - therefore we should allow for an asynchronous callable, and handle it properly.
There was a problem hiding this comment.
Should async executor factories be supported when building workflows inside an async context? If so, should we introduce an async build API (e.g., await build_async()), or is it acceptable to require building outside the event loop?
There was a problem hiding this comment.
Good question. I've been thinking through this and see a few approaches:
Option 1: Add build_async() method
Keep the builder chain synchronous, but add an async variant of the terminal build() method:
workflow = await (
WorkflowBuilder()
.register_executors({
"Agent": lambda: create_azure_agent() # async factory
})
.build_async()
)build() would detect async factories and raise a helpful error pointing users to build_async(). Users with sync-only factories continue using build() unchanged.
To be clear, this is not an "async builder" where every method in the chain is async (that would be ugly). The builder pattern stays fully synchronous - only the terminal build() has an async variant.
Option 2: Defer factory execution to runtime
Don't call factories at build time - store them and invoke lazily when the workflow runs. Since execution is already async, this handles async factories naturally without any API changes.
The problem: we lose build-time validation. Currently build() instantiates executors to validate type compatibility (handler signatures, input/output types across edges). With deferred instantiation, these become runtime errors instead of build-time errors - you wouldn't discover a type mismatch until the workflow actually runs.
Option 3: Push async instantiation to the user
Require users to pre-create async executors themselves:
agent = await create_azure_agent()
workflow = (
WorkflowBuilder()
.add_edge(agent, processor) # pass instance directly
.build()
)No async factories, no build_async(). But the problem is this breaks the isolation guarantee that factories provide. If someone creates a helper to build workflows on demand, they'd share the same executor instance
across all workflows - which is exactly what factories were designed to prevent.
I'm leaning toward Option 1. The async footprint is minimal (just the terminal method), we preserve both build-time validation and instance isolation, and users with sync-only factories are unaffected.
Thoughts?
Introduces build_async() to support async executor factories while minimizing code duplication. The sync and async build paths differ only in how they instantiate executors (sync raises on awaitables, async awaits them). All other logic (validation, edge resolution, workflow creation) is extracted into shared helpers to ensure maintainability and avoid duplicating the logic.
|
@moonbox3 The async support involved some heavier lifting, mainly to avoid duplicating existing logic and to cleanly separate reusable helpers. Does this approach look reasonable, or would you prefer handling the duplication differently? Let me know what you think! |
moonbox3
left a comment
There was a problem hiding this comment.
Thanks for working on this. Some comments/questions.
| if inspect.isawaitable(instance): | ||
| # Close un-awaited coroutines to avoid runtime warnings or memory leaks | ||
| if hasattr(instance, "close"): | ||
| instance.close() # type: ignore[reportGeneralTypeIssues] |
There was a problem hiding this comment.
if instance.close() ever throws, the ValueError is never raised. Should we wrap it in a try/except?
| if hasattr(instance, "close"): | ||
| instance.close() # type: ignore[reportGeneralTypeIssues] | ||
| raise ValueError("Async executor factories were detected. Use build_async() instead.") | ||
| factory_name_to_instance[name] = instance |
There was a problem hiding this comment.
Should we add a isinstance(instance, Executor) check here like:
if not isinstance(instance, Executor):
raise TypeError(
f"Factory '{name}' returned {type(instance).__name__} instead of an Executor."
)Same for the new async path?
Also, when a factory in a large dict raises an exception during build(), the error has no indication of which factory name caused it. A try/except that adds the factory name would help improve debuggability, I think.
| start_executor = self._start_executor | ||
|
|
||
| deferred_edge_groups: list[EdgeGroup] = [] | ||
| executor_ids_seen: set[str] = set() |
There was a problem hiding this comment.
When a duplicate ID is found, the error says which ID collided but not which two factory names produced it. What if we use a dict[str, str] (id -> name) so we could say: "ID 'agent' from factory 'B' conflicts with factory 'A'"?
There was a problem hiding this comment.
Do we have tests for:
- mixed sync+async factories with build_async()?
- coroutine
.close()actually called on sync build with async factory? build_async()when async factory raises exception?- multiple
register_executors()calls with non-overlapping names? - whitespace-only names (like " ") failing as rejected?
| raise ValueError(f"An executor factory with the name '{n}' is already registered.") | ||
| for name, factory_function in executor_factories.items(): | ||
| if not name or not name.strip(): | ||
| raise ValueError("Executor factory name cannot be empty.") |
There was a problem hiding this comment.
| raise ValueError("Executor factory name cannot be empty.") | |
| raise ValueError("Executor factory name cannot be empty or whitespace-only") |
|
|
||
| def _create_workflow_from_resolved_registry( | ||
| self, | ||
| span: Any, |
There was a problem hiding this comment.
| span: Any, | |
| span: trace.Span, |
?
Python Test Coverage Report •
Python Unit Test Overview
|
||||||||||||||||||||||||||||||
Motivation and Context
Registering many executors currently needs repetitive
register_executorcalls. It solves this by adding a dict-based bulk registration method that reduces boilerplate. This is most useful in complex workflows with many executors.Fixes: #3551
Description
Added a bulk executor registration API to
WorkflowBuilderthat accepts adict[str, Callable[[], Executor]]. This reduces boilerplate for workflows that register many executors, while keeping the existingregister_executor()API intact. Also added a small unit test covering the new bulk registration path and chaining behavior.Contribution Checklist