Skip to content

Commit e246de0

Browse files
committed
feat(decisioning): TaskHandoff create_media_buy on_complete + on_failure hooks
Closes the v1.5.1 follow-up flagged in PR #550. Adopters returning ctx.handoff_to_task(...) from create_media_buy(proposal_id=...) now get the same single-ledger D3 guarantee as the inline path: the proposal transitions CONSUMING → CONSUMED via the framework's on_complete hook when the bg task lands; failure paths release the reservation so the buyer can retry. Previous gap: HITL accept-proposal flows left the proposal in CONSUMING until eviction. No data leak, but reverse-index lookup couldn't hydrate, and a second create_media_buy(proposal_id=X) after a HITL rejection would hit PROPOSAL_NOT_COMMITTED until eviction. Mechanism: * _project_handoff gets on_failure paired with on_complete; inner _fail() helper centralizes 'invoke on_failure (best effort) then registry.fail' across all four failure branches. * _invoke_platform_method forwards both kwargs into _project_handoff on the handoff path; fires on_complete inline before sync return; fires on_failure on every adapter exception path. * handler.py:create_media_buy replaces the local try/except + post-call mark_consumed branches with two closures passed to _invoke_platform_method. Single source of truth for inline + HITL. 3 new tests: handoff happy path, handoff failure releases reservation, end-to-end retry-after-release. All 5 existing finalize-handoff tests still pass. 4014 passed (was 3966 on main + concurrent CI work). Zero regressions. ruff + mypy clean.
1 parent c7c317e commit e246de0

3 files changed

Lines changed: 402 additions & 65 deletions

File tree

src/adcp/decisioning/dispatch.py

Lines changed: 124 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,8 @@ async def _invoke_platform_method(
10431043
registry: TaskRegistry,
10441044
arg_projector: dict[str, Any] | None = None,
10451045
extra_kwargs: dict[str, Any] | None = None,
1046+
on_complete: Callable[[Any], Awaitable[None]] | None = None,
1047+
on_failure: Callable[[BaseException], Awaitable[None]] | None = None,
10461048
) -> Any:
10471049
"""Invoke a platform method, projecting hybrid returns.
10481050
@@ -1074,6 +1076,21 @@ async def _invoke_platform_method(
10741076
normal ``(params, ctx)`` call, used when the framework injects
10751077
framework-computed values (e.g. ``configs=`` from
10761078
``ProductConfigStore``). Ignored when ``arg_projector`` is set.
1079+
1080+
:param on_complete: Optional framework hook invoked with the
1081+
adapter's typed return value before the dispatch returns to
1082+
the caller. Fired exactly once per call — inline on the sync
1083+
return path, or (forwarded to :func:`_project_handoff`) after
1084+
the bg task lands on the handoff path. Used by v1.5
1085+
create_media_buy to finalize the consumption reservation when
1086+
the typed result is available.
1087+
1088+
:param on_failure: Optional framework hook invoked with the
1089+
terminal exception when the adapter raises (sync path) or
1090+
when the handoff fn / on_complete raises (handoff path).
1091+
Symmetric with ``on_complete``. Used by v1.5 create_media_buy
1092+
to release the consumption reservation so the buyer can retry.
1093+
Hook errors are logged but never block exception propagation.
10771094
"""
10781095
method = getattr(platform, method_name)
10791096

@@ -1104,9 +1121,13 @@ async def _invoke_platform_method(
11041121
executor,
11051122
functools.partial(ctx_snapshot.run, method, params, ctx),
11061123
)
1107-
except AdcpError:
1124+
except AdcpError as exc:
11081125
# Adopter raised structured error — propagate verbatim. The
1109-
# outer middleware projects to the wire envelope.
1126+
# outer middleware projects to the wire envelope. Fire
1127+
# on_failure first so the framework can release any reservation
1128+
# taken before dispatch (v1.5 create_media_buy lifecycle).
1129+
if on_failure is not None:
1130+
await _safe_on_failure_call(on_failure, exc, method_name)
11101131
raise
11111132
except TypeError as exc:
11121133
# Most likely an arg_projector or extra_kwargs signature-drift bug.
@@ -1122,7 +1143,7 @@ async def _invoke_platform_method(
11221143
method_name,
11231144
sorted(arg_projector.keys()),
11241145
)
1125-
raise AdcpError(
1146+
wrapped = AdcpError(
11261147
"INVALID_REQUEST",
11271148
message=(
11281149
f"Platform method {method_name!r} signature mismatch — "
@@ -1133,15 +1154,18 @@ async def _invoke_platform_method(
11331154
"Protocol class (typically a renamed parameter)."
11341155
),
11351156
recovery="terminal",
1136-
) from exc
1157+
)
1158+
if on_failure is not None:
1159+
await _safe_on_failure_call(on_failure, wrapped, method_name)
1160+
raise wrapped from exc
11371161
if extra_kwargs is not None:
11381162
logger.exception(
11391163
"TypeError invoking platform.%s — likely extra_kwargs "
11401164
"signature drift (injected kwargs %s vs adopter signature)",
11411165
method_name,
11421166
sorted(extra_kwargs.keys()),
11431167
)
1144-
raise AdcpError(
1168+
wrapped = AdcpError(
11451169
"INVALID_REQUEST",
11461170
message=(
11471171
f"Platform method {method_name!r} rejected framework-injected "
@@ -1150,18 +1174,24 @@ async def _invoke_platform_method(
11501174
"if you don't need them."
11511175
),
11521176
recovery="terminal",
1153-
) from exc
1177+
)
1178+
if on_failure is not None:
1179+
await _safe_on_failure_call(on_failure, wrapped, method_name)
1180+
raise wrapped from exc
11541181
# Non-projected TypeError — fall through to generic wrap.
11551182
logger.exception(
11561183
"Unhandled exception in platform.%s — wrapping to INTERNAL_ERROR",
11571184
method_name,
11581185
)
1159-
raise AdcpError(
1186+
wrapped = AdcpError(
11601187
"INTERNAL_ERROR",
11611188
message=_internal_error_message(method_name, exc),
11621189
recovery="terminal",
11631190
details=_internal_error_details(exc),
1164-
) from exc
1191+
)
1192+
if on_failure is not None:
1193+
await _safe_on_failure_call(on_failure, wrapped, method_name)
1194+
raise wrapped from exc
11651195
except Exception as exc:
11661196
# Wrap unexpected exceptions so the wire never sees a stack
11671197
# trace. Adopter logs the original via observability hooks;
@@ -1178,12 +1208,15 @@ async def _invoke_platform_method(
11781208
"Unhandled exception in platform.%s — wrapping to INTERNAL_ERROR",
11791209
method_name,
11801210
)
1181-
raise AdcpError(
1211+
wrapped = AdcpError(
11821212
"INTERNAL_ERROR",
11831213
message=_internal_error_message(method_name, exc),
11841214
recovery="terminal",
11851215
details=_internal_error_details(exc),
1186-
) from exc
1216+
)
1217+
if on_failure is not None:
1218+
await _safe_on_failure_call(on_failure, wrapped, method_name)
1219+
raise wrapped from exc
11871220

11881221
if is_task_handoff(result):
11891222
return await _project_handoff(
@@ -1192,6 +1225,8 @@ async def _invoke_platform_method(
11921225
method_name=method_name,
11931226
registry=registry,
11941227
executor=executor,
1228+
on_complete=on_complete,
1229+
on_failure=on_failure,
11951230
)
11961231
if is_workflow_handoff(result):
11971232
return await _project_workflow_handoff(
@@ -1202,6 +1237,19 @@ async def _invoke_platform_method(
12021237
executor=executor,
12031238
)
12041239

1240+
# Sync return path. Fire on_complete with the typed result before
1241+
# the credential strip + return. If the hook raises, fire on_failure
1242+
# and propagate — same single-hook-per-call semantic as the handoff
1243+
# path. v1.5 create_media_buy uses on_complete to finalize the
1244+
# consumption reservation when the adapter returned inline.
1245+
if on_complete is not None:
1246+
try:
1247+
await on_complete(result)
1248+
except BaseException as exc:
1249+
if on_failure is not None:
1250+
await _safe_on_failure_call(on_failure, exc, method_name)
1251+
raise
1252+
12051253
# Defense-in-depth credential strip on every sync return. The typed
12061254
# projections (:func:`to_wire_account` etc.) handle the case where
12071255
# the adopter returns the framework's typed dataclasses; this
@@ -1211,6 +1259,29 @@ async def _invoke_platform_method(
12111259
return strip_credentials_from_wire_result(method_name, result)
12121260

12131261

1262+
async def _safe_on_failure_call(
1263+
on_failure: Callable[[BaseException], Awaitable[None]],
1264+
exc: BaseException,
1265+
method_name: str,
1266+
) -> None:
1267+
"""Fire the framework on_failure hook; log and swallow hook errors.
1268+
1269+
Hook errors must NEVER block exception propagation — the buyer
1270+
needs to see the original adapter failure. Used by both the sync
1271+
path in :func:`_invoke_platform_method` and (via the inner
1272+
``_fail`` closure) the handoff path in :func:`_project_handoff`.
1273+
"""
1274+
try:
1275+
await on_failure(exc)
1276+
except Exception:
1277+
logger.exception(
1278+
"on_failure hook raised while handling %s for %s — original "
1279+
"exception still propagates",
1280+
type(exc).__name__,
1281+
method_name,
1282+
)
1283+
1284+
12141285
async def _project_handoff(
12151286
handoff: TaskHandoff[Any],
12161287
ctx: RequestContext[Any],
@@ -1219,6 +1290,7 @@ async def _project_handoff(
12191290
registry: TaskRegistry,
12201291
executor: ThreadPoolExecutor,
12211292
on_complete: Callable[[Any], Awaitable[None]] | None = None,
1293+
on_failure: Callable[[BaseException], Awaitable[None]] | None = None,
12221294
) -> dict[str, Any]:
12231295
"""Promote a TaskHandoff to a background task.
12241296
@@ -1254,11 +1326,22 @@ async def _project_handoff(
12541326
the proposal-finalize lifecycle to commit the proposal to
12551327
:class:`ProposalStore` exactly once when the HITL approval
12561328
lands. If the hook raises, the framework treats it like a
1257-
handoff fn failure: ``registry.fail`` is called with the wrapped
1258-
error and ``registry.complete`` is NOT called. This is what
1259-
gives the v1.5 single-ledger guarantee its teeth — a commit
1260-
failure cannot leave the task in 'submitted' forever or land
1261-
the proposal in a half-committed state.
1329+
handoff fn failure: ``on_failure`` runs (if set), then
1330+
``registry.fail`` is called with the wrapped error, and
1331+
``registry.complete`` is NOT called. This is what gives the
1332+
v1.5 single-ledger guarantee its teeth — a commit failure
1333+
cannot leave the task in 'submitted' forever or land the
1334+
proposal in a half-committed state.
1335+
1336+
:param on_failure: Optional framework hook invoked with the
1337+
terminal exception when the handoff fn raises OR when
1338+
``on_complete`` raises. Used by the v1.5 create_media_buy HITL
1339+
path to release the consumption reservation
1340+
(``CONSUMING → COMMITTED``) so the buyer can retry without
1341+
``PROPOSAL_NOT_COMMITTED`` blocking them. Hook errors are
1342+
logged but never block the ``registry.fail`` call — the buyer
1343+
needs the failure visible via ``tasks/get`` regardless of
1344+
hook outcomes.
12621345
12631346
The handoff fn is extracted via the type-identity dispatch in
12641347
:func:`adcp.decisioning.types.is_task_handoff`. Subclassed
@@ -1277,6 +1360,22 @@ async def _project_handoff(
12771360
# terminal artifact via the registry.
12781361
handoff_ctx = TaskHandoffContext(id=task_id, _registry=registry)
12791362

1363+
async def _fail(exc: AdcpError) -> None:
1364+
"""Run the framework's on_failure hook (if set) then
1365+
``registry.fail``. Hook errors are logged but never block the
1366+
registry.fail — the buyer needs the failure visible via
1367+
tasks/get regardless of hook outcomes."""
1368+
if on_failure is not None:
1369+
try:
1370+
await on_failure(exc)
1371+
except Exception:
1372+
logger.exception(
1373+
"on_failure hook raised for task %s — failure is "
1374+
"still recorded in the registry",
1375+
task_id,
1376+
)
1377+
await registry.fail(task_id, exc.to_wire())
1378+
12801379
async def _run() -> None:
12811380
try:
12821381
if asyncio.iscoroutinefunction(fn):
@@ -1289,7 +1388,7 @@ async def _run() -> None:
12891388
functools.partial(ctx_snapshot.run, fn, handoff_ctx),
12901389
)
12911390
except AdcpError as exc:
1292-
await registry.fail(task_id, exc.to_wire())
1391+
await _fail(exc)
12931392
return
12941393
except Exception as exc:
12951394
logger.exception(
@@ -1305,23 +1404,22 @@ async def _run() -> None:
13051404
recovery="terminal",
13061405
details=_internal_error_details(exc),
13071406
)
1308-
await registry.fail(task_id, wrapped.to_wire())
1407+
await _fail(wrapped)
13091408
return
13101409

13111410
# Framework completion hook (e.g., proposal_store.commit for
1312-
# finalize). Runs with the TYPED result before model_dump so
1313-
# the closure can pull typed fields (.expires_at, .proposal)
1411+
# finalize, mark_proposal_consumed for create_media_buy). Runs
1412+
# with the TYPED result before model_dump so the closure can
1413+
# pull typed fields (.expires_at, .proposal, .media_buy_id)
13141414
# off it directly. Failures here are treated identically to a
1315-
# handoff fn failure: registry.fail, no registry.complete. This
1316-
# is the load-bearing seam for the v1.5 single-ledger D3
1317-
# guarantee — if the proposal_store.commit raises, the proposal
1318-
# stays DRAFT and the task lands in 'failed', so the buyer's
1319-
# tasks/get returns the failure rather than a phantom success.
1415+
# handoff fn failure: on_failure runs, registry.fail is called,
1416+
# registry.complete is NOT called. This is the load-bearing seam
1417+
# for the v1.5 single-ledger D3 guarantee.
13201418
if on_complete is not None:
13211419
try:
13221420
await on_complete(result)
13231421
except AdcpError as exc:
1324-
await registry.fail(task_id, exc.to_wire())
1422+
await _fail(exc)
13251423
return
13261424
except Exception as exc:
13271425
logger.exception(
@@ -1337,7 +1435,7 @@ async def _run() -> None:
13371435
recovery="terminal",
13381436
details=_internal_error_details(exc),
13391437
)
1340-
await registry.fail(task_id, wrapped.to_wire())
1438+
await _fail(wrapped)
13411439
return
13421440

13431441
# Persist terminal artifact. Pydantic responses get

src/adcp/decisioning/handler.py

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import inspect
3636
import logging
3737
import warnings
38+
from collections.abc import Awaitable, Callable
3839
from typing import TYPE_CHECKING, Any, ClassVar, cast
3940

4041
from adcp.decisioning._get_products_helpers import _project_product_fields
@@ -1352,46 +1353,56 @@ async def create_media_buy( # type: ignore[override]
13521353
extra: dict[str, Any] | None = (
13531354
{"configs": configs} if self._create_media_buy_accepts_configs else None
13541355
)
1355-
try:
1356-
result = await _invoke_platform_method(
1357-
self._platform,
1358-
"create_media_buy",
1359-
params,
1360-
ctx,
1361-
executor=self._executor,
1362-
registry=self._registry,
1363-
extra_kwargs=extra,
1364-
)
1365-
except BaseException:
1366-
# Adapter raised — roll back the consumption reservation so
1367-
# the buyer can retry. If we leave it in CONSUMING, the next
1368-
# create_media_buy(proposal_id=X) call would see
1369-
# PROPOSAL_NOT_COMMITTED until the eviction window passes.
1370-
# BaseException catches AdcpError + asyncio.CancelledError +
1371-
# generic exceptions; the reservation gets released no matter
1372-
# how the adapter exits.
1373-
if proposal_record is not None:
1374-
await release_proposal_reservation(self._platform, proposal_record, ctx)
1375-
raise
1376-
# Finalize the consumption once create_media_buy returns
1377-
# successfully. Idempotent on re-call with the same media_buy_id
1378-
# (idempotency_key replays land here too).
1356+
1357+
# Build the consumption-lifecycle hooks. Used inline on the
1358+
# sync return path AND forwarded into _project_handoff on the
1359+
# TaskHandoff path — same closure, two firing points. Single
1360+
# source of truth for "what to do when create_media_buy lands"
1361+
# regardless of whether it lands now or after HITL approval.
1362+
on_complete: Callable[[Any], Awaitable[None]] | None = None
1363+
on_failure: Callable[[BaseException], Awaitable[None]] | None = None
13791364
if proposal_record is not None:
1380-
media_buy_id = _extract_media_buy_id(result)
1381-
if media_buy_id is not None:
1382-
await mark_proposal_consumed(
1383-
self._platform,
1384-
proposal_record,
1385-
media_buy_id=media_buy_id,
1386-
ctx=ctx,
1387-
)
1388-
# else: TaskHandoff path returned a Submitted envelope; the
1389-
# proposal stays CONSUMING until the handoff resolves. The
1390-
# framework's _project_handoff on_complete hook (wired below
1391-
# for v1.5+) finalizes consumption when the bg task lands.
1392-
# Until that hook is wired for create_media_buy specifically,
1393-
# the proposal will sit in CONSUMING until eviction — flagged
1394-
# as a v1.5.1 follow-up.
1365+
captured_record = proposal_record
1366+
captured_ctx = ctx
1367+
captured_platform = self._platform
1368+
1369+
async def _finalize_consumption_hook(create_result: Any) -> None:
1370+
# Idempotent on re-call with the same media_buy_id
1371+
# (idempotency_key replays land here too). For the
1372+
# handoff path the create_result is the typed
1373+
# CreateMediaBuySuccess from the bg task, NOT the
1374+
# Submitted envelope — _extract_media_buy_id reads .id
1375+
# off either shape.
1376+
media_buy_id = _extract_media_buy_id(create_result)
1377+
if media_buy_id is not None:
1378+
await mark_proposal_consumed(
1379+
captured_platform,
1380+
captured_record,
1381+
media_buy_id=media_buy_id,
1382+
ctx=captured_ctx,
1383+
)
1384+
1385+
async def _release_reservation_hook(_exc: BaseException) -> None:
1386+
# Adapter raised (sync) OR handoff fn raised (HITL) OR
1387+
# finalize_consumption raised: release the reservation
1388+
# so the buyer can retry without PROPOSAL_NOT_COMMITTED
1389+
# blocking them.
1390+
await release_proposal_reservation(captured_platform, captured_record, captured_ctx)
1391+
1392+
on_complete = _finalize_consumption_hook
1393+
on_failure = _release_reservation_hook
1394+
1395+
result = await _invoke_platform_method(
1396+
self._platform,
1397+
"create_media_buy",
1398+
params,
1399+
ctx,
1400+
executor=self._executor,
1401+
registry=self._registry,
1402+
extra_kwargs=extra,
1403+
on_complete=on_complete,
1404+
on_failure=on_failure,
1405+
)
13951406
self._maybe_auto_emit_sync_completion("create_media_buy", params, result)
13961407
return cast("CreateMediaBuySuccessResponse", result)
13971408

0 commit comments

Comments
 (0)