Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@
get_adcp_signed_headers_for_webhook,
sign_legacy_webhook,
sign_webhook,
to_wire_dict,
)

try:
Expand Down Expand Up @@ -620,6 +621,7 @@ def get_adcp_version() -> str:
"generate_webhook_idempotency_key",
"sign_legacy_webhook",
"sign_webhook",
"to_wire_dict",
"WebhookReceiver",
"WebhookReceiverConfig",
"WebhookVerifyOptions",
Expand Down
51 changes: 38 additions & 13 deletions src/adcp/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ async def deliver(
allowed_ports=allowed_ports,
)

body_dict = _payload_to_dict(payload)
body_dict = to_wire_dict(payload)
if token is not None and token_field is not None:
_validate_header_value("config.token", token)
_inject_push_token(body_dict, token, payload, token_field)
Expand Down Expand Up @@ -1036,19 +1036,37 @@ def _reserved_header_message(normalized: str, original_key: Any) -> str:
)


def _payload_to_dict(
def to_wire_dict(
payload: AdCPBaseModel | Task | TaskStatusUpdateEvent | Mapping[str, Any],
) -> dict[str, Any]:
"""Normalize a webhook payload to a JSON-ready dict.

a2a-sdk ``Task`` / ``TaskStatusUpdateEvent`` are protobuf messages and
serialize through ``MessageToDict`` with camelCase field names
(``artifact_id`` → ``artifactId``) so external A2A receivers see the
on-wire shape they expect. The protobuf default emits enum states as
``TASK_STATE_COMPLETED``; we post-process to the 0.3-compatible
lowercase form (``completed``) so existing A2A buyer webhook
receivers keep parsing. MCP-shape dicts / AdCP models are dumped
with camelCase-off defaults.
"""Serialize any AdCP webhook payload to a JSON-ready dict.

Single seam for adopters that accept "any AdCP webhook payload" — a
sender wrapping :func:`create_a2a_webhook_payload` and
:func:`create_mcp_webhook_payload` would otherwise have to write
per-shape dispatch (``isinstance`` checks, ``MessageToDict`` for
protobuf, ``model_dump`` for Pydantic, passthrough for dict). Brittle:
a future a2a-sdk that swaps protobuf for a Pydantic façade silently
changes which branch runs, and adopters duplicate the dispatch in
every send path. Use this helper instead — the dispatch lives here.

Behaviour by input shape:

* a2a ``Task`` / ``TaskStatusUpdateEvent`` (protobuf, a2a-sdk 1.0+) →
``MessageToDict(..., preserving_proto_field_name=False)`` so JSON
keys match the A2A wire spec (camelCase: ``id``, ``contextId``,
``artifactId``). Enum values are normalized from the 1.0 protobuf
form (``TASK_STATE_COMPLETED``, ``ROLE_AGENT``) to the 0.3-spec
lowercase form (``completed``, ``agent``) so 0.3 buyer receivers
keep parsing.
* Any Pydantic model (``McpWebhookPayload``, future Pydantic façades,
:class:`AdCPBaseModel` subclasses) → ``model_dump(mode="json",
exclude_none=True)``.
* ``Mapping`` → coerced to ``dict``. Legacy adopter passthrough for
callers that build the wire dict by hand.

Raises:
TypeError: payload is none of the above.
"""
if isinstance(payload, (Task, TaskStatusUpdateEvent)):
data = MessageToDict(payload, preserving_proto_field_name=False)
Expand All @@ -1057,7 +1075,13 @@ def _payload_to_dict(
if hasattr(payload, "model_dump"):
model = cast(AdCPBaseModel, payload)
return model.model_dump(mode="json", exclude_none=True)
return dict(payload)
if isinstance(payload, Mapping):
return dict(payload)
raise TypeError(
f"Unsupported webhook payload type {type(payload).__name__}: expected "
"a2a Task / TaskStatusUpdateEvent (protobuf), an AdCP Pydantic model "
"(e.g. McpWebhookPayload), or a Mapping[str, Any]."
)


def _normalize_a2a_task_state_to_v03(payload: dict[str, Any]) -> None:
Expand Down Expand Up @@ -1172,6 +1196,7 @@ def _validate_header_value(name: str, value: Any) -> None:
"generate_webhook_idempotency_key",
"get_adcp_signed_headers_for_webhook",
"sign_legacy_webhook",
"to_wire_dict",
# Sender — 9421 signing (low-level)
"sign_webhook",
# Sender — one-call outbound helpers
Expand Down
1 change: 1 addition & 0 deletions tests/fixtures/public_api_snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@
"test_agent_a2a_no_auth",
"test_agent_client",
"test_agent_no_auth",
"to_wire_dict",
"uses_deprecated_assets_field",
"validate_adagents",
"validate_agent_authorization",
Expand Down
129 changes: 129 additions & 0 deletions tests/test_webhooks_to_wire_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Tests for ``adcp.webhooks.to_wire_dict``.

The seam exists so adopters wrapping ``create_a2a_webhook_payload`` and
``create_mcp_webhook_payload`` can serialize either return shape with one
call. The load-bearing properties:

* a2a protobuf round-trips to camelCase keys (``id``, ``contextId``,
``artifactId``) so external A2A receivers see the on-wire shape.
* MCP dicts pass through with the snake_case keys the MCP webhook
schema specifies (``task_id``, ``task_type``).
* Pydantic models dump to JSON-mode dicts so sub-models serialize too.
* Unsupported types raise ``TypeError`` at the seam — silent fallthrough
to ``str(payload)`` or similar would mask integration bugs.
"""

from __future__ import annotations

from datetime import datetime, timezone

import pytest

from adcp.types import GeneratedTaskStatus
from adcp.types.generated_poc.core.mcp_webhook_payload import McpWebhookPayload
from adcp.webhooks import (
create_a2a_webhook_payload,
create_mcp_webhook_payload,
to_wire_dict,
)


def test_a2a_task_round_trips_to_camelcase_wire_keys() -> None:
"""Terminated A2A status returns a Task → camelCase wire keys."""
payload = create_a2a_webhook_payload(
task_id="task_123",
status=GeneratedTaskStatus.completed,
context_id="ctx_456",
result={"media_buy_id": "mb_1"},
timestamp=datetime(2026, 5, 8, 12, 0, 0, tzinfo=timezone.utc),
)

wire = to_wire_dict(payload)

assert wire["id"] == "task_123"
assert wire["contextId"] == "ctx_456"
assert wire["status"]["state"] == "completed"
assert wire["artifacts"][0]["artifactId"] == "task_123_result"
# Inner DataPart preserves the AdCP response payload verbatim.
assert wire["artifacts"][0]["parts"][0]["data"] == {"media_buy_id": "mb_1"}


def test_a2a_status_update_event_round_trips_to_camelcase_wire_keys() -> None:
"""Intermediate A2A status returns a TaskStatusUpdateEvent."""
payload = create_a2a_webhook_payload(
task_id="task_789",
status=GeneratedTaskStatus.working,
context_id="ctx_789",
result={"current_step": "processing", "percentage": 50},
)

wire = to_wire_dict(payload)

assert wire["taskId"] == "task_789"
assert wire["contextId"] == "ctx_789"
assert wire["status"]["state"] == "working"
assert wire["status"]["message"]["role"] == "agent"
assert wire["status"]["message"]["parts"][0]["data"] == {
"current_step": "processing",
"percentage": 50,
}


def test_mcp_dict_passes_through_with_snake_case_keys() -> None:
"""MCP wire shape is snake_case per mcp-webhook-payload.json."""
payload = create_mcp_webhook_payload(
task_id="task_123",
task_type="create_media_buy",
status="completed",
result={"media_buy_id": "mb_1"},
idempotency_key="whk_01HW9D2T3VXQ5M7K9N1P3R5S7U",
)

wire = to_wire_dict(payload)

assert wire["task_id"] == "task_123"
assert wire["task_type"] == "create_media_buy"
assert wire["status"] == "completed"
assert wire["result"] == {"media_buy_id": "mb_1"}
assert wire["idempotency_key"] == "whk_01HW9D2T3VXQ5M7K9N1P3R5S7U"


def test_mcp_pydantic_model_dumps_to_snake_case_wire_keys() -> None:
"""Adopters that construct ``McpWebhookPayload`` directly get the
same wire shape as the dict path — single seam, no per-shape branch.
"""
model = McpWebhookPayload(
idempotency_key="whk_01HW9D2T3VXQ5M7K9N1P3R5S7U",
task_id="task_456",
task_type="create_media_buy",
status="completed",
timestamp=datetime(2026, 5, 8, 12, 0, 0, tzinfo=timezone.utc),
)

wire = to_wire_dict(model)

assert wire["task_id"] == "task_456"
assert wire["task_type"] == "create_media_buy"
assert wire["status"] == "completed"
# ``mode="json", exclude_none=True`` is load-bearing — None fields
# would otherwise pollute the wire body.
assert "operation_id" not in wire
assert "context_id" not in wire


def test_plain_dict_passes_through_unchanged() -> None:
"""Hand-built dicts (legacy adopter passthrough) round-trip verbatim."""
raw = {"task_id": "t1", "status": "working", "extra": {"nested": True}}

wire = to_wire_dict(raw)

assert wire == raw
# Defensive copy — caller mutating the returned dict must not
# mutate the input.
assert wire is not raw


def test_unsupported_type_raises_type_error() -> None:
"""Silent fallthrough would mask integration bugs — fail loud."""
with pytest.raises(TypeError, match="Unsupported webhook payload type"):
to_wire_dict("not a payload") # type: ignore[arg-type]
Loading