Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions src/adcp/server/a2a_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,22 @@
from google.protobuf.struct_pb2 import Value
from starlette.applications import Starlette

from adcp.exceptions import ADCPError, ADCPTaskError
from adcp.exceptions import ADCPError
from adcp.server.base import ADCPHandler, ToolContext

# Decisioning-layer ``AdcpError`` (from ``adcp.decisioning.types``) is the
# wire-shaped structured error platform methods raise. It is NOT a subclass
# of :class:`adcp.exceptions.ADCPError`; the executor must catch both so
# storyboards graded against decisioning adopters see the same structured
# envelope as MCP. Lazy import — ``adcp.decisioning`` pulls in the
# decisioning graph, which the A2A server module shouldn't load at import
# time. When the import fails (decisioning extra not installed), only the
# client-side ``ADCPError`` path is active.
try:
from adcp.decisioning.types import AdcpError as _DecisioningAdcpError
except Exception: # pragma: no cover - decisioning is an optional dep surface
_DecisioningAdcpError = None # type: ignore[assignment,misc]

if TYPE_CHECKING:
from collections.abc import Sequence

Expand Down Expand Up @@ -73,7 +86,6 @@
"""


from adcp.server.helpers import STANDARD_ERROR_CODES # noqa: E402
from adcp.server.mcp_tools import create_tool_caller, get_tools_for_handler
from adcp.server.test_controller import TestControllerStore, _handle_test_controller

Expand Down Expand Up @@ -203,15 +215,24 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
return

tool_context = self._build_tool_context(skill_name, context)
# Catch both the client-side :class:`ADCPError` (raised by
# framework helpers like ``IdempotencyConflictError``) AND the
# decisioning-layer :class:`AdcpError` (raised by platform methods
# adopters write against the decisioning graph). They are
# disjoint hierarchies; both project onto the same structured
# ``adcp_error`` envelope per transport-errors.mdx §A2A Binding.
structured_error_types: tuple[type[BaseException], ...] = (ADCPError,)
if _DecisioningAdcpError is not None:
structured_error_types = (ADCPError, _DecisioningAdcpError)
try:
result = await self._dispatch_with_middleware(skill_name, params, tool_context)
await self._send_result(event_queue, context, skill_name, result)
except ADCPError as exc:
# Application-layer AdCP error (IdempotencyConflictError etc.).
# Emit a failed task with the adcp_error in a DataPart per
# transport-errors.mdx §A2A Binding, plus a human-readable text
# part. The JSON-RPC channel is reserved for transport-level
# errors (auth rejected, rate-limited pre-dispatch).
except structured_error_types as exc:
# Application-layer AdCP error. Emit a failed task with the
# adcp_error in a DataPart per transport-errors.mdx §A2A
# Binding, plus a human-readable text part. The JSON-RPC
# channel is reserved for transport-level errors (auth
# rejected, rate-limited pre-dispatch).
logger.info("AdCP application error for skill %s: %s", skill_name, exc)
await self._send_adcp_error(event_queue, context, exc)
except Exception:
Expand Down Expand Up @@ -406,37 +427,51 @@ async def _send_adcp_error(
self,
event_queue: EventQueue,
context: RequestContext,
exc: ADCPError,
exc: Any,
) -> None:
"""Publish a failed task carrying an AdCP ``adcp_error`` payload.

Follows transport-errors.mdx §A2A Binding: failed task with artifact
containing a ``DataPart`` keyed under ``adcp_error`` plus a terse
``TextPart`` for human/LLM consumption.

The structured envelope carries the full spec shape — ``code``,
``message``, ``recovery``, ``field``, ``suggestion``,
``retry_after``, ``details`` — populated when the raised
exception supplies them, omitted when ``None``. Field extraction
is shared with the MCP path via
:func:`adcp.server.translate._extract_structured_fields`, so
both transports project off the same source-of-truth shape.
"""
# Derive the spec error code. ADCPTaskError carries a list of codes
# (e.g. IdempotencyConflictError → IDEMPOTENCY_CONFLICT); fall back
# to a generic INTERNAL_ERROR when the exception doesn't supply one.
code = "INTERNAL_ERROR"
if isinstance(exc, ADCPTaskError) and exc.error_codes:
code = str(exc.error_codes[0])
# Lazy import — ``translate.py`` pulls in heavier server deps
# (mcp.types) which the A2A module doesn't otherwise need.
from adcp.server.translate import _extract_structured_fields

code, message, recovery, field, suggestion, details, _errors = _extract_structured_fields(
exc
)

adcp_error: dict[str, Any] = {
"code": code,
"message": exc.message,
"message": message,
"recovery": recovery,
}
recovery = STANDARD_ERROR_CODES.get(code, {}).get("recovery")
if recovery:
adcp_error["recovery"] = recovery
suggestion = getattr(exc, "suggestion", None)
if suggestion:
if field is not None:
adcp_error["field"] = field
if suggestion is not None:
adcp_error["suggestion"] = suggestion
# ``retry_after`` lives on decisioning AdcpError; project when present.
retry_after = getattr(exc, "retry_after", None)
if retry_after is not None:
adcp_error["retry_after"] = retry_after
if details:
adcp_error["details"] = dict(details)

task = _make_task(
context,
state=pb.TaskState.TASK_STATE_FAILED,
data={"adcp_error": adcp_error},
message=exc.message,
message=message,
)
await event_queue.enqueue_event(task)

Expand Down
Loading
Loading