Skip to content

Commit 9eb962c

Browse files
bokelleyclaude
andauthored
feat(webhooks)!: create_mcp_webhook_payload returns McpWebhookPayload (#632)
Closes #607. `create_mcp_webhook_payload` now returns a typed `McpWebhookPayload` instance instead of `dict[str, Any]`. Adopters drop the `McpWebhookPayload.model_construct(**dict)` ceremony and get attribute access + IDE autocomplete directly. Pair with `to_wire_dict(payload)` for HTTP transport. `task_type` becomes a required `TaskType | str` argument and is validated against the closed `TaskType` enum at construction. Synchronous-only operations (`get_products`, `list_creatives`, `preview_creative`, etc.) are deliberately not in the enum because they don't flow through the task management system — they shouldn't appear in webhook payloads. Failing fast at the publisher surfaces the bug here rather than as a silent receiver-side rejection. Also: - `WebhookSender.send_mcp` and `WebhookDeliverySupervisor.send_mcp` propagate the same required `task_type: TaskType | str`. - The PG supervisor coerces `TaskType` enum to its on-wire string before binding to psycopg, preventing `repr()` corruption of the `adcp_webhook_delivery_queue.task_type` column. - `webhook_sender.py:550` callsite uses `payload.idempotency_key` and `to_wire_dict(payload)` for HTTP transport. BREAKING CHANGE: `create_mcp_webhook_payload` returns a Pydantic model, not a dict; `task_type` is now required. Migration: | Old | New | |---|---| | `payload["task_id"]` | `payload.task_id` | | `json=payload` (HTTP) | `json=to_wire_dict(payload)` | | `create_mcp_webhook_payload(...)` without `task_type` | required arg | Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 14d294c commit 9eb962c

11 files changed

Lines changed: 241 additions & 125 deletions

src/adcp/webhook_sender.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
build_async_ip_pinned_transport,
5151
)
5252
from adcp.signing.standard_webhooks import decode_secret as _decode_sw_secret
53-
from adcp.types import GeneratedTaskStatus
53+
from adcp.types import GeneratedTaskStatus, TaskType
5454
from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData
5555
from adcp.webhook_auth import (
5656
AdcpLegacyHmacStrategy,
@@ -68,6 +68,7 @@
6868
from adcp.webhooks import (
6969
create_mcp_webhook_payload,
7070
generate_webhook_idempotency_key,
71+
to_wire_dict,
7172
)
7273

7374
# The signer emits a signature valid for 300 seconds; anything beyond that
@@ -521,7 +522,7 @@ async def send_mcp(
521522
url: str,
522523
task_id: str,
523524
status: GeneratedTaskStatus | str,
524-
task_type: str | None = None,
525+
task_type: TaskType | str,
525526
result: AdcpAsyncResponseData | dict[str, Any] | None = None,
526527
timestamp: datetime | None = None,
527528
operation_id: str | None = None,
@@ -562,8 +563,8 @@ async def send_mcp(
562563
)
563564
return await self.send_raw(
564565
url=url,
565-
idempotency_key=str(payload["idempotency_key"]),
566-
payload=payload,
566+
idempotency_key=payload.idempotency_key,
567+
payload=to_wire_dict(payload),
567568
extra_headers=extra_headers,
568569
)
569570

src/adcp/webhook_supervisor.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@
5656
# isort rule.
5757
UTC = timezone.utc
5858

59+
# Runtime import — used to coerce TaskType enum to its on-wire string at
60+
# the audit-log boundary. Must not be inside TYPE_CHECKING.
61+
from adcp.types import TaskType # noqa: E402
62+
5963
if TYPE_CHECKING:
6064
from adcp.types import GeneratedTaskStatus
6165
from adcp.webhook_sender import WebhookDeliveryResult, WebhookSender
@@ -210,7 +214,7 @@ async def send_mcp(
210214
url: str,
211215
task_id: str,
212216
status: GeneratedTaskStatus | str,
213-
task_type: str | None = None,
217+
task_type: TaskType | str,
214218
result: Any = None,
215219
token: str | None = None,
216220
sequence_key: str | None = None,
@@ -397,7 +401,7 @@ async def send_mcp(
397401
url: str,
398402
task_id: str,
399403
status: GeneratedTaskStatus | str,
400-
task_type: str | None = None,
404+
task_type: TaskType | str,
401405
result: Any = None,
402406
token: str | None = None,
403407
sequence_key: str | None = None,
@@ -443,6 +447,9 @@ async def send_mcp(
443447
"""
444448
breaker = self._breaker_for(breaker_key or url)
445449
sequence_number: int | None = None # allocated AFTER breaker check
450+
# Audit log + DeliveryAttempt store the on-wire string; TaskType
451+
# enum is normalized once here so every record sees the same value.
452+
task_type_str: str = task_type.value if isinstance(task_type, TaskType) else task_type
446453

447454
if not breaker.can_attempt():
448455
await self._record(
@@ -459,7 +466,7 @@ async def send_mcp(
459466
occurred_at=datetime.now(UTC),
460467
will_retry=False,
461468
next_retry_at=None,
462-
task_type=task_type,
469+
task_type=task_type_str,
463470
task_id=task_id,
464471
payload_size_bytes=None,
465472
notification_type=notification_type,
@@ -521,7 +528,7 @@ async def send_mcp(
521528
occurred_at=attempt_started,
522529
will_retry=False,
523530
next_retry_at=None,
524-
task_type=task_type,
531+
task_type=task_type_str,
525532
task_id=task_id,
526533
payload_size_bytes=len(last_result.sent_body),
527534
notification_type=notification_type,
@@ -556,7 +563,7 @@ async def send_mcp(
556563
occurred_at=attempt_started,
557564
will_retry=will_retry,
558565
next_retry_at=next_retry_at,
559-
task_type=task_type,
566+
task_type=task_type_str,
560567
task_id=task_id,
561568
payload_size_bytes=len(last_result.sent_body),
562569
notification_type=notification_type,
@@ -588,7 +595,7 @@ async def send_mcp(
588595
occurred_at=attempt_started,
589596
will_retry=will_retry,
590597
next_retry_at=next_retry_at,
591-
task_type=task_type,
598+
task_type=task_type_str,
592599
task_id=task_id,
593600
payload_size_bytes=None,
594601
notification_type=notification_type,

src/adcp/webhook_supervisor_pg.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
except ImportError:
141141
PG_AVAILABLE = False
142142

143+
from adcp.types import TaskType
143144
from adcp.webhook_supervisor import (
144145
CircuitBreakerPolicy,
145146
DeliveryAttempt,
@@ -412,7 +413,7 @@ async def send_mcp(
412413
url: str,
413414
task_id: str,
414415
status: GeneratedTaskStatus | str,
415-
task_type: str | None = None,
416+
task_type: TaskType | str,
416417
result: Any = None,
417418
token: str | None = None,
418419
sequence_key: str | None = None,
@@ -444,6 +445,12 @@ async def send_mcp(
444445
"is running. Call asyncio.create_task(supervisor.run_worker()) at startup."
445446
)
446447

448+
# The queue column is TEXT; psycopg would otherwise bind a TaskType
449+
# enum via repr (`"TaskType.create_media_buy"`), corrupting the wire
450+
# value. Normalize once here so every persistence + log site sees
451+
# the on-wire string.
452+
task_type_str: str = task_type.value if isinstance(task_type, TaskType) else task_type
453+
447454
bkey = breaker_key or url
448455

449456
# Check circuit state; reject immediately if OPEN within the timeout window.
@@ -473,7 +480,7 @@ async def send_mcp(
473480
occurred_at=occurred_at,
474481
will_retry=False,
475482
next_retry_at=None,
476-
task_type=task_type,
483+
task_type=task_type_str,
477484
task_id=task_id,
478485
payload_size_bytes=None,
479486
notification_type=notification_type,
@@ -483,7 +490,7 @@ async def send_mcp(
483490
logger.warning(
484491
"[adcp.webhook_supervisor_pg] circuit OPEN for %s — skipped %s",
485492
bkey,
486-
task_type or "webhook",
493+
task_type_str,
487494
)
488495
return None
489496
# Open timeout elapsed; transition to half_open so next worker probes.
@@ -500,7 +507,7 @@ async def send_mcp(
500507
bkey,
501508
url,
502509
task_id,
503-
task_type,
510+
task_type_str,
504511
status_str,
505512
result_json,
506513
token,

src/adcp/webhooks.py

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
WebhookVerifyOptions,
5757
verify_webhook_signature,
5858
)
59-
from adcp.types import GeneratedTaskStatus
59+
from adcp.types import GeneratedTaskStatus, McpWebhookPayload, TaskType
6060
from adcp.types.base import AdCPBaseModel
6161
from adcp.webhook_receiver import (
6262
LegacyHmacFallback,
@@ -86,72 +86,87 @@ def generate_webhook_idempotency_key() -> str:
8686
def create_mcp_webhook_payload(
8787
task_id: str,
8888
status: GeneratedTaskStatus | str,
89+
task_type: TaskType | str,
90+
*,
8991
result: PydanticBaseModel | dict[str, Any] | None = None,
9092
timestamp: datetime | None = None,
91-
task_type: str | None = None,
9293
operation_id: str | None = None,
9394
message: str | None = None,
9495
context_id: str | None = None,
9596
domain: str | None = None,
9697
idempotency_key: str | None = None,
9798
token: str | None = None,
98-
) -> dict[str, Any]:
99+
) -> McpWebhookPayload:
99100
"""
100-
Create MCP webhook payload dictionary.
101+
Build an :class:`McpWebhookPayload` for a tracked async task.
101102
102-
This function helps agent implementations construct properly formatted
103-
webhook payloads for sending to clients.
103+
Pair with :func:`to_wire_dict` for HTTP transport — Pydantic-typed at
104+
construction so the publisher catches schema drift before it leaves
105+
the process.
106+
107+
``task_type`` is restricted to the closed :class:`TaskType` enum (the
108+
spec's complete set of async/tracked operations). Synchronous-only
109+
operations (e.g. ``get_products``, ``list_creatives``) are not in the
110+
enum because they don't go through the task management system —
111+
passing them would have produced a webhook payload the receiver would
112+
reject as schema-invalid.
104113
105114
Args:
106-
task_id: Unique identifier for the task
107-
status: Current task status
108-
task_type: Optionally type of AdCP operation (e.g., "get_products", "create_media_buy")
109-
timestamp: When the webhook was generated (defaults to current UTC time)
110-
result: Task-specific payload — any Pydantic model or plain dict
111-
operation_id: Client-generated identifier the buyer embedded in the
112-
webhook URL when registering push-notification config. Publishers
113-
MUST echo this back in the payload so buyers correlate notifications
114-
without parsing URL paths (per ``mcp-webhook-payload.json``).
115-
Senders extracting the value from the URL path on emission populate
116-
this field; callers constructing payloads directly pass it through.
117-
message: Human-readable summary of task state
118-
context_id: Session/conversation identifier
119-
domain: AdCP domain this task belongs to
120-
idempotency_key: Sender-generated key stable across retries of the same
121-
event. Defaults to a freshly-generated UUID v4 — callers retrying
122-
delivery of the same event MUST pass the key from their first
123-
attempt; passing None twice mints two keys and defeats dedup.
115+
task_id: Unique identifier for the task.
116+
status: Current task status.
117+
task_type: Type of AdCP async operation (see :class:`TaskType`).
118+
result: Task-specific payload — any Pydantic model or plain dict.
119+
Plain dicts are validated against
120+
:class:`AdcpAsyncResponseData`'s discriminated union.
121+
timestamp: When the webhook was generated. Defaults to current UTC.
122+
operation_id: Client-generated identifier the buyer embedded in
123+
the webhook URL when registering push-notification config.
124+
Publishers MUST echo this back so buyers correlate
125+
notifications without parsing URL paths.
126+
message: Human-readable summary of task state.
127+
context_id: Session/conversation identifier.
128+
domain: AdCP domain this task belongs to.
129+
idempotency_key: Sender-generated key stable across retries of the
130+
same event. Defaults to a freshly-generated UUID v4 — callers
131+
retrying delivery of the same event MUST pass the key from
132+
their first attempt; passing None twice mints two keys and
133+
defeats dedup.
134+
token: Buyer-supplied token from ``push_notification_config.token``,
135+
echoed back per spec for authenticity validation.
124136
125137
Returns:
126-
Dictionary matching McpWebhookPayload schema, ready to be sent as JSON
138+
:class:`McpWebhookPayload` instance. Use :func:`to_wire_dict` (or
139+
``payload.model_dump(mode="json", exclude_none=True)``) to get the
140+
JSON-ready dict for HTTP transport.
127141
128142
Examples:
129143
Create a completed webhook with results:
130-
>>> from adcp.webhooks import create_mcp_webhook_payload
144+
>>> from adcp.webhooks import create_mcp_webhook_payload, to_wire_dict
131145
>>> from adcp.types import GeneratedTaskStatus
132146
>>>
133147
>>> payload = create_mcp_webhook_payload(
134148
... task_id="task_123",
135-
... task_type="get_products",
136149
... status=GeneratedTaskStatus.completed,
137-
... result={"products": [...]},
138-
... message="Found 5 products"
150+
... task_type="create_media_buy",
151+
... result={"media_buy_id": "mb_1", "buyer_ref": "ref_1"},
152+
... message="Created campaign"
139153
... )
154+
>>> wire = to_wire_dict(payload)
140155
141156
Create a failed webhook with error:
142157
>>> payload = create_mcp_webhook_payload(
143158
... task_id="task_456",
144-
... task_type="create_media_buy",
145159
... status=GeneratedTaskStatus.failed,
160+
... task_type="create_media_buy",
146161
... result={"errors": [{"code": "INVALID_INPUT", "message": "..."}]},
147162
... message="Validation failed"
148163
... )
149164
150165
Create a working status update:
151166
>>> payload = create_mcp_webhook_payload(
152167
... task_id="task_789",
153-
... task_type="sync_creatives",
154168
... status=GeneratedTaskStatus.working,
169+
... task_type="sync_creatives",
155170
... message="Processing 3 of 10 creatives"
156171
... )
157172
"""
@@ -160,48 +175,42 @@ def create_mcp_webhook_payload(
160175
if idempotency_key is None:
161176
idempotency_key = generate_webhook_idempotency_key()
162177

163-
# Convert status enum to string value
164178
status_value = status.value if hasattr(status, "value") else str(status)
165179

166-
# Build payload matching McpWebhookPayload schema
167-
payload: dict[str, Any] = {
168-
"idempotency_key": idempotency_key,
169-
"task_id": task_id,
170-
"task_type": task_type,
171-
"status": status_value,
172-
"timestamp": timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp,
173-
}
174-
175-
# Add optional fields only if provided
176-
if result is not None:
177-
# Convert Pydantic model to dict if needed for JSON serialization
178-
if hasattr(result, "model_dump"):
179-
payload["result"] = result.model_dump(mode="json")
180-
else:
181-
payload["result"] = result
182-
183-
if operation_id is not None:
184-
payload["operation_id"] = operation_id
185-
186-
if message is not None:
187-
payload["message"] = message
188-
189-
if context_id is not None:
190-
payload["context_id"] = context_id
180+
# Foreign BaseModel subclasses (anything outside AdcpAsyncResponseData)
181+
# don't match the discriminated-union variants by identity — dump to a
182+
# dict so the union picks by shape, matching the dict path.
183+
result_value: PydanticBaseModel | dict[str, Any] | None
184+
if isinstance(result, PydanticBaseModel):
185+
result_value = result.model_dump(mode="json")
186+
else:
187+
result_value = result
191188

189+
# `domain` and `token` aren't in the schema but are accepted via
190+
# `extra='allow'`; they round-trip through `model_dump`.
191+
extras: dict[str, Any] = {}
192192
if domain is not None:
193-
payload["domain"] = domain
194-
193+
extras["domain"] = domain
195194
if token is not None:
196195
# Buyer-supplied token from push_notification_config.token,
197196
# echoed back per push-notification-config.json spec text:
198197
# "Echoed back in webhook payload to validate request authenticity."
199-
# Cross-language wire-parity with the JS implementation
200-
# (``buildTaskWebhookPayload`` in ``from-platform.ts``) — buyers
201-
# validating against the spec read body.token, not headers.
202-
payload["token"] = token
198+
extras["token"] = token
203199

204-
return payload
200+
return McpWebhookPayload.model_validate(
201+
{
202+
"idempotency_key": idempotency_key,
203+
"task_id": task_id,
204+
"task_type": task_type,
205+
"status": status_value,
206+
"timestamp": timestamp,
207+
"operation_id": operation_id,
208+
"message": message,
209+
"context_id": context_id,
210+
"result": result_value,
211+
**extras,
212+
}
213+
)
205214

206215

207216
def get_adcp_signed_headers_for_webhook(
@@ -245,9 +254,9 @@ def get_adcp_signed_headers_for_webhook(
245254
>>>
246255
>>> payload = create_mcp_webhook_payload(
247256
... task_id="task_123",
248-
... task_type="get_products",
249257
... status="completed",
250-
... result={"products": [...]}
258+
... task_type="create_media_buy",
259+
... result={"media_buy_id": "mb_1"},
251260
... )
252261
>>> headers = {"Content-Type": "application/json"}
253262
>>> signed_headers = get_adcp_signed_headers_for_webhook(

0 commit comments

Comments
 (0)