Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions src/adcp/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.struct_pb2 import Value
from pydantic import BaseModel as PydanticBaseModel

from adcp.server.idempotency.backends import MemoryBackend as MemoryBackend
from adcp.server.idempotency.webhook_dedup import WebhookDedupStore as WebhookDedupStore
Expand All @@ -57,7 +58,6 @@
)
from adcp.types import GeneratedTaskStatus
from adcp.types.base import AdCPBaseModel
from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData
from adcp.webhook_receiver import (
LegacyHmacFallback,
VerifiedSignerLike,
Expand Down Expand Up @@ -86,7 +86,7 @@ def generate_webhook_idempotency_key() -> str:
def create_mcp_webhook_payload(
task_id: str,
status: GeneratedTaskStatus | str,
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
result: PydanticBaseModel | dict[str, Any] | None = None,
timestamp: datetime | None = None,
task_type: str | None = None,
operation_id: str | None = None,
Expand All @@ -107,7 +107,7 @@ def create_mcp_webhook_payload(
status: Current task status
task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy")
timestamp: When the webhook was generated (defaults to current UTC time)
result: Task-specific payload (AdCP response data)
result: Task-specific payload — any Pydantic model or plain dict
operation_id: Client-generated identifier the buyer embedded in the
webhook URL when registering push-notification config. Publishers
MUST echo this back in the payload so buyers correlate notifications
Expand Down Expand Up @@ -358,7 +358,7 @@ def sign_legacy_webhook(
return signature_headers, body_bytes


def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncResponseData | None:
def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> dict[str, Any] | None:
"""
Extract result data from webhook payload (MCP or A2A format).

Expand All @@ -376,8 +376,8 @@ def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncRes
webhook_payload: Raw webhook dictionary from HTTP request (JSON-deserialized)

Returns:
AdcpAsyncResponseData union type containing the extracted AdCP response, or None
if no result present. For A2A webhooks, unwraps data from artifacts/message parts
dict[str, Any] containing the extracted AdCP response data, or None if no
result is present. For A2A webhooks, unwraps data from artifacts/message parts
structure. For MCP webhooks, returns the result field directly.

Examples:
Expand Down Expand Up @@ -464,8 +464,8 @@ def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncRes
data = part["data"]
# Unwrap {"response": {...}} wrapper if present (A2A convention)
if isinstance(data, dict) and "response" in data and len(data) == 1:
return cast(AdcpAsyncResponseData, data["response"])
return cast(AdcpAsyncResponseData, data)
return cast(dict[str, Any], data["response"])
return cast(dict[str, Any], data)

return None

Expand All @@ -485,20 +485,20 @@ def extract_webhook_result_data(webhook_payload: dict[str, Any]) -> AdcpAsyncRes
data = part["data"]
# Unwrap {"response": {...}} wrapper if present
if isinstance(data, dict) and "response" in data and len(data) == 1:
return cast(AdcpAsyncResponseData, data["response"])
return cast(AdcpAsyncResponseData, data)
return cast(dict[str, Any], data["response"])
return cast(dict[str, Any], data)

return None

# MCP format: result field directly
return cast(AdcpAsyncResponseData | None, webhook_payload.get("result"))
return cast(dict[str, Any] | None, webhook_payload.get("result"))


def create_a2a_webhook_payload(
task_id: str,
status: GeneratedTaskStatus,
context_id: str,
result: AdcpAsyncResponseData | dict[str, Any],
result: PydanticBaseModel | dict[str, Any],
timestamp: datetime | None = None,
) -> Task | TaskStatusUpdateEvent:
"""
Expand All @@ -517,7 +517,7 @@ def create_a2a_webhook_payload(
status: Current task status
context_id: Session/conversation identifier (required by A2A protocol)
timestamp: When the webhook was generated (defaults to current UTC time)
result: Task-specific payload (AdCP response data)
result: Task-specific payload — any Pydantic model or plain dict

Returns:
Task object for terminated statuses, TaskStatusUpdateEvent for intermediate statuses
Expand All @@ -529,17 +529,18 @@ def create_a2a_webhook_payload(
>>>
>>> task = create_a2a_webhook_payload(
... task_id="task_123",
... context_id="ctx_123",
... status=GeneratedTaskStatus.completed,
... result={"products": [...]},
... message="Found 5 products"
... )
>>> # task is a Task object with artifacts containing the result

Create a working status update:
>>> event = create_a2a_webhook_payload(
... task_id="task_456",
... context_id="ctx_456",
... status=GeneratedTaskStatus.working,
... message="Processing 3 of 10 items"
... result={"current_step": "processing", "percentage": 30},
... )
>>> # event is a TaskStatusUpdateEvent with status.message

Expand Down Expand Up @@ -590,7 +591,7 @@ def create_a2a_webhook_payload(
# Build parts for the message/artifact.
parts: list[pb.Part] = []

# Convert AdcpAsyncResponseData to dict if it's a Pydantic model
# Convert Pydantic model to dict if needed
if hasattr(result, "model_dump"):
result_dict: dict[str, Any] = result.model_dump(mode="json")
else:
Expand Down
1 change: 0 additions & 1 deletion tests/test_import_layering.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
"src/adcp/utils/preview_cache.py",
"src/adcp/webhook_receiver.py",
"src/adcp/webhook_sender.py",
"src/adcp/webhooks.py",
}
)

Expand Down
83 changes: 82 additions & 1 deletion tests/test_webhook_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@
from a2a.types import TaskState, TaskStatusUpdateEvent
from google.protobuf.json_format import MessageToDict as _MessageToDict

from pydantic import BaseModel

from adcp.client import ADCPClient
from adcp.exceptions import ADCPWebhookSignatureError
from adcp.types import GeneratedTaskStatus
from adcp.types.core import AgentConfig, Protocol, TaskStatus
from adcp.webhooks import extract_webhook_result_data, get_adcp_signed_headers_for_webhook
from adcp.webhooks import (
create_a2a_webhook_payload,
create_mcp_webhook_payload,
extract_webhook_result_data,
get_adcp_signed_headers_for_webhook,
)
from tests.a2a_compat_shim import (
Artifact,
DataPart,
Expand Down Expand Up @@ -1186,6 +1194,79 @@ def test_extract_from_mcp_with_error_response(self):
assert result["errors"][0]["code"] == "INTERNAL_ERROR"


class _DeliveryResponse(BaseModel):
"""Minimal Pydantic model for testing the BaseModel branch in payload builders."""

media_buy_id: str
buyer_ref: str
packages: list[str] = []


class TestWebhookPayloadBuilderPydanticModel:
"""Pydantic BaseModel inputs to create_mcp_webhook_payload / create_a2a_webhook_payload.

Regression guard for the PydanticBaseModel branch (model_dump path inside
both builders). Prior to the fix these functions were typed to accept only
AdcpAsyncResponseData, which is a narrow discriminated union — passing any
other BaseModel subclass required a type: ignore comment even though the
runtime hasattr(result, "model_dump") guard handled it correctly.
"""

def test_create_mcp_payload_accepts_pydantic_model(self):
model = _DeliveryResponse(media_buy_id="mb_1", buyer_ref="ref_1")
payload = create_mcp_webhook_payload(
task_id="task_1",
task_type="media_buy_delivery",
status=GeneratedTaskStatus.completed,
result=model,
)
assert payload["result"] == {"media_buy_id": "mb_1", "buyer_ref": "ref_1", "packages": []}

def test_create_mcp_payload_pydantic_model_serialized_as_json(self):
model = _DeliveryResponse(media_buy_id="mb_2", buyer_ref="ref_2", packages=["pkg_a"])
payload = create_mcp_webhook_payload(
task_id="task_2",
task_type="media_buy_delivery",
status=GeneratedTaskStatus.completed,
result=model,
)
result = payload["result"]
assert isinstance(result, dict)
assert result["packages"] == ["pkg_a"]

def test_create_a2a_payload_accepts_pydantic_model_completed(self):
from a2a.types import Task as A2ATask

model = _DeliveryResponse(media_buy_id="mb_3", buyer_ref="ref_3")
task = create_a2a_webhook_payload(
task_id="task_3",
context_id="ctx_3",
status=GeneratedTaskStatus.completed,
result=model,
)
assert isinstance(task, A2ATask)
task_dict = _MessageToDict(task, preserving_proto_field_name=False)
extracted = extract_webhook_result_data(task_dict)
assert extracted is not None
assert extracted["media_buy_id"] == "mb_3"

def test_create_a2a_payload_accepts_pydantic_model_working(self):
from a2a.types import TaskStatusUpdateEvent as A2AEvent

model = _DeliveryResponse(media_buy_id="mb_4", buyer_ref="ref_4")
event = create_a2a_webhook_payload(
task_id="task_4",
context_id="ctx_4",
status=GeneratedTaskStatus.working,
result=model,
)
assert isinstance(event, A2AEvent)
event_dict = _MessageToDict(event, preserving_proto_field_name=False)
extracted = extract_webhook_result_data(event_dict)
assert extracted is not None
assert extracted["media_buy_id"] == "mb_4"


# Load official AdCP HMAC test vectors from fixtures.
# Source: adcontextprotocol/adcp PR #2478 (merged 2026-04-20), which pins the
# canonical on-wire JSON form (compact separators) and adds rejection vectors
Expand Down
Loading