Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 85 additions & 10 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
if TYPE_CHECKING:
from concurrent.futures import ThreadPoolExecutor

from adcp.decisioning.media_buy_store import MediaBuyStore
from adcp.decisioning.platform import DecisioningPlatform
from adcp.decisioning.property_list import PropertyListFetcher
from adcp.decisioning.registry import BuyerAgent, BuyerAgentRegistry
Expand Down Expand Up @@ -806,6 +807,28 @@ def _extract_media_buy_id(result: Any) -> str | None:
return str(value)


def _to_store_dict(value: Any) -> Any:
"""Normalize a Pydantic model OR plain dict to a JSON-compatible
dict for the :class:`MediaBuyStore` adopter contract.

The store Protocol uses ``Any`` typing, but the reference impl (and
the JS-side port) expects dict-shaped payloads — ``.get("packages")``,
``["media_buy_id"]``, etc. Adopters writing against the Protocol
benefit from a stable shape regardless of whether their platform
method returned a typed Pydantic response or a hand-built dict.

``mode='json'`` serializes ``AnyUrl`` to ``str`` and ``datetime``
to ISO-8601 ``str`` so the dict matches what would go over the wire
— adopters serializing to JSON for persistence don't have to
re-normalize. ``exclude_none=False`` preserves explicit nulls (the
merge contract treats ``None`` as "clear this field"; dropping them
would silently change semantics).
"""
if hasattr(value, "model_dump"):
return value.model_dump(mode="json", exclude_none=False, by_alias=False)
return value


class PlatformHandler(ADCPHandler[ToolContext]):
"""ADCPHandler subclass that routes wire requests to a
:class:`DecisioningPlatform` via :func:`_invoke_platform_method`.
Expand Down Expand Up @@ -998,6 +1021,7 @@ def __init__(
buyer_agent_registry: BuyerAgentRegistry | None = None,
config_store: ProductConfigStore | None = None,
property_list_fetcher: PropertyListFetcher | None = None,
media_buy_store: MediaBuyStore | None = None,
advertise_all: bool = False,
) -> None:
super().__init__()
Expand All @@ -1012,6 +1036,7 @@ def __init__(
self._buyer_agent_registry = buyer_agent_registry
self._config_store = config_store
self._property_list_fetcher = property_list_fetcher
self._media_buy_store = media_buy_store
self._advertise_all = advertise_all

# Cache whether the platform's create_media_buy accepts 'configs'
Expand Down Expand Up @@ -1643,6 +1668,26 @@ async def _release_reservation_hook(_exc: BaseException) -> None:
on_complete = _finalize_consumption_hook
on_failure = _release_reservation_hook

# MediaBuyStore persist hook — fires on the same on_complete seam
# so both sync and HITL completions persist the per-package
# ``targeting_overlay`` for later echo on ``get_media_buys``.
# Chains with the proposal hook (if any) so both side effects run.
if self._media_buy_store is not None:
prior_on_complete = on_complete
captured_store = self._media_buy_store
captured_account_id = account.id

async def _persist_overlay_hook(create_result: Any) -> None:
if prior_on_complete is not None:
await prior_on_complete(create_result)
await captured_store.persist_from_create(
captured_account_id,
_to_store_dict(params),
_to_store_dict(create_result),
)

on_complete = _persist_overlay_hook

result = await _invoke_platform_method(
self._platform,
"create_media_buy",
Expand Down Expand Up @@ -1679,6 +1724,27 @@ async def update_media_buy( # type: ignore[override]
ctx,
packages=list(getattr(params, "packages", None) or []),
)

# MediaBuyStore merge hook — fires on the on_complete seam so the
# patch is recorded for both sync and HITL completions. Spec:
# ``targeting_overlay`` is echoed from the most recent
# ``create_media_buy`` OR ``update_media_buy`` per the
# ``get-media-buys-response`` schema.
on_complete: Callable[[Any], Awaitable[None]] | None = None
if self._media_buy_store is not None:
captured_store = self._media_buy_store
captured_account_id = account.id
captured_media_buy_id = params.media_buy_id

async def _merge_overlay_hook(_update_result: Any) -> None:
await captured_store.merge_from_update(
captured_account_id,
captured_media_buy_id,
_to_store_dict(params),
)

on_complete = _merge_overlay_hook

result = await _invoke_platform_method(
self._platform,
"update_media_buy",
Expand All @@ -1687,6 +1753,7 @@ async def update_media_buy( # type: ignore[override]
executor=self._executor,
registry=self._registry,
arg_projector={"media_buy_id": params.media_buy_id, "patch": params},
on_complete=on_complete,
)
self._maybe_auto_emit_sync_completion("update_media_buy", params, result)
return cast("UpdateMediaBuySuccessResponse", result)
Expand Down Expand Up @@ -1748,17 +1815,25 @@ async def get_media_buys( # type: ignore[override]
tool_ctx = context or ToolContext()
account = await self._resolve_account(params.account, tool_ctx)
ctx = self._build_ctx(tool_ctx, account)
return cast(
"GetMediaBuysResponse",
await _invoke_platform_method(
self._platform,
"get_media_buys",
params,
ctx,
executor=self._executor,
registry=self._registry,
),
result = await _invoke_platform_method(
self._platform,
"get_media_buys",
params,
ctx,
executor=self._executor,
registry=self._registry,
)
# MediaBuyStore backfill — fill in ``packages[].targeting_overlay``
# from the persisted store for sellers claiming
# ``property-lists`` / ``collection-lists``. Packages the
# platform already echoed are left untouched (the in-memory
# reference impl checks ``pkg.get("targeting_overlay") is not None``
# before injecting). Normalize to dict before the store sees it
# so the adopter contract is consistent regardless of whether the
# platform returned a typed Pydantic response or a dict.
if self._media_buy_store is not None:
result = await self._media_buy_store.backfill(account.id, _to_store_dict(result))
return cast("GetMediaBuysResponse", result)

async def provide_performance_feedback( # type: ignore[override]
self,
Expand Down
17 changes: 17 additions & 0 deletions src/adcp/decisioning/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

if TYPE_CHECKING:
from adcp.decisioning.implementation_config import ProductConfigStore
from adcp.decisioning.media_buy_store import MediaBuyStore
from adcp.decisioning.platform import DecisioningPlatform
from adcp.decisioning.property_list import PropertyListFetcher
from adcp.decisioning.registry import BuyerAgentRegistry
Expand Down Expand Up @@ -87,6 +88,7 @@ def create_adcp_server_from_platform(
buyer_agent_registry: BuyerAgentRegistry | None = None,
config_store: ProductConfigStore | None = None,
property_list_fetcher: PropertyListFetcher | None = None,
media_buy_store: MediaBuyStore | None = None,
advertise_all: bool = False,
validate_at_init: bool = True,
) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]:
Expand Down Expand Up @@ -179,6 +181,20 @@ def create_adcp_server_from_platform(
(avoid duplicate delivery; idempotency-key dedup at the
receiver would handle it but explicit suppression matches the
v5 manual-emit posture for adopters mid-migration).
:param media_buy_store: Opt-in :class:`adcp.decisioning.MediaBuyStore`
wrapper that gates ``targeting_overlay`` echo on the seller's
declared specialisms. Typically built via
:func:`adcp.decisioning.create_media_buy_store` with the seller's
``capabilities`` so the persistence layer only fires for sellers
claiming ``property-lists`` or ``collection-lists``. When wired,
the framework calls ``persist_from_create`` on successful
``create_media_buy`` (via the same on-complete hook the proposal
flow uses, so HITL completions also persist), calls
``merge_from_update`` on successful ``update_media_buy``, and
calls ``backfill`` before returning from ``get_media_buys``.
Default ``None`` — sellers who don't claim the relevant
specialisms or who echo ``targeting_overlay`` themselves omit
this and pay no overhead.
:param advertise_all: Mirror of the same flag on :func:`serve` —
controls how :meth:`PlatformHandler.get_advertised_tools` and
the eventual ``tools/list`` response filter the handler's tool
Expand Down Expand Up @@ -319,6 +335,7 @@ def create_adcp_server_from_platform(
buyer_agent_registry=buyer_agent_registry,
config_store=config_store,
property_list_fetcher=property_list_fetcher,
media_buy_store=media_buy_store,
advertise_all=advertise_all,
)

Expand Down
Loading
Loading