Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@
RequestExternalInputExecutor,
WaitForInputExecutor,
)
from ._executors_tools import (
FUNCTION_TOOL_REGISTRY_KEY,
TOOL_ACTION_EXECUTORS,
TOOL_APPROVAL_STATE_KEY,
BaseToolExecutor,
InvokeFunctionToolExecutor,
ToolApprovalRequest,
ToolApprovalResponse,
ToolApprovalState,
ToolInvocationResult,
)
from ._factory import DeclarativeWorkflowError, WorkflowFactory
from ._handlers import ActionHandler, action_handler, get_action_handler
from ._human_input import (
Expand All @@ -86,6 +97,9 @@
"CONTROL_FLOW_EXECUTORS",
"DECLARATIVE_STATE_KEY",
"EXTERNAL_INPUT_EXECUTORS",
"FUNCTION_TOOL_REGISTRY_KEY",
"TOOL_ACTION_EXECUTORS",
"TOOL_APPROVAL_STATE_KEY",
"TOOL_REGISTRY_KEY",
"ActionComplete",
"ActionHandler",
Expand All @@ -95,6 +109,7 @@
"AgentInvocationError",
"AgentResult",
"AppendValueExecutor",
"BaseToolExecutor",
"BreakLoopExecutor",
"ClearAllVariablesExecutor",
"ConfirmationExecutor",
Expand All @@ -116,6 +131,7 @@
"ForeachInitExecutor",
"ForeachNextExecutor",
"InvokeAzureAgentExecutor",
"InvokeFunctionToolExecutor",
"InvokeToolExecutor",
"JoinExecutor",
"LoopControl",
Expand All @@ -129,6 +145,10 @@
"SetTextVariableExecutor",
"SetValueExecutor",
"SetVariableExecutor",
"ToolApprovalRequest",
"ToolApprovalResponse",
"ToolApprovalState",
"ToolInvocationResult",
"WaitForInputExecutor",
"WorkflowFactory",
"WorkflowState",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Loop edges for foreach
"""

import logging
from typing import Any

from agent_framework._workflows import (
Expand All @@ -36,13 +37,18 @@
SwitchEvaluatorExecutor,
)
from ._executors_external_input import EXTERNAL_INPUT_EXECUTORS
from ._executors_tools import TOOL_ACTION_EXECUTORS, InvokeFunctionToolExecutor

logger = logging.getLogger(__name__)


# Combined mapping of all action kinds to executor classes
ALL_ACTION_EXECUTORS = {
**BASIC_ACTION_EXECUTORS,
**CONTROL_FLOW_EXECUTORS,
**AGENT_ACTION_EXECUTORS,
**EXTERNAL_INPUT_EXECUTORS,
**TOOL_ACTION_EXECUTORS,
}

# Action kinds that terminate control flow (no fall-through to successor)
Expand All @@ -66,6 +72,7 @@
"RequestHumanInput": ["variable"],
"WaitForHumanInput": ["variable"],
"EmitEvent": ["event"],
"InvokeFunctionTool": ["functionName"],
}

# Alternate field names that satisfy required field requirements
Expand Down Expand Up @@ -106,6 +113,7 @@ def __init__(
yaml_definition: dict[str, Any],
workflow_id: str | None = None,
agents: dict[str, Any] | None = None,
tools: dict[str, Any] | None = None,
checkpoint_storage: Any | None = None,
validate: bool = True,
):
Expand All @@ -115,6 +123,7 @@ def __init__(
yaml_definition: The parsed YAML workflow definition
workflow_id: Optional ID for the workflow (defaults to name from YAML)
agents: Registry of agent instances by name (for InvokeAzureAgent actions)
tools: Registry of tool/function instances by name (for InvokeFunctionTool actions)
checkpoint_storage: Optional checkpoint storage for pause/resume support
validate: Whether to validate the workflow definition before building (default: True)
"""
Expand All @@ -123,6 +132,7 @@ def __init__(
self._executors: dict[str, Any] = {} # id -> executor
self._action_index = 0 # Counter for generating unique IDs
self._agents = agents or {} # Agent registry for agent executors
self._tools = tools or {} # Tool registry for tool executors
self._checkpoint_storage = checkpoint_storage
self._pending_gotos: list[tuple[Any, str]] = [] # (goto_executor, target_id)
self._validate = validate
Expand Down Expand Up @@ -404,8 +414,13 @@ def _create_executor_for_action(
executor_class = ALL_ACTION_EXECUTORS.get(kind)

if executor_class is None:
# Unknown action type - skip with warning
# In production, might want to log this
# Unknown action type - log warning and skip
logger.warning(
"Unknown action kind '%s' encountered at index %d - action will be skipped. Available action kinds: %s",
kind,
self._action_index,
list(ALL_ACTION_EXECUTORS.keys()),
)
return None

# Create the executor with ID
Expand All @@ -418,10 +433,12 @@ def _create_executor_for_action(
action_id = f"{parent_id}_{kind}_{self._action_index}" if parent_id else f"{kind}_{self._action_index}"
self._action_index += 1

# Pass agents to agent-related executors
# Pass agents/tools to specialized executors
executor: Any
if kind in ("InvokeAzureAgent",):
executor = InvokeAzureAgentExecutor(action_def, id=action_id, agents=self._agents)
elif kind == "InvokeFunctionTool":
executor = InvokeFunctionToolExecutor(action_def, id=action_id, tools=self._tools)
else:
executor = executor_class(action_def, id=action_id)
self._executors[action_id] = executor
Expand Down
Loading
Loading