Skip to content

Commit 2d1ae2f

Browse files
bokelleyclaude
andauthored
feat(testing): add SellerA2AClient for in-process A2A handler testing (#694)
* feat(testing): add SellerA2AClient for in-process A2A handler testing Refs #678. A2A sibling to SellerTestClient. Same call shape (`await client.invoke(skill, payload)`), same return type (ToolInvokeResult), but routes through the A2A executor + event-queue dispatch path instead of MCP's tool call. This eliminates the same boilerplate adopters currently rewrite in every A2A test file: ADCPAgentExecutor construction, RequestContext + SendMessageRequest + DataPart proto plumbing, EventQueueLegacy drain loop, terminal-Task projection back to a dict, plus separate handling of the structured (adcp_error in DataPart) vs. unstructured (FAILED Task without adcp_error) error paths. The harness drains the event queue with a bounded loop + per-event timeout so a buggy handler that never publishes a terminal event can't hang the test runner. Unstructured A2A failures (unknown skill, unparseable message) are synthesized into an INTERNAL_ERROR-coded AdcpErrorPayload so callers can assert on `result.adcp_error` uniformly across both failure modes. Explicitly out of scope for this PR (tracked as follow-ups on #678): - Push-notification capture sink for asserting on outbound signed webhooks - Intermediate-state observation (working/input_required transitions) - Task cancellation harness These depend on the TaskStore + push-notification + middleware hooks that are themselves deferred per the framework's A2A adoption roadmap. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(testing): validation round-trip on SellerA2AClient Per PR #694 review: prove the `validation=` parameter actually engages the validation hook chain end-to-end, not just that __init__ accepts it. The stub returns `creative_formats: []` while the spec requires `formats`. With validation off (the test default) this passes through; with SERVER_DEFAULT_VALIDATION the response-side validator rejects and surfaces a structured VALIDATION_ERROR with `side: response` in details. Asserting on that proves the harness threads validation through the A2A executor's event-queue → terminal-Task path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent de65957 commit 2d1ae2f

3 files changed

Lines changed: 409 additions & 5 deletions

File tree

src/adcp/testing/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121
from __future__ import annotations
2222

2323
from adcp.testing.decisioning import build_asgi_app, build_test_client, make_request_context
24-
from adcp.testing.harness import AdcpErrorPayload, SellerTestClient, ToolInvokeResult
24+
from adcp.testing.harness import (
25+
AdcpErrorPayload,
26+
SellerA2AClient,
27+
SellerTestClient,
28+
ToolInvokeResult,
29+
)
2530
from adcp.testing.test_helpers import (
2631
CREATIVE_AGENT_CONFIG,
2732
TEST_AGENT_A2A_CONFIG,
@@ -40,6 +45,7 @@
4045

4146
__all__ = [
4247
"AdcpErrorPayload",
48+
"SellerA2AClient",
4349
"SellerTestClient",
4450
"ToolInvokeResult",
4551
"build_asgi_app",

src/adcp/testing/harness.py

Lines changed: 227 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
"""SellerTestClient — in-process MCP harness for AdCP seller unit tests."""
1+
"""SellerTestClient / SellerA2AClient — in-process AdCP harnesses for unit tests."""
22

33
from __future__ import annotations
44

55
import asyncio
66
from dataclasses import dataclass
77
from typing import TYPE_CHECKING, Any
8+
from uuid import uuid4
89

910
if TYPE_CHECKING:
1011
from adcp.decisioning import DecisioningPlatform
@@ -72,8 +73,9 @@ async def test_get_products_success(seller):
7273
or SSE framing. For HTTP-level tests (auth middleware, CORS, size
7374
limits), use :func:`adcp.testing.build_test_client` directly.
7475
75-
A2A transport is not yet supported (A2A is served via a separate
76-
ASGI app; tracked as a follow-up on #662).
76+
For A2A transport, use :class:`SellerA2AClient` — same call
77+
shape (:meth:`SellerA2AClient.invoke`), different in-process
78+
dispatch path (executor + event queue rather than MCP tool call).
7779
7880
``run_scenario()`` is not implemented — it requires bundled
7981
compliance scenario playbooks that are not yet available in this SDK.
@@ -203,4 +205,225 @@ async def invoke(
203205
return ToolInvokeResult(data=data, adcp_error=adcp_error, structured_content=structured)
204206

205207

206-
__all__ = ["AdcpErrorPayload", "SellerTestClient", "ToolInvokeResult"]
208+
class SellerA2AClient:
209+
"""In-process A2A test client for AdCP seller implementations.
210+
211+
The A2A sibling to :class:`SellerTestClient`. Same call shape
212+
(``await client.invoke(skill, payload)``), same return type
213+
(:class:`ToolInvokeResult`), but routes through the A2A executor
214+
+ event-queue dispatch path instead of MCP's tool call.
215+
216+
Usage::
217+
218+
@pytest.fixture
219+
def seller_a2a():
220+
return SellerA2AClient(MySeller())
221+
222+
async def test_buy_not_found_a2a(seller_a2a):
223+
result = await seller_a2a.invoke(
224+
"update_media_buy", {"media_buy_id": "missing", ...}
225+
)
226+
assert not result.ok
227+
assert result.adcp_error.code == "MEDIA_BUY_NOT_FOUND"
228+
229+
The harness constructs an :class:`~adcp.server.a2a_server.ADCPAgentExecutor`,
230+
builds a minimal A2A :class:`~a2a.server.agent_execution.context.RequestContext`
231+
carrying a ``DataPart`` with ``{"skill": ..., "parameters": ...}``,
232+
runs the executor against a fresh event queue, and drains the queue
233+
until a terminal Task arrives. Success payloads come from the Task's
234+
first DataPart artifact; structured errors land in that same DataPart
235+
keyed under ``adcp_error`` per transport-errors.mdx §A2A Binding.
236+
237+
Limitations (each tracked as a follow-up on #678):
238+
239+
* **No push-notification capture.** Adopters who need to assert on
240+
outbound signed webhook delivery need a sink primitive this
241+
client doesn't yet provide.
242+
* **No intermediate state observation.** :meth:`invoke` drains to
243+
the terminal Task and returns. Tasks that pass through
244+
``working`` / ``input_required`` are observed only at their final
245+
state.
246+
* **No task cancellation harness.** Once :meth:`invoke` is awaited
247+
there is no handle to cancel a long-running task.
248+
"""
249+
250+
def __init__(
251+
self,
252+
platform: DecisioningPlatform,
253+
*,
254+
validation: ValidationHookConfig | None = None,
255+
) -> None:
256+
"""
257+
Args:
258+
platform: The :class:`~adcp.decisioning.DecisioningPlatform`
259+
instance under test.
260+
validation: Schema validation config. ``None`` (default) disables
261+
validation so tests focus on handler behavior, not schema
262+
conformance. Pass
263+
:data:`~adcp.validation.client_hooks.SERVER_DEFAULT_VALIDATION`
264+
to match production behavior.
265+
"""
266+
self._platform = platform
267+
self._validation = validation
268+
self._executor: Any | None = None
269+
self._executor_lock = asyncio.Lock()
270+
271+
def _build_executor_sync(self) -> Any:
272+
from adcp.decisioning.serve import create_adcp_server_from_platform
273+
from adcp.server.a2a_server import ADCPAgentExecutor
274+
275+
handler, _executor, _registry = create_adcp_server_from_platform(
276+
self._platform,
277+
auto_emit_completion_webhooks=False,
278+
)
279+
return ADCPAgentExecutor(handler, validation=self._validation)
280+
281+
async def _ensure_executor(self) -> Any:
282+
async with self._executor_lock:
283+
if self._executor is None:
284+
# create_adcp_server_from_platform calls asyncio.run() internally
285+
# (via validate_capabilities_response_shape) — must run in a thread
286+
# to avoid "cannot be called from a running event loop".
287+
self._executor = await asyncio.to_thread(self._build_executor_sync)
288+
return self._executor
289+
290+
async def invoke(
291+
self,
292+
skill: str,
293+
payload: dict[str, Any] | None = None,
294+
*,
295+
timeout_seconds: float = 5.0,
296+
) -> ToolInvokeResult:
297+
"""Invoke a skill and return the terminal-state result.
298+
299+
Args:
300+
skill: AdCP skill name (e.g. ``"update_media_buy"``).
301+
payload: Arguments forwarded to the skill. ``None`` → empty dict.
302+
timeout_seconds: Per-event dequeue timeout. The harness drains
303+
the event queue waiting for a terminal Task; this caps how
304+
long any single dequeue waits. Default 5s.
305+
306+
Returns:
307+
:class:`ToolInvokeResult` — ``ok`` reflects whether the terminal
308+
Task state was ``COMPLETED``; ``adcp_error`` carries the structured
309+
error from a failed Task's DataPart per the A2A binding.
310+
"""
311+
from a2a import types as pb
312+
from a2a.auth.user import UnauthenticatedUser
313+
from a2a.server.agent_execution.context import (
314+
RequestContext as A2ARequestContext,
315+
)
316+
from a2a.server.context import ServerCallContext
317+
from a2a.server.events.event_queue import EventQueueLegacy
318+
from google.protobuf.json_format import MessageToDict, ParseDict
319+
from google.protobuf.struct_pb2 import Value
320+
321+
executor = await self._ensure_executor()
322+
kwargs = payload or {}
323+
324+
# Build the DataPart-shaped skill invocation that ADCPAgentExecutor's
325+
# default parser accepts: {"skill": "...", "parameters": {...}}.
326+
value = Value()
327+
ParseDict({"skill": skill, "parameters": kwargs}, value)
328+
msg = pb.Message(
329+
message_id=str(uuid4()),
330+
role="ROLE_USER",
331+
parts=[pb.Part(data=value)],
332+
)
333+
call_ctx = ServerCallContext(user=UnauthenticatedUser())
334+
request_ctx = A2ARequestContext(
335+
call_context=call_ctx,
336+
request=pb.SendMessageRequest(message=msg),
337+
)
338+
queue = EventQueueLegacy()
339+
await executor.execute(request_ctx, queue)
340+
341+
# Drain the event queue until a terminal Task arrives. Bounded so a
342+
# buggy handler that never publishes a terminal event can't hang the
343+
# test runner — each dequeue carries `timeout_seconds`, and the loop
344+
# is bounded to a small number of intermediate state events.
345+
terminal_task: Any = None
346+
for _ in range(32):
347+
try:
348+
event = await asyncio.wait_for(queue.dequeue_event(), timeout=timeout_seconds)
349+
except asyncio.TimeoutError:
350+
break
351+
if isinstance(event, pb.Task) and event.status.state in (
352+
pb.TaskState.TASK_STATE_COMPLETED,
353+
pb.TaskState.TASK_STATE_FAILED,
354+
pb.TaskState.TASK_STATE_CANCELED,
355+
):
356+
terminal_task = event
357+
break
358+
359+
if terminal_task is None:
360+
raise RuntimeError(
361+
f"A2A executor for skill={skill!r} produced no terminal Task "
362+
f"within {timeout_seconds}s — check executor middleware for hangs"
363+
)
364+
365+
# Project the terminal Task's first DataPart artifact to a dict.
366+
structured: dict[str, Any] = {}
367+
if terminal_task.artifacts:
368+
for part in terminal_task.artifacts[0].parts:
369+
if part.WhichOneof("content") == "data":
370+
projected = MessageToDict(part.data)
371+
if isinstance(projected, dict):
372+
structured = projected
373+
break
374+
375+
raw_error = structured.get("adcp_error")
376+
377+
adcp_error: AdcpErrorPayload | None = None
378+
if raw_error is not None:
379+
code = raw_error.get("code")
380+
message = raw_error.get("message")
381+
if not code:
382+
raise RuntimeError(
383+
"adcp_error envelope is missing required 'code' field; "
384+
"server is non-conformant to AdCP transport-errors spec"
385+
)
386+
if not message:
387+
raise RuntimeError(
388+
"adcp_error envelope is missing required 'message' field; "
389+
"server is non-conformant to AdCP transport-errors spec"
390+
)
391+
adcp_error = AdcpErrorPayload(
392+
code=code,
393+
message=message,
394+
recovery=raw_error.get("recovery"),
395+
field=raw_error.get("field"),
396+
suggestion=raw_error.get("suggestion"),
397+
retry_after=raw_error.get("retry_after"),
398+
details=raw_error.get("details"),
399+
)
400+
elif terminal_task.status.state == pb.TaskState.TASK_STATE_FAILED:
401+
# A2A unstructured-error path: failed Task without an `adcp_error`
402+
# DataPart (unknown skill, no parseable skill, transport-layer
403+
# rejection). Synthesize an envelope so callers can assert on
404+
# ``result.adcp_error`` uniformly across structured/unstructured
405+
# failures.
406+
status_msg = ""
407+
if terminal_task.status.HasField("message"):
408+
for part in terminal_task.status.message.parts:
409+
if part.WhichOneof("content") == "text":
410+
status_msg = part.text
411+
break
412+
adcp_error = AdcpErrorPayload(
413+
code="INTERNAL_ERROR",
414+
message=status_msg or "A2A task failed without structured adcp_error envelope",
415+
)
416+
417+
data: dict[str, Any] | None = None
418+
if raw_error is None and structured:
419+
data = dict(structured)
420+
421+
return ToolInvokeResult(data=data, adcp_error=adcp_error, structured_content=structured)
422+
423+
424+
__all__ = [
425+
"AdcpErrorPayload",
426+
"SellerA2AClient",
427+
"SellerTestClient",
428+
"ToolInvokeResult",
429+
]

0 commit comments

Comments
 (0)