Skip to content

Commit 59f3b6e

Browse files
bokelleyclaude
andauthored
feat(decisioning): ProductConfigStore lookup helper for create_media_buy (#498)
* feat(decisioning): ProductConfigStore lookup helper for create_media_buy Adds a pluggable ProductConfigStore Protocol that the framework calls before invoking SalesPlatform.create_media_buy, injecting seller-side per-product implementation configs as a configs= kwarg. Adopters who don't wire a store or don't declare configs in their signature see no change (fully backward-compatible). Closes #497. https://claude.ai/code/session_01WS8EChCkSo6diBNgbXBMM1 * fix(decisioning): stable product_id ordering, extra_kwargs TypeError diagnostic, test warning scope - dict.fromkeys() instead of set comprehension in handler.py for stable insertion-order deduplication of product_ids before store lookup - targeted TypeError diagnostic branch in dispatch.py so extra_kwargs signature drift surfaces as INVALID_REQUEST rather than INTERNAL_ERROR - move warnings.catch_warnings() to wrap _make_handler() construction in test_no_store_wired_adopter_gets_empty_configs (warning fires at init, not at call time) https://claude.ai/code/session_01WS8EChCkSo6diBNgbXBMM1 --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 69c585c commit 59f3b6e

7 files changed

Lines changed: 506 additions & 4 deletions

File tree

src/adcp/decisioning/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def create_media_buy(
9696
ValidationError,
9797
)
9898
from adcp.decisioning.helpers import ref_account_id
99+
from adcp.decisioning.implementation_config import ProductConfigStore
99100
from adcp.decisioning.media_buy_store import (
100101
MediaBuyStore,
101102
create_media_buy_store,
@@ -298,6 +299,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
298299
"Proposal",
299300
"PropertyList",
300301
"PropertyListReference",
302+
"ProductConfigStore",
301303
"PropertyListsPlatform",
302304
"RateLimitedBuyerAgentRegistry",
303305
"RateLimitedError",

src/adcp/decisioning/dispatch.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,7 @@ async def _invoke_platform_method(
10401040
executor: ThreadPoolExecutor,
10411041
registry: TaskRegistry,
10421042
arg_projector: dict[str, Any] | None = None,
1043+
extra_kwargs: dict[str, Any] | None = None,
10431044
) -> Any:
10441045
"""Invoke a platform method, projecting hybrid returns.
10451046
@@ -1064,15 +1065,22 @@ async def _invoke_platform_method(
10641065
:param arg_projector: Optional kwargs dict for tools whose Python
10651066
method signature differs from the wire shape (D1
10661067
arg-projection, e.g. ``update_media_buy(media_buy_id, patch,
1067-
ctx)``). Codegen-emitted shims pass this for those tools;
1068-
most tools call with ``None``.
1068+
ctx)``). Replaces the default positional ``(params, ctx)``
1069+
call entirely. Codegen-emitted shims pass this for those
1070+
tools; most tools call with ``None``.
1071+
:param extra_kwargs: Optional additional kwargs appended to the
1072+
normal ``(params, ctx)`` call, used when the framework injects
1073+
framework-computed values (e.g. ``configs=`` from
1074+
``ProductConfigStore``). Ignored when ``arg_projector`` is set.
10691075
"""
10701076
method = getattr(platform, method_name)
10711077

10721078
try:
10731079
if asyncio.iscoroutinefunction(method):
10741080
if arg_projector is not None:
10751081
result = await method(**arg_projector, ctx=ctx)
1082+
elif extra_kwargs:
1083+
result = await method(params, ctx, **extra_kwargs)
10761084
else:
10771085
result = await method(params, ctx)
10781086
else:
@@ -1084,6 +1092,11 @@ async def _invoke_platform_method(
10841092
executor,
10851093
functools.partial(ctx_snapshot.run, method, **projected_kwargs),
10861094
)
1095+
elif extra_kwargs:
1096+
result = await loop.run_in_executor(
1097+
executor,
1098+
functools.partial(ctx_snapshot.run, method, params, ctx, **extra_kwargs),
1099+
)
10871100
else:
10881101
result = await loop.run_in_executor(
10891102
executor,
@@ -1094,8 +1107,7 @@ async def _invoke_platform_method(
10941107
# outer middleware projects to the wire envelope.
10951108
raise
10961109
except TypeError as exc:
1097-
# Most likely an arg_projector signature-drift bug — adopter
1098-
# renamed update_media_buy's `patch` kwarg → `update`, etc.
1110+
# Most likely an arg_projector or extra_kwargs signature-drift bug.
10991111
# Bare INTERNAL_ERROR would hide the cause; project to
11001112
# INVALID_REQUEST with a hint pointing at the adopter's
11011113
# method signature so they fix it without a server-log dive.
@@ -1120,6 +1132,23 @@ async def _invoke_platform_method(
11201132
),
11211133
recovery="terminal",
11221134
) from exc
1135+
if extra_kwargs is not None:
1136+
logger.exception(
1137+
"TypeError invoking platform.%s — likely extra_kwargs "
1138+
"signature drift (injected kwargs %s vs adopter signature)",
1139+
method_name,
1140+
sorted(extra_kwargs.keys()),
1141+
)
1142+
raise AdcpError(
1143+
"INVALID_REQUEST",
1144+
message=(
1145+
f"Platform method {method_name!r} rejected framework-injected "
1146+
f"kwargs {sorted(extra_kwargs.keys())!r}. Declare the matching "
1147+
"parameter(s) in your platform method signature, or remove them "
1148+
"if you don't need them."
1149+
),
1150+
recovery="terminal",
1151+
) from exc
11231152
# Non-projected TypeError — fall through to generic wrap.
11241153
logger.exception(
11251154
"Unhandled exception in platform.%s — wrapping to INTERNAL_ERROR",

src/adcp/decisioning/handler.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
from __future__ import annotations
3333

3434
import asyncio
35+
import inspect
36+
import logging
37+
import warnings
3538
from typing import TYPE_CHECKING, Any, ClassVar, cast
3639

3740
from adcp.decisioning._get_products_helpers import _project_product_fields
@@ -40,9 +43,12 @@
4043
_build_request_context,
4144
_invoke_platform_method,
4245
)
46+
from adcp.decisioning.implementation_config import ProductConfigStore
4347
from adcp.decisioning.webhook_emit import maybe_emit_sync_completion
4448
from adcp.server.base import ADCPHandler, ToolContext
4549

50+
logger = logging.getLogger(__name__)
51+
4652
# Pydantic Request/Response types are imported at module scope (NOT
4753
# under TYPE_CHECKING) so that ``typing.get_type_hints(method)`` can
4854
# resolve every shim's ``params`` annotation at runtime. The dispatcher
@@ -575,6 +581,18 @@ def _project_sync_audiences(result: Any) -> Any:
575581
return result
576582

577583

584+
def _method_accepts_configs(platform: Any, method_name: str) -> bool:
585+
"""Return True when the platform's ``method_name`` declares a ``configs`` parameter."""
586+
method = getattr(platform, method_name, None)
587+
if method is None:
588+
return False
589+
try:
590+
sig = inspect.signature(method)
591+
return "configs" in sig.parameters
592+
except (ValueError, TypeError):
593+
return False
594+
595+
578596
class PlatformHandler(ADCPHandler[ToolContext]):
579597
"""ADCPHandler subclass that routes wire requests to a
580598
:class:`DecisioningPlatform` via :func:`_invoke_platform_method`.
@@ -676,6 +694,7 @@ def __init__(
676694
webhook_supervisor: WebhookDeliverySupervisor | None = None,
677695
auto_emit_completion_webhooks: bool = True,
678696
buyer_agent_registry: BuyerAgentRegistry | None = None,
697+
config_store: ProductConfigStore | None = None,
679698
) -> None:
680699
super().__init__()
681700
self._platform = platform
@@ -687,6 +706,23 @@ def __init__(
687706
self._webhook_supervisor = webhook_supervisor
688707
self._auto_emit_completion_webhooks = auto_emit_completion_webhooks
689708
self._buyer_agent_registry = buyer_agent_registry
709+
self._config_store = config_store
710+
711+
# Cache whether the platform's create_media_buy accepts 'configs'
712+
# so we only pay the inspect.signature cost at construction time.
713+
self._create_media_buy_accepts_configs = _method_accepts_configs(
714+
platform, "create_media_buy"
715+
)
716+
if config_store is None and self._create_media_buy_accepts_configs:
717+
warnings.warn(
718+
"create_media_buy declares a 'configs' parameter but no "
719+
"ProductConfigStore was wired — the framework will inject "
720+
"configs={} (empty dict) on every call. Wire a store via "
721+
"config_store= in create_adcp_server_from_platform to enable "
722+
"automatic implementation_config lookup.",
723+
UserWarning,
724+
stacklevel=2,
725+
)
690726

691727
# ----- account resolution helper -----
692728

@@ -1048,16 +1084,50 @@ async def create_media_buy( # type: ignore[override]
10481084
params: CreateMediaBuyRequest,
10491085
context: ToolContext | None = None,
10501086
) -> CreateMediaBuySuccessResponse:
1087+
from adcp.decisioning.types import AdcpError
1088+
10511089
tool_ctx = context or ToolContext()
10521090
account = await self._resolve_account(params.account, tool_ctx)
10531091
ctx = self._build_ctx(tool_ctx, account)
1092+
1093+
configs: dict[str, Any] = {}
1094+
if self._config_store is not None:
1095+
# proposal_id flows have packages=None — skip lookup, inject {}
1096+
if params.packages:
1097+
product_ids = list(dict.fromkeys(p.product_id for p in params.packages))
1098+
try:
1099+
configs = await self._config_store.lookup_implementation_configs(
1100+
product_ids, ctx
1101+
)
1102+
except AdcpError:
1103+
raise
1104+
except Exception as exc:
1105+
logger.exception(
1106+
"[adcp.decisioning] ProductConfigStore.lookup_implementation_configs "
1107+
"raised for create_media_buy — translating to SERVICE_UNAVAILABLE"
1108+
)
1109+
raise AdcpError(
1110+
"SERVICE_UNAVAILABLE",
1111+
message=(
1112+
"implementation_config lookup failed for "
1113+
f"{len(product_ids)} product(s). Retry the request; "
1114+
"if the problem persists contact the seller."
1115+
),
1116+
recovery="transient",
1117+
details={"caused_by": {"type": type(exc).__name__}},
1118+
) from exc
1119+
1120+
extra: dict[str, Any] | None = (
1121+
{"configs": configs} if self._create_media_buy_accepts_configs else None
1122+
)
10541123
result = await _invoke_platform_method(
10551124
self._platform,
10561125
"create_media_buy",
10571126
params,
10581127
ctx,
10591128
executor=self._executor,
10601129
registry=self._registry,
1130+
extra_kwargs=extra,
10611131
)
10621132
self._maybe_auto_emit_sync_completion("create_media_buy", params, result)
10631133
return cast("CreateMediaBuySuccessResponse", result)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""ProductConfigStore — pluggable implementation_config lookup for create_media_buy.
2+
3+
Seller adapters (GAM, Kevel, Xandr, Prebid Server, …) attach seller-side
4+
configuration to each ``Product`` under ``Product.ext.implementation_config``
5+
by convention. Every adopter that implements ``create_media_buy`` writes the
6+
same boilerplate: pull the packages off the request, collect the product_ids,
7+
look up the per-product config from a store, hand it to the adapter alongside
8+
the wire request.
9+
10+
This module lifts that loop into the framework. The adopter wires a
11+
``ProductConfigStore`` once at server construction; the framework calls
12+
``lookup_implementation_configs`` before invoking the platform method and
13+
injects the result as ``configs=`` kwarg. Adopters who declare ``configs`` in
14+
their ``create_media_buy`` signature receive the resolved dict automatically.
15+
Adopters who don't wire a store receive ``configs={}`` (current behaviour,
16+
fully backward-compatible).
17+
18+
Reference pattern: ``adcp.decisioning.webhook_emit`` — same "framework
19+
intercepts at the handler seam before the platform method fires" design.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
25+
26+
if TYPE_CHECKING:
27+
from adcp.decisioning.context import RequestContext
28+
29+
30+
@runtime_checkable
31+
class ProductConfigStore(Protocol):
32+
"""Adopter-supplied lookup for seller-side product configuration.
33+
34+
The framework calls ``lookup_implementation_configs`` once per
35+
``create_media_buy`` request, before invoking the platform method.
36+
Adopters return a dict keyed by ``product_id``; the framework injects
37+
it as ``configs=`` on the platform method call.
38+
39+
If the store returns a partial dict (some product_ids missing), the
40+
adopter handles it — the framework does not fabricate entries for missing
41+
ids. If the store raises, the framework surfaces
42+
``SERVICE_UNAVAILABLE`` with ``recovery='transient'`` to the buyer.
43+
44+
The ``ctx`` parameter carries the resolved ``RequestContext``, including
45+
``ctx.account`` for multi-tenant stores that need tenant isolation.
46+
"""
47+
48+
async def lookup_implementation_configs(
49+
self,
50+
product_ids: list[str],
51+
ctx: RequestContext[Any],
52+
) -> dict[str, dict[str, Any]]:
53+
"""Return seller-side config keyed by product_id.
54+
55+
:param product_ids: Deduplicated list of product ids from
56+
``CreateMediaBuyRequest.packages[*].product_id``. Empty when
57+
the request uses ``proposal_id`` without an explicit
58+
``packages`` array.
59+
:param ctx: Resolved request context — use ``ctx.account`` for
60+
tenant scoping.
61+
:returns: Dict mapping each product_id to its config dict. Missing
62+
keys mean "no config for this product"; the framework passes the
63+
partial dict to the adopter unchanged.
64+
"""
65+
...
66+
67+
68+
__all__ = ["ProductConfigStore"]

src/adcp/decisioning/serve.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from adcp.decisioning.types import AdcpError
4141

4242
if TYPE_CHECKING:
43+
from adcp.decisioning.implementation_config import ProductConfigStore
4344
from adcp.decisioning.platform import DecisioningPlatform
4445
from adcp.decisioning.registry import BuyerAgentRegistry
4546
from adcp.decisioning.resolve import ResourceResolver
@@ -83,6 +84,7 @@ def create_adcp_server_from_platform(
8384
webhook_supervisor: WebhookDeliverySupervisor | None = None,
8485
auto_emit_completion_webhooks: bool = True,
8586
buyer_agent_registry: BuyerAgentRegistry | None = None,
87+
config_store: ProductConfigStore | None = None,
8688
) -> tuple[PlatformHandler, ThreadPoolExecutor, TaskRegistry]:
8789
"""Build the :class:`PlatformHandler` + supporting wiring from a
8890
:class:`DecisioningPlatform`.
@@ -272,6 +274,7 @@ def create_adcp_server_from_platform(
272274
webhook_supervisor=webhook_supervisor,
273275
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
274276
buyer_agent_registry=buyer_agent_registry,
277+
config_store=config_store,
275278
)
276279

277280
# F12 boot-time fail-fast (Emma sales-direct P0 root cause): if
@@ -322,6 +325,7 @@ def serve(
322325
webhook_supervisor: WebhookDeliverySupervisor | None = None,
323326
auto_emit_completion_webhooks: bool = True,
324327
buyer_agent_registry: BuyerAgentRegistry | None = None,
328+
config_store: ProductConfigStore | None = None,
325329
advertise_all: bool = False,
326330
mock_ad_server: Any | None = None,
327331
enable_debug_endpoints: bool = False,
@@ -399,6 +403,7 @@ def serve(
399403
webhook_supervisor=webhook_supervisor,
400404
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
401405
buyer_agent_registry=buyer_agent_registry,
406+
config_store=config_store,
402407
)
403408

404409
# Phase 1 sandbox-authority — wire the comply controller's account

src/adcp/decisioning/specialisms/sales.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,21 @@ def create_media_buy(
136136
* ``media_buy_id`` field present → sync success
137137
* ``task_id`` + ``status='submitted'`` → poll ``tasks_get`` or
138138
receive webhook
139+
140+
**Framework injection:** when a :class:`~adcp.decisioning.ProductConfigStore`
141+
is wired via ``config_store=`` in
142+
:func:`adcp.decisioning.serve.create_adcp_server_from_platform`,
143+
the framework calls the store before invoking this method and
144+
injects the result as ``configs: dict[str, dict[str, Any]]``.
145+
Declare ``configs`` in your method signature to receive it::
146+
147+
def create_media_buy(self, req, ctx, configs=None):
148+
configs = configs or {}
149+
line_item_id = configs.get(pkg.product_id, {}).get("line_item_id")
150+
151+
When the store is not wired, or when ``req.packages`` is
152+
``None`` (proposal-id flow), ``configs`` arrives as an empty
153+
dict ``{}``.
139154
"""
140155
...
141156

0 commit comments

Comments
 (0)