Commit 73a7512

bokelley and claude authored
feat(decisioning): WebhookDeliverySupervisor + SQLAlchemy A2A stores example (#348)
* feat(decisioning): add WebhookDeliverySupervisor + SQLAlchemy A2A stores example

Round-2 P1 reliability layer for F12 sync-completion webhooks. The existing WebhookSender is the transport (HTTP-Signatures POST, one attempt, no retry); production sellers wrap it with retry, circuit breaker, and per-attempt audit. This adds the SDK-side seam.

Tracks adopter feedback that salesagent's webhook reliability layer (~1,041 LOC across webhook_delivery_service.py + protocol_webhook_service.py) has no SDK home. After this PR, those LOC compose against the Protocol seam instead of being adopter-rolled.

Components in src/adcp/webhook_supervisor.py:

* WebhookDeliverySupervisor Protocol: async send_mcp surface, Protocol-conformant; adopters with infra-side retry (Celery, Kafka, durable outbox) implement against their queue.
* InMemoryWebhookDeliverySupervisor reference impl: wraps a WebhookSender with per-endpoint CircuitBreaker (CLOSED/OPEN/HALF_OPEN state machine, 5-failure threshold, 60s recovery), exponential-backoff RetryPolicy with jitter, per-sequence_key monotonic counter for delivery-report sequence numbers, optional DeliveryLogSink Protocol for BYO persistence.
* DeliveryAttempt frozen dataclass: one record per attempt (success / failure / circuit_open) for audit-log persistence.
* DeliveryLogSink Protocol: adopters wire sinks to their existing webhook_delivery_log tables; sink failures swallowed (broken sink must not cascade into delivery loss).

Wire-through:

* PlatformHandler accepts webhook_supervisor=; F12 auto-emit routes through supervisor when configured, falls back to bare sender otherwise. Backward-compat: existing webhook_sender= path unchanged.
* serve() and create_adcp_server_from_platform() forward the new param. Boot-time validate_webhook_sender_for_platform now accepts either a sender or a supervisor (changed missing-sender error code from "webhook_sender" to "webhook_sender_or_supervisor").

A2A stores example: examples/a2a_sqlalchemy_tasks.py. Companion to the SQLite reference (examples/a2a_db_tasks.py). Same Protocol surface (TaskStore + PushNotificationConfigStore) backed by SQLAlchemy ORM so the same code runs against any backend SQLA supports: SQLite for the demo, Postgres / MySQL in production. Salesagent and other SQLAlchemy-based sellers wrap their existing models behind the Protocols.

Tests: 22 new in test_webhook_supervisor.py (retry math, circuit breaker state machine, supervisor success/retry/exception paths, sink failure isolation, F12 wire-through, boot-validation). Existing F12 + serve tests updated for the supervisor parameter. 79 affected tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(decisioning): address expert review on PR #348: idempotency, breaker_key, sink timeout

Ten findings from four expert reviews on the WebhookDeliverySupervisor:

**Critical (spec compliance):**
- Idempotency-key reuse on retry: spec requires reusing the same key on every retry. Refactored to use sender.resend(last_result) for attempts 2+; only attempt 1 (or after exception with no result) calls send_mcp fresh.

**High:**
- Cross-tenant breaker collision: added optional breaker_key parameter; multi-tenant sellers scope via f"{tenant_id}:{url}".
- Sink unbounded on hot path: wrapped in asyncio.wait_for with configurable RetryPolicy.sink_timeout_seconds (default 5s).
- Sequence number burned on circuit-open: allocated only after can_attempt() returns True.
- InMemoryWebhookDeliverySupervisor.__init__ now raises ValueError when sender is None (preserves F12 boot fail-fast).
- UTC = timezone.utc moved below all imports for ruff isort compliance.

**Medium:**
- response_time_ms switched from datetime deltas to time.monotonic().
- record_success while OPEN now warns + transitions to HALF_OPEN with success_count=1 (was silently flipping to CLOSED).
- DeliveryAttempt.notification_type new optional field for delivery reports parity with salesagent's WebhookDeliveryLog.
- sequence_key docstring clarified per-stream recommendation (f"{media_buy_id}:{url}").

Tests: 30 (up from 22). 8 new tests cover idempotency-key reuse via resend, breaker_key tenant isolation, sequence-number no-burn on circuit-open, sink timeout, notification_type passthrough, monotonic clock, init-time None-sender rejection, sender-only backward-compat. 2,990 total tests pass. ruff + mypy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a016eca commit 73a7512
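
The commit message describes a per-endpoint breaker with CLOSED/OPEN/HALF_OPEN states, a 5-failure threshold, a 60s recovery window, and an optional breaker_key for tenant scoping. Since src/adcp/webhook_supervisor.py is not rendered on this page, the following is only a minimal sketch of that state machine. The class names, method bodies, and the BreakerRegistry helper are illustrative assumptions, not the SDK's code. In particular, this sketch closes the breaker on the first success and does not model the success_count handling mentioned in the review fixes.

```python
import time
from enum import Enum


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Illustrative breaker; thresholds mirror the commit message defaults."""

    def __init__(self, failure_threshold=5, recovery_seconds=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_seconds = recovery_seconds
        self._clock = clock  # injectable so tests can control time
        self.state = BreakerState.CLOSED
        self._failures = 0
        self._opened_at = 0.0

    def can_attempt(self) -> bool:
        """Return True if a delivery attempt is allowed right now."""
        if self.state is BreakerState.OPEN:
            if self._clock() - self._opened_at >= self.recovery_seconds:
                # Recovery window elapsed: allow a probe attempt.
                self.state = BreakerState.HALF_OPEN
                return True
            return False
        return True

    def record_success(self) -> None:
        # Simplification: a single success closes the breaker.
        self._failures = 0
        self.state = BreakerState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        probe_failed = self.state is BreakerState.HALF_OPEN
        if probe_failed or self._failures >= self.failure_threshold:
            self.state = BreakerState.OPEN
            self._opened_at = self._clock()


class BreakerRegistry:
    """Hypothetical helper: one breaker per breaker_key, e.g. f"{tenant_id}:{url}"."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._breakers = {}

    def for_key(self, breaker_key: str) -> CircuitBreaker:
        if breaker_key not in self._breakers:
            self._breakers[breaker_key] = CircuitBreaker(clock=self._clock)
        return self._breakers[breaker_key]
```

Keying the registry on tenant plus URL means two tenants that share a buyer endpoint trip independent breakers, which is the collision the review fix describes.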

8 files changed

Lines changed: 1916 additions & 28 deletions

examples/a2a_sqlalchemy_tasks.py

Lines changed: 434 additions & 0 deletions
Large diffs are not rendered by default.

src/adcp/decisioning/handler.py

Lines changed: 4 additions & 0 deletions
@@ -148,6 +148,7 @@
 from adcp.decisioning.task_registry import TaskRegistry
 from adcp.decisioning.types import Account
 from adcp.webhook_sender import WebhookSender
+from adcp.webhook_supervisor import WebhookDeliverySupervisor


 # ---------------------------------------------------------------------------
@@ -465,6 +466,7 @@ def __init__(
         state_reader: StateReader | None = None,
         resource_resolver: ResourceResolver | None = None,
         webhook_sender: WebhookSender | None = None,
+        webhook_supervisor: WebhookDeliverySupervisor | None = None,
         auto_emit_completion_webhooks: bool = True,
     ) -> None:
         super().__init__()
@@ -474,6 +476,7 @@ def __init__(
         self._state_reader = state_reader
         self._resource_resolver = resource_resolver
         self._webhook_sender = webhook_sender
+        self._webhook_supervisor = webhook_supervisor
         self._auto_emit_completion_webhooks = auto_emit_completion_webhooks

     # ----- account resolution helper -----
@@ -569,6 +572,7 @@ def _maybe_auto_emit_sync_completion(
             return
         maybe_emit_sync_completion(
             sender=self._webhook_sender,
+            supervisor=self._webhook_supervisor,
             enabled=self._auto_emit_completion_webhooks,
             method_name=method_name,
             params=params,

src/adcp/decisioning/serve.py

Lines changed: 31 additions & 6 deletions
@@ -44,6 +44,7 @@
 from adcp.decisioning.state import StateReader
 from adcp.decisioning.task_registry import TaskRegistry
 from adcp.webhook_sender import WebhookSender
+from adcp.webhook_supervisor import WebhookDeliverySupervisor


 def _is_production_env() -> bool:
@@ -77,6 +78,7 @@ def create_adcp_server_from_platform(
     state_reader: StateReader | None = None,
     resource_resolver: ResourceResolver | None = None,
     webhook_sender: WebhookSender | None = None,
+    webhook_supervisor: WebhookDeliverySupervisor | None = None,
     auto_emit_completion_webhooks: bool = True,
 ) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]:
     """Build the :class:`PlatformHandler` + supporting wiring from a
@@ -122,11 +124,22 @@ def create_adcp_server_from_platform(
         v6.1).
     :param webhook_sender: Bring-your-own
         :class:`adcp.webhook_sender.WebhookSender` for sync-completion
-        and HITL-completion webhook delivery. Default ``None`` — when
-        unset, sync-completion auto-emit is a silent no-op (no URL to
-        deliver to, framework can't synthesize a sender). Adopters
-        wiring webhook delivery pass a configured sender (with their
-        signing key, IP-pinned transport, etc.).
+        and HITL-completion webhook delivery. Default ``None``. The
+        sender is the *transport* — one HTTP-Signatures POST per call,
+        no retry, no breaker. Production sellers typically wrap the
+        sender in a :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor`
+        and pass that via ``webhook_supervisor=`` instead.
+    :param webhook_supervisor: Bring-your-own
+        :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor` for
+        reliable delivery (retry, circuit breaker, attempt audit). When
+        passed, the F12 auto-emit path routes through it instead of
+        ``webhook_sender``. The reference
+        :class:`~adcp.webhook_supervisor.InMemoryWebhookDeliverySupervisor`
+        wraps a sender; adopters with infra-side retry (Celery, Kafka,
+        durable outbox) implement the Protocol against their queue.
+        Mutually optional with ``webhook_sender``; passing both is
+        valid (supervisor wins for auto-emit, sender remains available
+        for direct calls inside platform methods).
     :param auto_emit_completion_webhooks: F12 feature gate. When
         ``True`` (default), the framework auto-fires a completion
         webhook on the sync-success arm of mutating tools whenever the
@@ -235,6 +248,7 @@ def create_adcp_server_from_platform(
         state_reader=state_reader,
         resource_resolver=resource_resolver,
         webhook_sender=webhook_sender,
+        webhook_supervisor=webhook_supervisor,
         auto_emit_completion_webhooks=auto_emit_completion_webhooks,
     )

@@ -255,6 +269,7 @@ def create_adcp_server_from_platform(
     validate_webhook_sender_for_platform(
         advertised_tools=handler.advertised_tools_for_instance(),
         sender=webhook_sender,
+        supervisor=webhook_supervisor,
         auto_emit=auto_emit_completion_webhooks,
     )

@@ -271,6 +286,7 @@ def serve(
     state_reader: StateReader | None = None,
     resource_resolver: ResourceResolver | None = None,
     webhook_sender: WebhookSender | None = None,
+    webhook_supervisor: WebhookDeliverySupervisor | None = None,
     auto_emit_completion_webhooks: bool = True,
     advertise_all: bool = False,
     **serve_kwargs: Any,
@@ -294,7 +310,15 @@ def serve(
     :param resource_resolver: Custom :class:`ResourceResolver` impl (D15).
     :param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender`
         for completion webhook delivery (sync auto-emit + HITL terminal).
-        ``None`` disables auto-emit silently.
+        Transport only — one attempt, no retry. ``None`` disables
+        auto-emit silently.
+    :param webhook_supervisor: BYO
+        :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor` for
+        reliable delivery (retry, circuit breaker, attempt audit).
+        Takes precedence over ``webhook_sender`` for F12 auto-emit
+        when both are passed. Production sellers typically pass an
+        :class:`~adcp.webhook_supervisor.InMemoryWebhookDeliverySupervisor`
+        wrapping their sender.
     :param auto_emit_completion_webhooks: F12 — auto-fire a completion
         webhook on the sync-success arm of mutating tools when the
         request supplied ``push_notification_config.url``. Default
@@ -322,6 +346,7 @@ def serve(
         state_reader=state_reader,
         resource_resolver=resource_resolver,
         webhook_sender=webhook_sender,
+        webhook_supervisor=webhook_supervisor,
         auto_emit_completion_webhooks=auto_emit_completion_webhooks,
     )
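
The docstrings above say a supervisor adds retry and attempt audit on top of a one-shot sender, and the commit message adds that sink writes are bounded by asyncio.wait_for with RetryPolicy.sink_timeout_seconds (default 5s). The following is a rough sketch of that shape only. Apart from sink_timeout_seconds, every field name, default, and function here is a hypothetical stand-in; the real RetryPolicy fields are not shown in this diff. The sketch also omits the idempotency-key reuse via sender.resend(last_result) and the circuit breaker.

```python
from __future__ import annotations

import asyncio
import random
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class RetryPolicy:
    # Assumed fields; only sink_timeout_seconds (5s) comes from the commit message.
    max_attempts: int = 3
    base_delay_seconds: float = 0.5
    max_delay_seconds: float = 30.0
    sink_timeout_seconds: float = 5.0

    def delay_for(self, attempt: int, rng: random.Random | None = None) -> float:
        """Full-jitter backoff: uniform in [0, min(cap, base * 2**(attempt - 1))]."""
        ceiling = min(self.max_delay_seconds, self.base_delay_seconds * 2 ** (attempt - 1))
        source = rng if rng is not None else random
        return source.uniform(0.0, ceiling)


async def deliver_with_retry(
    send: Callable[[int], Awaitable[None]],
    policy: RetryPolicy,
    sink: Callable[[int, str], Awaitable[None]] | None = None,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Try `send` up to policy.max_attempts times; report each attempt to `sink`."""
    for attempt in range(1, policy.max_attempts + 1):
        try:
            await send(attempt)
            outcome = "success"
        except Exception:
            outcome = "failure"
        if sink is not None:
            try:
                # Bound the sink so a slow audit store cannot stall delivery.
                await asyncio.wait_for(sink(attempt, outcome), timeout=policy.sink_timeout_seconds)
            except Exception:
                pass  # a broken or slow sink must not cascade into delivery loss
        if outcome == "success":
            return True
        if attempt < policy.max_attempts:
            await sleep(policy.delay_for(attempt))
    return False
```

Passing `sleep` as a parameter is a testing convenience in this sketch; it lets a test skip real delays without patching asyncio.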

src/adcp/decisioning/webhook_emit.py

Lines changed: 31 additions & 19 deletions
@@ -47,6 +47,9 @@

 if TYPE_CHECKING:
     from adcp.webhook_sender import WebhookSender
+    from adcp.webhook_supervisor import WebhookDeliverySupervisor
+
+    DeliveryTarget = WebhookSender | WebhookDeliverySupervisor

 logger = logging.getLogger(__name__)

@@ -125,7 +128,7 @@ def _extract_push_notification_url_and_token(

 async def _emit_sync_completion_webhook(
     *,
-    sender: WebhookSender,
+    target: DeliveryTarget,
     url: str,
     token: str | None,
     method_name: str,
@@ -139,10 +142,15 @@ async def _emit_sync_completion_webhook(
     correlate primarily via resource ids on ``result``
     (``media_buy_id``, ``creative_id``, etc.); ``task_id`` here is
     informational for the spec's required-field shape.
+
+    ``target`` is either a bare :class:`WebhookSender` (one attempt,
+    no breaker) or a :class:`WebhookDeliverySupervisor` (retry, breaker,
+    optional delivery log). Both expose ``send_mcp(...)`` with
+    compatible kwargs; the call site is polymorphic.
     """
     task_id = f"sync-{uuid.uuid4().hex[:16]}"
     try:
-        await sender.send_mcp(
+        await target.send_mcp(
             url=url,
             task_id=task_id,
             status="completed",
@@ -169,6 +177,7 @@ def maybe_emit_sync_completion(
     method_name: str,
     params: Any,
     result: Any,
+    supervisor: WebhookDeliverySupervisor | None = None,
 ) -> None:
     """Fire-and-forget auto-emit gate. Called by handler shims after
     the sync-success arm of mutating tools.
@@ -217,7 +226,8 @@ def maybe_emit_sync_completion(
     if config is None:
         return # buyer didn't register — nothing to do

-    if sender is None:
+    target = supervisor or sender
+    if target is None:
         # Buyer registered a webhook config but the adopter didn't
         # wire a sender. Without this branch, the buyer's
         # notification quietly disappears — they think they
@@ -237,8 +247,8 @@ def maybe_emit_sync_completion(
             logger.warning(
                 "[adcp.decisioning] buyer registered "
                 "push_notification_config (url=%s) for %s but auto-emit "
-                "webhook_sender is None — webhook silently dropped. "
-                "Pass webhook_sender to "
+                "has neither webhook_sender nor webhook_supervisor — "
+                "webhook silently dropped. Pass one to "
                 "adcp.decisioning.serve.create_adcp_server_from_platform, "
                 "or set auto_emit_completion_webhooks=False to silence "
                 "this warning.",
@@ -279,7 +289,7 @@ def maybe_emit_sync_completion(
         return
     bg = loop.create_task(
         _emit_sync_completion_webhook(
-            sender=sender,
+            target=target,
             url=url,
             token=token,
             method_name=method_name,
@@ -307,16 +317,17 @@ def validate_webhook_sender_for_platform(
     advertised_tools: frozenset[str] | set[str],
     sender: Any,
     auto_emit: bool,
+    supervisor: Any = None,
 ) -> None:
     """Server-boot fail-fast for the F12 misconfig (Emma sales-direct
     P0 root cause).

     When an adopter claims a specialism whose tool surface includes
     any spec-eligible webhook task type (e.g., ``create_media_buy``,
     ``activate_signal``, ``acquire_rights``) AND auto-emit is on AND
-    no ``webhook_sender`` is wired, every buyer who registers
-    ``push_notification_config.url`` would have their notification
-    silently dropped. The runtime gate at
+    neither ``webhook_sender`` nor ``webhook_supervisor`` is wired,
+    every buyer who registers ``push_notification_config.url`` would
+    have their notification silently dropped. The runtime gate at
     :func:`maybe_emit_sync_completion` warns on the FIRST call, but
     by then the buyer has already burned a request and the adopter
     has shipped without webhook wiring.
@@ -335,7 +346,7 @@ def validate_webhook_sender_for_platform(
     """
     if not auto_emit:
         return
-    if sender is not None:
+    if sender is not None or supervisor is not None:
         return
     eligible = SPEC_WEBHOOK_TASK_TYPES & set(advertised_tools)
     if not eligible:
@@ -347,18 +358,19 @@ def validate_webhook_sender_for_platform(
         message=(
             "auto_emit_completion_webhooks is enabled and the platform's "
             "claimed specialisms expose webhook-eligible tools "
-            f"{sorted(eligible)!r}, but no webhook_sender was wired. "
-            "Buyers who register push_notification_config.url on these "
-            "tools would have their notifications silently dropped. "
-            "Either pass a configured WebhookSender via "
-            "adcp.decisioning.serve.create_adcp_server_from_platform("
-            "..., webhook_sender=...), or set "
-            "auto_emit_completion_webhooks=False if you handle webhooks "
-            "manually inside your platform methods."
+            f"{sorted(eligible)!r}, but neither webhook_sender nor "
+            "webhook_supervisor was wired. Buyers who register "
+            "push_notification_config.url on these tools would have their "
+            "notifications silently dropped. Pass a configured "
+            "WebhookSender (transport only) or InMemoryWebhookDeliverySupervisor "
+            "(retry + circuit breaker) to "
+            "adcp.decisioning.serve.create_adcp_server_from_platform, "
+            "or set auto_emit_completion_webhooks=False if you handle "
+            "webhooks manually inside your platform methods."
         ),
         recovery="terminal",
         details={
-            "missing": "webhook_sender",
+            "missing": "webhook_sender_or_supervisor",
             "webhook_eligible_tools": sorted(eligible),
         },
     )
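
The two decisions in this file, which delivery target auto-emit uses and when boot validation fails, can be restated as a small standalone sketch. The helper names pick_target and missing_delivery_target are hypothetical, the task-type set below contains only the three examples named in the docstring (the real SPEC_WEBHOOK_TASK_TYPES may be larger), and the sketch returns a list where the real function raises an error whose class is not visible in this hunk.

```python
from __future__ import annotations

from typing import Any, Protocol


class SupportsSendMcp(Protocol):
    """Assumed structural type: both sender and supervisor expose send_mcp."""

    async def send_mcp(self, *, url: str, task_id: str, status: str, **kwargs: Any) -> Any: ...


# Only the examples named in the docstring; an assumption, not the full set.
SPEC_WEBHOOK_TASK_TYPES = frozenset({"create_media_buy", "activate_signal", "acquire_rights"})


def pick_target(
    sender: SupportsSendMcp | None,
    supervisor: SupportsSendMcp | None,
) -> SupportsSendMcp | None:
    """Supervisor wins when both are wired; mirrors `supervisor or sender`."""
    return supervisor or sender


def missing_delivery_target(
    advertised_tools: set[str] | frozenset[str],
    sender: Any,
    supervisor: Any,
    auto_emit: bool,
) -> list[str]:
    """Return the webhook-eligible tools that would silently drop notifications.

    An empty list means the wiring passes the boot check.
    """
    if not auto_emit:
        return []
    if sender is not None or supervisor is not None:
        return []
    return sorted(SPEC_WEBHOOK_TASK_TYPES & set(advertised_tools))
```

One detail worth noting: `supervisor or sender` selects by truthiness, while the boot check compares against None. The two agree for ordinary objects, but a supervisor class that defined `__bool__` or `__len__` and evaluated falsy would pass validation yet be skipped at emit time.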
