Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 124 additions & 26 deletions src/adcp/decisioning/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,8 @@ async def _invoke_platform_method(
registry: TaskRegistry,
arg_projector: dict[str, Any] | None = None,
extra_kwargs: dict[str, Any] | None = None,
on_complete: Callable[[Any], Awaitable[None]] | None = None,
on_failure: Callable[[BaseException], Awaitable[None]] | None = None,
) -> Any:
"""Invoke a platform method, projecting hybrid returns.

Expand Down Expand Up @@ -1074,6 +1076,21 @@ async def _invoke_platform_method(
normal ``(params, ctx)`` call, used when the framework injects
framework-computed values (e.g. ``configs=`` from
``ProductConfigStore``). Ignored when ``arg_projector`` is set.

:param on_complete: Optional framework hook invoked with the
adapter's typed return value before the dispatch returns to
the caller. Fired exactly once per call — inline on the sync
return path, or (forwarded to :func:`_project_handoff`) after
the bg task lands on the handoff path. Used by v1.5
create_media_buy to finalize the consumption reservation when
the typed result is available.

:param on_failure: Optional framework hook invoked with the
terminal exception when the adapter raises (sync path) or
when the handoff fn / on_complete raises (handoff path).
Symmetric with ``on_complete``. Used by v1.5 create_media_buy
to release the consumption reservation so the buyer can retry.
Hook errors are logged but never block exception propagation.
"""
method = getattr(platform, method_name)

Expand Down Expand Up @@ -1104,9 +1121,13 @@ async def _invoke_platform_method(
executor,
functools.partial(ctx_snapshot.run, method, params, ctx),
)
except AdcpError:
except AdcpError as exc:
# Adopter raised structured error — propagate verbatim. The
# outer middleware projects to the wire envelope.
# outer middleware projects to the wire envelope. Fire
# on_failure first so the framework can release any reservation
# taken before dispatch (v1.5 create_media_buy lifecycle).
if on_failure is not None:
await _safe_on_failure_call(on_failure, exc, method_name)
raise
except TypeError as exc:
# Most likely an arg_projector or extra_kwargs signature-drift bug.
Expand All @@ -1122,7 +1143,7 @@ async def _invoke_platform_method(
method_name,
sorted(arg_projector.keys()),
)
raise AdcpError(
wrapped = AdcpError(
"INVALID_REQUEST",
message=(
f"Platform method {method_name!r} signature mismatch — "
Expand All @@ -1133,15 +1154,18 @@ async def _invoke_platform_method(
"Protocol class (typically a renamed parameter)."
),
recovery="terminal",
) from exc
)
if on_failure is not None:
await _safe_on_failure_call(on_failure, wrapped, method_name)
raise wrapped from exc
if extra_kwargs is not None:
logger.exception(
"TypeError invoking platform.%s — likely extra_kwargs "
"signature drift (injected kwargs %s vs adopter signature)",
method_name,
sorted(extra_kwargs.keys()),
)
raise AdcpError(
wrapped = AdcpError(
"INVALID_REQUEST",
message=(
f"Platform method {method_name!r} rejected framework-injected "
Expand All @@ -1150,18 +1174,24 @@ async def _invoke_platform_method(
"if you don't need them."
),
recovery="terminal",
) from exc
)
if on_failure is not None:
await _safe_on_failure_call(on_failure, wrapped, method_name)
raise wrapped from exc
# Non-projected TypeError — fall through to generic wrap.
logger.exception(
"Unhandled exception in platform.%s — wrapping to INTERNAL_ERROR",
method_name,
)
raise AdcpError(
wrapped = AdcpError(
"INTERNAL_ERROR",
message=_internal_error_message(method_name, exc),
recovery="terminal",
details=_internal_error_details(exc),
) from exc
)
if on_failure is not None:
await _safe_on_failure_call(on_failure, wrapped, method_name)
raise wrapped from exc
except Exception as exc:
# Wrap unexpected exceptions so the wire never sees a stack
# trace. Adopter logs the original via observability hooks;
Expand All @@ -1178,12 +1208,15 @@ async def _invoke_platform_method(
"Unhandled exception in platform.%s — wrapping to INTERNAL_ERROR",
method_name,
)
raise AdcpError(
wrapped = AdcpError(
"INTERNAL_ERROR",
message=_internal_error_message(method_name, exc),
recovery="terminal",
details=_internal_error_details(exc),
) from exc
)
if on_failure is not None:
await _safe_on_failure_call(on_failure, wrapped, method_name)
raise wrapped from exc

if is_task_handoff(result):
return await _project_handoff(
Expand All @@ -1192,6 +1225,8 @@ async def _invoke_platform_method(
method_name=method_name,
registry=registry,
executor=executor,
on_complete=on_complete,
on_failure=on_failure,
)
if is_workflow_handoff(result):
return await _project_workflow_handoff(
Expand All @@ -1202,6 +1237,19 @@ async def _invoke_platform_method(
executor=executor,
)

# Sync return path. Fire on_complete with the typed result before
# the credential strip + return. If the hook raises, fire on_failure
# and propagate — same single-hook-per-call semantic as the handoff
# path. v1.5 create_media_buy uses on_complete to finalize the
# consumption reservation when the adapter returned inline.
if on_complete is not None:
try:
await on_complete(result)
except BaseException as exc:
if on_failure is not None:
await _safe_on_failure_call(on_failure, exc, method_name)
raise

# Defense-in-depth credential strip on every sync return. The typed
# projections (:func:`to_wire_account` etc.) handle the case where
# the adopter returns the framework's typed dataclasses; this
Expand All @@ -1211,6 +1259,29 @@ async def _invoke_platform_method(
return strip_credentials_from_wire_result(method_name, result)


async def _safe_on_failure_call(
on_failure: Callable[[BaseException], Awaitable[None]],
exc: BaseException,
method_name: str,
) -> None:
"""Fire the framework on_failure hook; log and swallow hook errors.

Hook errors must NEVER block exception propagation — the buyer
needs to see the original adapter failure. Used by both the sync
path in :func:`_invoke_platform_method` and (via the inner
``_fail`` closure) the handoff path in :func:`_project_handoff`.
"""
try:
await on_failure(exc)
except Exception:
logger.exception(
"on_failure hook raised while handling %s for %s — original "
"exception still propagates",
type(exc).__name__,
method_name,
)


async def _project_handoff(
handoff: TaskHandoff[Any],
ctx: RequestContext[Any],
Expand All @@ -1219,6 +1290,7 @@ async def _project_handoff(
registry: TaskRegistry,
executor: ThreadPoolExecutor,
on_complete: Callable[[Any], Awaitable[None]] | None = None,
on_failure: Callable[[BaseException], Awaitable[None]] | None = None,
) -> dict[str, Any]:
"""Promote a TaskHandoff to a background task.

Expand Down Expand Up @@ -1254,11 +1326,22 @@ async def _project_handoff(
the proposal-finalize lifecycle to commit the proposal to
:class:`ProposalStore` exactly once when the HITL approval
lands. If the hook raises, the framework treats it like a
handoff fn failure: ``registry.fail`` is called with the wrapped
error and ``registry.complete`` is NOT called. This is what
gives the v1.5 single-ledger guarantee its teeth — a commit
failure cannot leave the task in 'submitted' forever or land
the proposal in a half-committed state.
handoff fn failure: ``on_failure`` runs (if set), then
``registry.fail`` is called with the wrapped error, and
``registry.complete`` is NOT called. This is what gives the
v1.5 single-ledger guarantee its teeth — a commit failure
cannot leave the task in 'submitted' forever or land the
proposal in a half-committed state.

:param on_failure: Optional framework hook invoked with the
terminal exception when the handoff fn raises OR when
``on_complete`` raises. Used by the v1.5 create_media_buy HITL
path to release the consumption reservation
(``CONSUMING → COMMITTED``) so the buyer can retry without
``PROPOSAL_NOT_COMMITTED`` blocking them. Hook errors are
logged but never block the ``registry.fail`` call — the buyer
needs the failure visible via ``tasks/get`` regardless of
hook outcomes.

The handoff fn is extracted via the type-identity dispatch in
:func:`adcp.decisioning.types.is_task_handoff`. Subclassed
Expand All @@ -1277,6 +1360,22 @@ async def _project_handoff(
# terminal artifact via the registry.
handoff_ctx = TaskHandoffContext(id=task_id, _registry=registry)

async def _fail(exc: AdcpError) -> None:
"""Run the framework's on_failure hook (if set) then
``registry.fail``. Hook errors are logged but never block the
registry.fail — the buyer needs the failure visible via
tasks/get regardless of hook outcomes."""
if on_failure is not None:
try:
await on_failure(exc)
except Exception:
logger.exception(
"on_failure hook raised for task %s — failure is "
"still recorded in the registry",
task_id,
)
await registry.fail(task_id, exc.to_wire())

async def _run() -> None:
try:
if asyncio.iscoroutinefunction(fn):
Expand All @@ -1289,7 +1388,7 @@ async def _run() -> None:
functools.partial(ctx_snapshot.run, fn, handoff_ctx),
)
except AdcpError as exc:
await registry.fail(task_id, exc.to_wire())
await _fail(exc)
return
except Exception as exc:
logger.exception(
Expand All @@ -1305,23 +1404,22 @@ async def _run() -> None:
recovery="terminal",
details=_internal_error_details(exc),
)
await registry.fail(task_id, wrapped.to_wire())
await _fail(wrapped)
return

# Framework completion hook (e.g., proposal_store.commit for
# finalize). Runs with the TYPED result before model_dump so
# the closure can pull typed fields (.expires_at, .proposal)
# finalize, mark_proposal_consumed for create_media_buy). Runs
# with the TYPED result before model_dump so the closure can
# pull typed fields (.expires_at, .proposal, .media_buy_id)
# off it directly. Failures here are treated identically to a
# handoff fn failure: registry.fail, no registry.complete. This
# is the load-bearing seam for the v1.5 single-ledger D3
# guarantee — if the proposal_store.commit raises, the proposal
# stays DRAFT and the task lands in 'failed', so the buyer's
# tasks/get returns the failure rather than a phantom success.
# handoff fn failure: on_failure runs, registry.fail is called,
# registry.complete is NOT called. This is the load-bearing seam
# for the v1.5 single-ledger D3 guarantee.
if on_complete is not None:
try:
await on_complete(result)
except AdcpError as exc:
await registry.fail(task_id, exc.to_wire())
await _fail(exc)
return
except Exception as exc:
logger.exception(
Expand All @@ -1337,7 +1435,7 @@ async def _run() -> None:
recovery="terminal",
details=_internal_error_details(exc),
)
await registry.fail(task_id, wrapped.to_wire())
await _fail(wrapped)
return

# Persist terminal artifact. Pydantic responses get
Expand Down
89 changes: 50 additions & 39 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import inspect
import logging
import warnings
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, ClassVar, cast

from adcp.decisioning._get_products_helpers import _project_product_fields
Expand Down Expand Up @@ -1352,46 +1353,56 @@ async def create_media_buy( # type: ignore[override]
extra: dict[str, Any] | None = (
{"configs": configs} if self._create_media_buy_accepts_configs else None
)
try:
result = await _invoke_platform_method(
self._platform,
"create_media_buy",
params,
ctx,
executor=self._executor,
registry=self._registry,
extra_kwargs=extra,
)
except BaseException:
# Adapter raised — roll back the consumption reservation so
# the buyer can retry. If we leave it in CONSUMING, the next
# create_media_buy(proposal_id=X) call would see
# PROPOSAL_NOT_COMMITTED until the eviction window passes.
# BaseException catches AdcpError + asyncio.CancelledError +
# generic exceptions; the reservation gets released no matter
# how the adapter exits.
if proposal_record is not None:
await release_proposal_reservation(self._platform, proposal_record, ctx)
raise
# Finalize the consumption once create_media_buy returns
# successfully. Idempotent on re-call with the same media_buy_id
# (idempotency_key replays land here too).

# Build the consumption-lifecycle hooks. Used inline on the
# sync return path AND forwarded into _project_handoff on the
# TaskHandoff path — same closure, two firing points. Single
# source of truth for "what to do when create_media_buy lands"
# regardless of whether it lands now or after HITL approval.
on_complete: Callable[[Any], Awaitable[None]] | None = None
on_failure: Callable[[BaseException], Awaitable[None]] | None = None
if proposal_record is not None:
media_buy_id = _extract_media_buy_id(result)
if media_buy_id is not None:
await mark_proposal_consumed(
self._platform,
proposal_record,
media_buy_id=media_buy_id,
ctx=ctx,
)
# else: TaskHandoff path returned a Submitted envelope; the
# proposal stays CONSUMING until the handoff resolves. The
# framework's _project_handoff on_complete hook (wired below
# for v1.5+) finalizes consumption when the bg task lands.
# Until that hook is wired for create_media_buy specifically,
# the proposal will sit in CONSUMING until eviction — flagged
# as a v1.5.1 follow-up.
captured_record = proposal_record
captured_ctx = ctx
captured_platform = self._platform

async def _finalize_consumption_hook(create_result: Any) -> None:
# Idempotent on re-call with the same media_buy_id
# (idempotency_key replays land here too). For the
# handoff path the create_result is the typed
# CreateMediaBuySuccess from the bg task, NOT the
# Submitted envelope — _extract_media_buy_id reads .id
# off either shape.
media_buy_id = _extract_media_buy_id(create_result)
if media_buy_id is not None:
await mark_proposal_consumed(
captured_platform,
captured_record,
media_buy_id=media_buy_id,
ctx=captured_ctx,
)

async def _release_reservation_hook(_exc: BaseException) -> None:
# Adapter raised (sync) OR handoff fn raised (HITL) OR
# finalize_consumption raised: release the reservation
# so the buyer can retry without PROPOSAL_NOT_COMMITTED
# blocking them.
await release_proposal_reservation(captured_platform, captured_record, captured_ctx)

on_complete = _finalize_consumption_hook
on_failure = _release_reservation_hook

result = await _invoke_platform_method(
self._platform,
"create_media_buy",
params,
ctx,
executor=self._executor,
registry=self._registry,
extra_kwargs=extra,
on_complete=on_complete,
on_failure=on_failure,
)
self._maybe_auto_emit_sync_completion("create_media_buy", params, result)
return cast("CreateMediaBuySuccessResponse", result)

Expand Down
Loading
Loading