Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 167 additions & 12 deletions src/adcp/adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,12 +1207,16 @@ async def verify_agent_for_property(
def _resolve_agent_properties(
agent: dict[str, Any],
top_level_properties: list[dict[str, Any]],
domain_index: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
"""Resolve properties for a single agent entry based on its authorization_type.

Args:
agent: An authorized_agents entry
top_level_properties: The top-level properties array from adagents.json
domain_index: Pre-built ``publisher_domain → [property, ...]`` index
over ``top_level_properties`` (built once per file by the caller
via :func:`_build_domain_index`).

Returns:
List of resolved property dicts for this agent
Expand All @@ -1230,7 +1234,10 @@ def _resolve_agent_properties(

# Handle property_ids (filter top-level properties by property_id)
if authorization_type == "property_ids":
authorized_ids = set(agent.get("property_ids", []))
raw_ids = agent.get("property_ids")
if not isinstance(raw_ids, list):
return []
authorized_ids = {i for i in raw_ids if isinstance(i, str)}
return [
p
for p in top_level_properties
Expand All @@ -1239,7 +1246,10 @@ def _resolve_agent_properties(

# Handle property_tags (filter top-level properties by tags)
if authorization_type == "property_tags":
authorized_tags = {t for t in agent.get("property_tags", []) if isinstance(t, str)}
raw_tags = agent.get("property_tags")
if not isinstance(raw_tags, list):
return []
authorized_tags = {t for t in raw_tags if isinstance(t, str)}
return [
p
for p in top_level_properties
Expand All @@ -1250,16 +1260,131 @@ def _resolve_agent_properties(
# Handle publisher_properties (cross-domain references).
# Each entry with publisher_domains[a,b,c] fans out to one selector per
# listed domain — the compact form is exactly equivalent to repeating
# the entry once per publisher per adcp#4504.
# the entry once per publisher per adcp#4504. Selectors are then
# resolved inline against the parent file's top-level properties[]
# array, indexed by publisher_domain, per adcp#4827.
if authorization_type == "publisher_properties":
publisher_props = agent.get("publisher_properties", [])
if not isinstance(publisher_props, list):
return []
return _fanout_publisher_properties([p for p in publisher_props if isinstance(p, dict)])
selectors = _fanout_publisher_properties(
[p for p in publisher_props if isinstance(p, dict)]
)
return _resolve_publisher_property_selectors(selectors, domain_index)

return []


def _build_domain_index(
properties: list[dict[str, Any]],
) -> dict[str, list[dict[str, Any]]]:
"""Build a ``publisher_domain → [property, ...]`` index.

O(N) up-front cost; reused across every selector for that file so the
per-selector resolution cost drops from O(properties) to O(1) lookup
plus O(matches) filtering. Malformed entries (non-dict, missing or
non-string ``publisher_domain``) are skipped.
"""
domain_index: dict[str, list[dict[str, Any]]] = {}
for prop in properties:
if not isinstance(prop, dict):
continue
domain = prop.get("publisher_domain")
if not isinstance(domain, str) or not domain:
continue
domain_index.setdefault(domain, []).append(prop)
return domain_index


def _resolve_publisher_property_selectors(
selectors: list[dict[str, Any]],
domain_index: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
"""Resolve fanned-out publisher_properties selectors against inline data.

Resolves selectors per adcp#4827 §Resolution-paths (inline path
against the parent file's top-level properties[] indexed by
publisher_domain).

For each selector (one per publisher_domain), look up the matching
properties in ``domain_index`` by ``publisher_domain`` and apply the
selector's ``selection_type``:

- ``"all"``: every property under that domain
- ``"by_tag"``: properties whose ``tags`` intersect ``property_tags``
(empty ``property_tags`` resolves to ``[]`` — fail-closed, no
"tag list omitted means everything")
- ``"by_id"``: properties whose ``property_id`` is in ``property_ids``
(empty ``property_ids`` resolves to ``[]`` — same fail-closed rule)
- Anything else: ``[]`` (fail-closed; unknown selection_type does
not authorize anything — see CLAUDE.md "no fallbacks" on
authorization decisions)

Selectors whose domain has no entries in the index are skipped —
federated fallback (fetching the publisher's own adagents.json) is
out of scope for this resolver and lives in companion helpers.

``domain_index`` is built once per file by :func:`_build_domain_index`
and reused across every agent's selectors in that file.

Results are deduplicated by ``(publisher_domain, property_id)``.
Raises :class:`AdagentsValidationError` if any matching property is
missing the required ``property_id`` field (fail-fast per CLAUDE.md).
"""
if not selectors:
return []

resolved: list[dict[str, Any]] = []
seen: set[tuple[str, str]] = set()
for selector in selectors:
domain = selector.get("publisher_domain")
if not isinstance(domain, str) or not domain:
continue
candidates = domain_index.get(domain)
if not candidates:
continue

selection_type = selector.get("selection_type")
matched: list[dict[str, Any]]
if selection_type == "all":
matched = list(candidates)
elif selection_type == "by_tag":
raw_tags = selector.get("property_tags")
if not isinstance(raw_tags, list):
continue
wanted_tags = {t for t in raw_tags if isinstance(t, str)}
if not wanted_tags:
continue
matched = [
p
for p in candidates
if {t for t in p.get("tags", []) or [] if isinstance(t, str)} & wanted_tags
]
elif selection_type == "by_id":
raw_ids = selector.get("property_ids")
if not isinstance(raw_ids, list):
continue
wanted_ids = {i for i in raw_ids if isinstance(i, str)}
if not wanted_ids:
continue
matched = [p for p in candidates if p.get("property_id") in wanted_ids]
else:
continue

for prop in matched:
prop_id = prop.get("property_id")
if not isinstance(prop_id, str) or not prop_id:
raise AdagentsValidationError(
f"property under domain={domain!r} missing required 'property_id'"
)
key = (domain, prop_id)
if key in seen:
continue
seen.add(key)
resolved.append(prop)
return resolved


def _fanout_publisher_properties(
publisher_props: list[dict[str, Any]],
) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -1340,6 +1465,17 @@ def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]:
Handles all authorization types: inline_properties, property_ids,
property_tags, and publisher_properties.

For ``publisher_properties`` selectors whose target ``publisher_domain``
is NOT present inline in this file's top-level ``properties[]`` array,
this function returns no properties for that selector. Federated
fallback (fetching the child publisher's own adagents.json to resolve
the selector remotely) is out of scope here and lives in
:func:`fetch_agent_authorizations_from_directory` and
:func:`detect_publisher_properties_divergence` from companion PR #752.
Wire-only authorization checks that assume federated resolution will
under-authorize against managed-network parent files that only inline
a subset of their child domains.

Args:
adagents_data: Parsed adagents.json data

Expand Down Expand Up @@ -1371,6 +1507,11 @@ def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]:
)
]

# Build the domain index once per file — _resolve_agent_properties is
# called per-agent, and at cafemedia scale (thousands of properties ×
# multiple agents) rebuilding it inside each call is O(agents × N).
domain_index = _build_domain_index(revoked_top_level)

properties = []
for agent in authorized_agents:
if not isinstance(agent, dict):
Expand All @@ -1380,9 +1521,9 @@ def get_all_properties(adagents_data: dict[str, Any]) -> list[dict[str, Any]]:
if not agent_url:
continue

agent_properties = _resolve_agent_properties(agent, revoked_top_level)
if revoked and agent.get("authorization_type") == "publisher_properties":
agent_properties = filter_revoked_selectors(agent_properties, revoked)
# revoked_top_level pre-filters revoked domains from the per-domain
# index, so inline resolution honors revocation transparently.
agent_properties = _resolve_agent_properties(agent, revoked_top_level, domain_index)

for prop in agent_properties:
prop_with_agent = {**prop, "agent_url": agent_url}
Expand Down Expand Up @@ -1423,8 +1564,20 @@ def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> li
- inline_properties: Properties defined directly in the agent's properties array
- property_ids: Filter top-level properties by property_id
- property_tags: Filter top-level properties by tags
- publisher_properties: References properties from other publisher domains
(returns the selector objects, not resolved properties)
- publisher_properties: Inline-resolved properties from cross-publisher
selectors (resolved from the parent file's top-level properties[]
array per adcp#4827)

For ``publisher_properties`` selectors whose target ``publisher_domain``
is NOT present inline in this file's top-level ``properties[]`` array,
this function returns no properties for that selector. Federated
fallback (fetching the child publisher's own adagents.json to resolve
the selector remotely) is out of scope here and lives in
:func:`fetch_agent_authorizations_from_directory` and
:func:`detect_publisher_properties_divergence` from companion PR #752.
Wire-only authorization checks that assume federated resolution will
under-authorize against managed-network parent files that only inline
a subset of their child domains.

Args:
adagents_data: Parsed adagents.json data
Expand Down Expand Up @@ -1460,6 +1613,8 @@ def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> li

normalized_agent_url = normalize_url(agent_url)

domain_index = _build_domain_index(revoked_top_level)

for agent in authorized_agents:
if not isinstance(agent, dict):
continue
Expand All @@ -1471,9 +1626,9 @@ def get_properties_by_agent(adagents_data: dict[str, Any], agent_url: str) -> li
if normalize_url(agent_url_from_json) != normalized_agent_url:
continue

resolved = _resolve_agent_properties(agent, revoked_top_level)
if revoked and agent.get("authorization_type") == "publisher_properties":
resolved = filter_revoked_selectors(resolved, revoked)
# revoked_top_level pre-filters revoked domains from the per-domain
# index, so inline resolution honors revocation transparently.
resolved = _resolve_agent_properties(agent, revoked_top_level, domain_index)
return resolved

return []
Expand Down
Loading
Loading