Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/adcp/decisioning/brand_authz_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Tier 3 brand-authorization dispatch gate.

Composes the :class:`adcp.signing.BrandAuthorizationResolver` Protocol
(landed in v3-identity stages 1-3, issue #350 PR #770) with the
framework's dispatch path so adopters can opt into per-brand
authorization the same way they opt into Tier 2 commercial-identity
gating today (via ``serve(buyer_agent_registry=...)``).

The gate runs **after** Tier 2 buyer-agent resolution and
:meth:`AccountStore.resolve`, and **before** the platform method
executes. By that point the framework has:

* a verified, registry-recognized :class:`BuyerAgent` with an active
status (else Tier 2 already raised), and
* a resolved :class:`Account` the request operates on.

What the gate is *not*: it is not the verifier-side
``request_signature_*`` rejection path (that lives in
:mod:`adcp.signing.key_origins` for the JWKS-origin check and in the
forthcoming verifier integration for the brand_json chain checks —
issue #776). This dispatch-layer gate emits the cross-cutting
``PERMISSION_DENIED`` auth-denial code regardless of credential kind,
matching the Tier 2 unrecognized-agent rejection surface. Per-purpose
spec-shape codes are reserved for the verifier path.

The gate is opt-in. Adopters wire it by passing **both**
``brand_authz_resolver`` and ``brand_identity_resolver`` to
:func:`adcp.decisioning.serve`. Wiring one without the other raises
at boot — partial wiring is almost always a misconfiguration: a
resolver without an extractor never has a brand to check, an extractor
without a resolver never has anything to do.
"""

from __future__ import annotations

from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING

# ``BuyerAgent`` and ``Account`` are referenced at runtime by the
# :data:`BrandIdentityResolver` ``Callable`` alias below — that
# subscription happens at module load (not under
# ``from __future__ import annotations``, which defers annotations
# only), so the names must be importable then. Both modules are
# foundational with no path back to this one, so promoting them out
# of ``TYPE_CHECKING`` carries no circular-import risk.
from adcp.decisioning.registry import BuyerAgent
from adcp.decisioning.types import Account

if TYPE_CHECKING:
from adcp.signing.brand_authz import BrandAuthorizationResolver


@dataclass(frozen=True)
class BrandIdentity:
"""The brand a request operates against — the input to the Tier 3
binding check.

Mirrors the brand identifier shape carried on wire :class:`AccountReference`
natural-key references (``account.brand.domain``) and the
spec-defined ``brand_id`` field on
``brand.json/authorized_operators[].brands[]``.

:param domain: The brand's domain (e.g. ``"nike.com"``). Required —
eTLD+1 binding cannot proceed without it.
:param id: Optional brand identifier from the seller's portfolio.
When set, the operator-delegation check is scoped to this brand
(``authorized_operators[i].brands[]`` must contain this ``id``
or ``"*"``). When ``None``, only ``"*"``-scoped operators
satisfy the delegation check — fail-closed on unscoped
delegation per :class:`BrandJsonAuthorizationResolver`.
"""

domain: str
id: str | None = None


#: Adopter-provided callable that extracts the brand identity from a
#: resolved :class:`Account` (+ the resolved :class:`BuyerAgent`).
#:
#: Returns ``None`` when the request has no brand to bind against — the
#: gate skips on ``None`` (the adopter is telling the framework "this
#: request isn't brand-scoped, no Tier 3 check applies"). Returns a
#: :class:`BrandIdentity` when the request operates on behalf of a
#: brand — the framework runs the binding check.
#:
#: Adopters in ``'derived'`` resolution mode typically derive the brand
#: from ``buyer_agent.allowed_brands`` or the agent's onboarding record;
#: adopters in ``'explicit'`` mode read ``account.metadata`` populated
#: by their :meth:`AccountStore.resolve`. Either path is fine — the
#: framework does not constrain the source, only the return shape.
BrandIdentityResolver = Callable[
[Account[object], BuyerAgent | None],
BrandIdentity | None | Awaitable[BrandIdentity | None],
]


@dataclass(frozen=True)
class BrandAuthorizationGate:
"""Bundle of the Tier 3 dependencies — the authorization resolver
plus the brand-identity extractor.

Passed to the dispatch path as a single immutable bundle so the
pairing rule (both wired or neither) holds by construction:
constructing the bundle requires both, so the framework only ever
sees a complete configuration or no configuration at all.
"""

resolver: BrandAuthorizationResolver
extract_identity: BrandIdentityResolver
124 changes: 124 additions & 0 deletions src/adcp/decisioning/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
if TYPE_CHECKING:
from concurrent.futures import ThreadPoolExecutor

from adcp.decisioning.brand_authz_gate import BrandAuthorizationGate
from adcp.decisioning.media_buy_store import MediaBuyStore
from adcp.decisioning.platform import DecisioningPlatform
from adcp.decisioning.property_list import PropertyListFetcher
Expand Down Expand Up @@ -598,6 +599,107 @@ async def _resolve_buyer_agent(
)


async def _enforce_brand_authorization(
gate: BrandAuthorizationGate,
account: Account[Any],
buyer_agent: BuyerAgent | None,
) -> None:
"""Tier 3 per-brand authorization gate.

Runs after Tier 2 (:func:`_resolve_buyer_agent`) and
:meth:`AccountStore.resolve` — by this point the framework has a
verified, active buyer-agent and a resolved account. The gate asks
the adopter's :class:`BrandAuthorizationResolver`: is this agent
authorized to act *for this brand*?

Sourcing the brand identity is the adopter's call — the framework
delegates via :attr:`BrandAuthorizationGate.extract_identity`. The
extractor reads whatever signal the seller's account model carries
(``account.metadata['brand_domain']``, the natural-key reference's
``brand.domain``, an internal mapping, etc.) and returns a
:class:`BrandIdentity` or ``None``. ``None`` means "this request
isn't brand-scoped" — the gate skips, the platform method runs.

The extractor MAY be async. Both sync (``-> BrandIdentity | None``)
and async (``-> Awaitable[BrandIdentity | None]``) shapes are
supported so adopters can fetch from a remote registry without
wrapping in :func:`asyncio.to_thread`.

**No buyer-agent → no gate.** When Tier 2 isn't wired (the
framework has no ``buyer_agent``), the gate skips: brand
authorization without a buyer-agent identity has no subject to
authorize. Adopters wiring Tier 3 without Tier 2 see the gate
silently pass — the boot validation in :func:`serve` doesn't
enforce the Tier-2-before-Tier-3 pairing because some adopters
legitimately want brand identity flowing through dispatch as a
capability without commercial-identity gating (read-only audit
paths, etc.). They get a no-op gate, not a rejection.

**Denial surface.** Rejection emits the generic
``PERMISSION_DENIED`` (``recovery="correctable"``) with the same
cross-tenant-safe message as Tier 2's unrecognized-agent rejection.
Spec-shape ``request_signature_brand_origin_mismatch`` /
``request_signature_agent_not_in_brand_json`` codes belong to the
verifier-side wire path (issue #776 will plumb the
``BrandJsonJwksResolver`` source discriminant through to the
verifier — this dispatch-layer gate emits the
framework-cross-cutting denial code).

Timing-oracle defense: same :class:`PermissionDeniedBudget` shape
as Tier 2. The brand.json fetch path's natural variance (cache hit
vs miss vs stale-on-error) is larger than the registry-lookup
variance Tier 2 absorbs, so the budget here is a floor, not a
ceiling — it prevents the cache-hit-authorized vs
cache-hit-rejected paths from leaking timing-distinguishable
decisions on the happy-cache path, which is the common case.

:raises AdcpError: ``PERMISSION_DENIED`` when the resolver denies
authorization. ``recovery="correctable"`` per the spec's
``enumMetadata`` for ``PERMISSION_DENIED``.
"""
import inspect

from adcp.decisioning._permission_denied_budget import PermissionDeniedBudget
from adcp.decisioning.types import AdcpError

if buyer_agent is None:
return

maybe_identity = gate.extract_identity(account, buyer_agent)
if inspect.isawaitable(maybe_identity):
identity = await maybe_identity
else:
identity = maybe_identity
if identity is None:
return

budget = PermissionDeniedBudget()
authorized = await gate.resolver.is_authorized(
agent_url=buyer_agent.agent_url,
brand_domain=identity.domain,
brand_id=identity.id,
)
if authorized:
return

# Same generic message as the Tier 2 unrecognized-agent rejection.
# Identical wire-level error.message across both gates so the
# message itself is not a side channel discriminating
# commercial-identity rejection from brand-authorization rejection.
_denied_message = (
"Buyer agent is not authorized for this seller. The seller's "
"commercial allowlist did not authorize this credential. "
"Resolve out-of-band via the seller's onboarding contact; this "
"is not a request-side error the buyer can correct."
)
await budget.enforce()
raise AdcpError(
"PERMISSION_DENIED",
message=_denied_message,
recovery="correctable",
)


def _project_build_creative(result: Any) -> Any:
"""Project the adopter's ``build_creative`` return into the wire
envelope shape.
Expand Down Expand Up @@ -1028,6 +1130,7 @@ def __init__(
webhook_supervisor: WebhookDeliverySupervisor | None = None,
auto_emit_completion_webhooks: bool = True,
buyer_agent_registry: BuyerAgentRegistry | None = None,
brand_authorization_gate: BrandAuthorizationGate | None = None,
config_store: ProductConfigStore | None = None,
property_list_fetcher: PropertyListFetcher | None = None,
media_buy_store: MediaBuyStore | None = None,
Expand All @@ -1043,6 +1146,7 @@ def __init__(
self._webhook_supervisor = webhook_supervisor
self._auto_emit_completion_webhooks = auto_emit_completion_webhooks
self._buyer_agent_registry = buyer_agent_registry
self._brand_authorization_gate = brand_authorization_gate
self._config_store = config_store
self._property_list_fetcher = property_list_fetcher
self._media_buy_store = media_buy_store
Expand Down Expand Up @@ -1153,6 +1257,26 @@ async def _resolve_account(
from adcp.decisioning.observed_modes import record_resolved_account_mode

record_resolved_account_mode(resolved)

# Tier 3 brand-authorization gate (issue #350 stage 5). Opt-in
# via ``serve(brand_authz_resolver=..., brand_identity_resolver=...)``.
# Runs AFTER ``accounts.resolve`` so the gate has the resolved
# :class:`Account` available — adopters source the brand identity
# from whatever signal their account model carries (the wire
# :class:`AccountReference` natural-key shape, ``account.metadata``,
# an internal mapping, etc.). The gate is a no-op when no buyer
# agent has been resolved (Tier 2 not wired) — see
# :func:`_enforce_brand_authorization` for the rationale.
if self._brand_authorization_gate is not None:
buyer_agent_for_gate = cast(
"BuyerAgent | None",
ctx.metadata.get(_BUYER_AGENT_METADATA_KEY) if ctx.metadata else None,
)
await _enforce_brand_authorization(
self._brand_authorization_gate,
resolved,
buyer_agent_for_gate,
)
return resolved

@staticmethod
Expand Down
35 changes: 35 additions & 0 deletions src/adcp/decisioning/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from adcp.decisioning.types import AdcpError

if TYPE_CHECKING:
from adcp.decisioning.brand_authz_gate import BrandIdentityResolver
from adcp.decisioning.implementation_config import ProductConfigStore
from adcp.decisioning.media_buy_store import MediaBuyStore
from adcp.decisioning.platform import DecisioningPlatform
Expand All @@ -48,6 +49,7 @@
from adcp.decisioning.resolve import ResourceResolver
from adcp.decisioning.state import StateReader
from adcp.decisioning.task_registry import TaskRegistry
from adcp.signing.brand_authz import BrandAuthorizationResolver
from adcp.webhook_sender import WebhookSender
from adcp.webhook_supervisor import WebhookDeliverySupervisor

Expand Down Expand Up @@ -86,6 +88,8 @@ def create_adcp_server_from_platform(
webhook_supervisor: WebhookDeliverySupervisor | None = None,
auto_emit_completion_webhooks: bool = True,
buyer_agent_registry: BuyerAgentRegistry | None = None,
brand_authz_resolver: BrandAuthorizationResolver | None = None,
brand_identity_resolver: BrandIdentityResolver | None = None,
config_store: ProductConfigStore | None = None,
property_list_fetcher: PropertyListFetcher | None = None,
media_buy_store: MediaBuyStore | None = None,
Expand Down Expand Up @@ -323,6 +327,32 @@ def create_adcp_server_from_platform(
# propagates to the caller.
validate_platform(platform)

# Tier 3 brand-authorization gate (issue #350 stage 5). The pair is
# bundled here so the dispatch path sees an atomic configuration:
# both wired or neither. Passing one without the other is almost
# always a misconfiguration (a resolver without an extractor never
# has a brand to check; an extractor without a resolver never has
# anything to do) — fail closed at boot with a specific diagnostic
# rather than a silent never-fires gate at request time.
if (brand_authz_resolver is None) != (brand_identity_resolver is None):
raise ValueError(
"brand_authz_resolver and brand_identity_resolver must be wired "
"together. Pass both (to enable Tier 3 brand-authorization gating) "
"or neither (to skip the gate). A resolver without an extractor "
"has no brand identity to check; an extractor without a resolver "
"has nothing to do."
)
brand_authorization_gate: BrandAuthorizationGate | None
if brand_authz_resolver is not None and brand_identity_resolver is not None:
from adcp.decisioning.brand_authz_gate import BrandAuthorizationGate

brand_authorization_gate = BrandAuthorizationGate(
resolver=brand_authz_resolver,
extract_identity=brand_identity_resolver,
)
else:
brand_authorization_gate = None

handler = PlatformHandler(
platform,
executor=executor,
Expand All @@ -333,6 +363,7 @@ def create_adcp_server_from_platform(
webhook_supervisor=webhook_supervisor,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
buyer_agent_registry=buyer_agent_registry,
brand_authorization_gate=brand_authorization_gate,
config_store=config_store,
property_list_fetcher=property_list_fetcher,
media_buy_store=media_buy_store,
Expand Down Expand Up @@ -428,6 +459,8 @@ def serve(
webhook_supervisor: WebhookDeliverySupervisor | None = None,
auto_emit_completion_webhooks: bool = True,
buyer_agent_registry: BuyerAgentRegistry | None = None,
brand_authz_resolver: BrandAuthorizationResolver | None = None,
brand_identity_resolver: BrandIdentityResolver | None = None,
config_store: ProductConfigStore | None = None,
property_list_fetcher: PropertyListFetcher | None = None,
advertise_all: bool = False,
Expand Down Expand Up @@ -539,6 +572,8 @@ def serve(
webhook_supervisor=webhook_supervisor,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
buyer_agent_registry=buyer_agent_registry,
brand_authz_resolver=brand_authz_resolver,
brand_identity_resolver=brand_identity_resolver,
config_store=config_store,
property_list_fetcher=property_list_fetcher,
advertise_all=advertise_all,
Expand Down
Loading
Loading