Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
DirectoryEdgeStatus,
DirectoryPublisherEntry,
DiscoveryMethod,
DivergenceReport,
EntryErrorKind,
PublisherDivergence,
detect_publisher_properties_divergence,
domain_matches,
fetch_adagents,
fetch_adagents_with_cache,
Expand Down Expand Up @@ -832,7 +835,10 @@ def get_adcp_version() -> str:
"DirectoryEdgeStatus",
"DirectoryPublisherEntry",
"DiscoveryMethod",
"DivergenceReport",
"EntryErrorKind",
"PublisherDivergence",
"detect_publisher_properties_divergence",
"fetch_adagents",
"fetch_adagents_with_cache",
"fetch_agent_authorizations",
Expand Down
247 changes: 246 additions & 1 deletion src/adcp/adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import asyncio
import ipaddress
import json
import logging
import re
import socket
from dataclasses import dataclass, field
Expand All @@ -25,6 +26,8 @@
from adcp.types.base import AdCPBaseModel
from adcp.validation import ValidationError, validate_adagents

logger = logging.getLogger(__name__)

DiscoveryMethod = Literal["direct", "authoritative_location", "ads_txt_managerdomain"]


Expand Down Expand Up @@ -1804,6 +1807,16 @@ class DirectoryPublisherEntry(AdCPBaseModel):
signing_keys_pinned: bool | None = None
status: DirectoryEdgeStatus
last_verified_at: datetime
property_ids: list[str] | None = Field(
default=None,
description=(
"Canonical property IDs the agent's selectors resolve to under "
"this publisher. Present iff the request was made with "
"include=['properties'] AND the directory server supports it "
"(per adcp#4894). None signals count-only mode for downstream "
"consumers."
),
)


class AgentAuthorizationsDirectoryResult(AdCPBaseModel):
Expand All @@ -1826,12 +1839,18 @@ class AgentAuthorizationsDirectoryResult(AdCPBaseModel):
# page is a small envelope; pagination handles bulk responses.
MAX_DIRECTORY_PAGE_BYTES = 5 * 1024 * 1024

# Hard cap on directory pagination iterations. A misbehaving directory that
# never returns an empty next_cursor would otherwise hang the sweep
# indefinitely; this cap fail-closes the loop.
MAX_DIRECTORY_PAGES = 1000


async def fetch_agent_authorizations_from_directory(
agent_url: str,
*,
directory_url: str,
since: str | None = None,
include: list[str] | None = None,
timeout: float = 10.0,
client: httpx.AsyncClient | None = None,
) -> AgentAuthorizationsDirectoryResult:
Expand All @@ -1854,6 +1873,15 @@ async def fetch_agent_authorizations_from_directory(
since: Optional opaque cursor or RFC 3339 timestamp from a prior
``directory_indexed_at`` — passed through as ``?since=...``
to limit the result to edges that changed since that point.
include: Optional list of expansion keys per the AAO directory
API spec (adcp#4894). Each value is emitted as a separate
``?include=<value>`` query parameter (repeated-key form, not
comma-joined). Pass ``["properties"]`` against directories
that support it to receive per-publisher ``property_ids[]``
on each row, enabling full set-diff against the publisher's
own adagents.json. Directories that don't support a given
expansion key simply omit the corresponding fields from the
response; callers should treat absence as count-only mode.
timeout: Request timeout in seconds.
client: Optional shared ``httpx.AsyncClient`` for connection
pooling. Caller owns the client lifecycle.
Expand Down Expand Up @@ -1890,8 +1918,18 @@ async def fetch_agent_authorizations_from_directory(
_validate_redirect_url(f"{base}/v1/agents/_/publishers")

request_url = f"{base}/v1/agents/{quote(agent_url, safe='')}/publishers"
query_pairs: list[tuple[str, str]] = []
if since is not None:
request_url = f"{request_url}?since={quote(since, safe='')}"
query_pairs.append(("since", since))
if include:
# Repeated-key form per docs/aao/directory-api.mdx (style: form,
# explode: true). Comma-joined NOT accepted by spec-conformant
# directories.
for value in include:
query_pairs.append(("include", value))
if query_pairs:
query_string = "&".join(f"{quote(k, safe='')}={quote(v, safe='')}" for k, v in query_pairs)
request_url = f"{request_url}?{query_string}"

parsed = urlparse(request_url)
await _dns_validate_host(
Expand Down Expand Up @@ -1943,3 +1981,210 @@ async def fetch_agent_authorizations_from_directory(
raise AdagentsValidationError(
f"Agent-publishers directory response failed schema validation: {e}"
) from e


class PublisherDivergence(AdCPBaseModel):
    """Divergence record for a single publisher domain.

    ``missing_in_inline``: property IDs the federated fetch found in the
    publisher's own adagents.json that the directory did not surface
    (publisher has properties the directory doesn't know about yet).

    ``missing_in_federated``: property IDs the directory claims the agent
    is authorized for but the publisher's own adagents.json does not
    include (stale directory entry or publisher revocation).

    Both fields are None in count-only fallback mode (directory did
    not return ``property_ids[]``). In count-only mode, count-equality
    does NOT guarantee set-equality — same-count substitutions are
    undetectable. Use ``?include=properties`` (adcp#4894) on directories
    that support it for full set-diff precision.

    ``child_fetch_error`` is non-None when the publisher's adagents.json
    could not be fetched or parsed. In that case
    ``federated_properties_found`` is 0 and both ``missing_in_*`` fields
    are None; only ``publisher_domain`` and
    ``directory_properties_authorized`` reflect real data.
    """

    # Publisher domain taken from the directory row that was probed.
    publisher_domain: str
    # Property count the directory row reported (its ``properties_authorized``).
    directory_properties_authorized: int = Field(ge=0)
    # Number of distinct, non-empty property IDs found for the agent in the
    # publisher's own adagents.json; 0 when the child fetch failed.
    federated_properties_found: int = Field(ge=0)
    # Sorted IDs present in the federated result but absent from the
    # directory row; None in count-only mode or on fetch failure.
    missing_in_inline: list[str] | None = None
    # Sorted IDs present in the directory row but absent from the federated
    # result; None in count-only mode or on fetch failure.
    missing_in_federated: list[str] | None = None
    # ``str()`` of the exception raised while fetching/parsing the publisher's
    # adagents.json; None when the fetch succeeded.
    child_fetch_error: str | None = None


# Return type of detect_publisher_properties_divergence: one record per
# publisher whose directory row and own adagents.json disagree, or whose
# adagents.json could not be fetched. An empty list means no divergence
# was detected.
DivergenceReport = list[PublisherDivergence]


async def detect_publisher_properties_divergence(
    agent_url: str,
    *,
    directory_url: str,
    sample_size: int | None = 200,
    max_concurrency: int = 20,
    timeout: float = 30.0,
    client: httpx.AsyncClient | None = None,
) -> DivergenceReport:
    """Compare directory's inline resolution against per-publisher federated fetches.

    For each publisher the directory lists under ``agent_url``, fetches
    that publisher's own ``adagents.json`` and compares the property set
    against the directory's claim. Returns only publishers where the two
    paths disagree (or where the child fetch failed).

    Always requests ``include=["properties"]`` from the directory so the
    full ``(publisher_domain, property_id)`` set-diff lights up on
    directories that support adcp#4894. Against older directories that
    return only ``properties_authorized`` counts, falls back to count-
    comparison; ``missing_in_inline`` / ``missing_in_federated`` are
    None in that fallback path.

    Per adcp#4827 §Resolution-paths, the federated result is
    authoritative when the two paths disagree.

    Args:
        agent_url: agent to check.
        directory_url: AAO directory base URL (HTTPS only — same SSRF
            gate as :func:`fetch_agent_authorizations_from_directory`).
        sample_size: cap the sweep at N directory rows. Pages are
            fetched until N rows have been collected or the directory
            runs out of pages; duplicate publisher rows are dropped
            after the cap, so fewer than N publishers may be probed.
            None opts into a full sweep across all pages — only do this
            for small networks. Default 200 keeps the divergence sweep
            bounded by default. Must be >= 0 when given.
        max_concurrency: semaphore-capped concurrent federated fetches.
            Default 20 — caps the burst against publisher origins.
            Must be >= 1.
        timeout: per-request timeout (directory + child fetches).
        client: optional shared ``httpx.AsyncClient``. When omitted, a
            client is created for the sweep and closed before returning.

    Returns:
        :data:`DivergenceReport` (``list[PublisherDivergence]``). Empty
        list = no divergence detected. Note in count-only fallback mode,
        an empty list means counts agree but set-equality is not
        guaranteed.

    Raises:
        ValueError: ``max_concurrency`` is below 1 or ``sample_size`` is
            negative.
        AdagentsValidationError: the directory repeats a pagination
            cursor or exceeds ``MAX_DIRECTORY_PAGES`` pages. Errors
            raised by the directory fetch itself propagate unchanged.
    """
    # Validate before any I/O: Semaphore(0) would park every probe forever,
    # and a negative sample_size would silently slice rows off the *end* of
    # the collected list instead of capping it.
    if max_concurrency < 1:
        raise ValueError(f"max_concurrency must be >= 1, got {max_concurrency!r}")
    if sample_size is not None and sample_size < 0:
        raise ValueError(f"sample_size must be >= 0 or None, got {sample_size!r}")

    own_client = client is None
    http = httpx.AsyncClient() if client is None else client
    try:
        collected: list[DirectoryPublisherEntry] = []
        cursor: str | None = None
        seen_cursors: set[str] = set()
        page_count = 0
        while True:
            page = await fetch_agent_authorizations_from_directory(
                agent_url,
                directory_url=directory_url,
                since=cursor,
                include=["properties"],
                timeout=timeout,
                client=http,
            )
            page_count += 1
            collected.extend(page.publishers)
            if sample_size is not None and len(collected) >= sample_size:
                collected = collected[:sample_size]
                break
            cursor = page.next_cursor
            if not cursor:
                break
            if cursor in seen_cursors:
                raise AdagentsValidationError(
                    f"Directory page cursor {cursor!r} repeated — refusing to loop forever."
                )
            seen_cursors.add(cursor)
            if page_count >= MAX_DIRECTORY_PAGES:
                raise AdagentsValidationError(
                    f"Directory pagination exceeded {MAX_DIRECTORY_PAGES} pages — aborting sweep."
                )

        # Dedupe by publisher_domain before fan-out: a hostile directory
        # returning N rows for the same publisher would otherwise amplify
        # into N concurrent fetches against a single victim host. First
        # occurrence wins (deterministic) — conflicting property_ids /
        # properties_authorized across duplicates are dropped here; the
        # directory's behavior is itself a divergence signal for ops.
        # The key is lower-cased with any trailing dot removed because
        # hostnames are case-insensitive; otherwise "Example.com",
        # "example.com" and "example.com." would each get their own probe.
        seen_domains: set[str] = set()
        deduped: list[DirectoryPublisherEntry] = []
        for entry in collected:
            domain_key = entry.publisher_domain.lower().rstrip(".")
            if domain_key in seen_domains:
                continue
            seen_domains.add(domain_key)
            deduped.append(entry)
        collected = deduped

        # Emit a one-shot warning when the entire sample comes back without
        # property_ids[]. In count-only mode, same-count substitutions are
        # undetectable — adopters should pin include=["properties"] support
        # on directories that offer it.
        if collected and all(e.property_ids is None for e in collected):
            logger.warning(
                "AAO directory %s did not return property_ids[] on any publisher "
                "entry — falling back to count-only divergence detection. Same-count "
                "substitutions are undetectable in this mode. Upgrade the directory "
                "or pin include=['properties'] support.",
                directory_url,
            )

        sem = asyncio.Semaphore(max_concurrency)

        async def _probe(entry: DirectoryPublisherEntry) -> PublisherDivergence | None:
            """Fetch one publisher's adagents.json and diff it against ``entry``."""
            async with sem:
                try:
                    data = await fetch_adagents(
                        entry.publisher_domain, timeout=timeout, client=http
                    )
                    federated_props = get_properties_by_agent(data, agent_url)
                    # Falsy/empty property_id is silently dropped: upstream
                    # schema requires a non-empty string, so an empty value
                    # is a structural violation that belongs in
                    # validate_adagents, not a divergence signal. Federated
                    # properties with valid IDs only.
                    federated_ids = {
                        str(p.get("property_id"))
                        for p in federated_props
                        if p.get("property_id")
                    }
                except (
                    AdagentsNotFoundError,
                    AdagentsValidationError,
                    AdagentsTimeoutError,
                    httpx.HTTPError,
                    OSError,
                    ValueError,
                ) as exc:
                    # A failed child fetch is reported as a divergence row
                    # rather than aborting the whole sweep.
                    return PublisherDivergence(
                        publisher_domain=entry.publisher_domain,
                        directory_properties_authorized=entry.properties_authorized,
                        federated_properties_found=0,
                        missing_in_inline=None,
                        missing_in_federated=None,
                        child_fetch_error=str(exc),
                    )

            if entry.property_ids is not None:
                # Full set-diff path (adcp#4894).
                dir_ids = set(entry.property_ids)
                missing_in_inline = sorted(federated_ids - dir_ids)
                missing_in_federated = sorted(dir_ids - federated_ids)
                if not missing_in_inline and not missing_in_federated:
                    return None
                return PublisherDivergence(
                    publisher_domain=entry.publisher_domain,
                    directory_properties_authorized=entry.properties_authorized,
                    federated_properties_found=len(federated_ids),
                    missing_in_inline=missing_in_inline,
                    missing_in_federated=missing_in_federated,
                )

            # Count-only fallback (older directories).
            if len(federated_ids) == entry.properties_authorized:
                return None
            return PublisherDivergence(
                publisher_domain=entry.publisher_domain,
                directory_properties_authorized=entry.properties_authorized,
                federated_properties_found=len(federated_ids),
                missing_in_inline=None,
                missing_in_federated=None,
            )

        tasks = [asyncio.create_task(_probe(entry)) for entry in collected]
        try:
            probes = await asyncio.gather(*tasks)
        except BaseException:
            # gather() does not cancel sibling tasks when one of them raises.
            # Cancel and drain them here so none is still using ``http``
            # when the ``finally`` below closes it.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
    finally:
        if own_client:
            await http.aclose()

    return [p for p in probes if p is not None]
3 changes: 3 additions & 0 deletions tests/fixtures/public_api_snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
"DirectoryEdgeStatus",
"DirectoryPublisherEntry",
"DiscoveryMethod",
"DivergenceReport",
"DomainLookupResult",
"Duration",
"EntryErrorKind",
Expand Down Expand Up @@ -253,6 +254,7 @@
"ProvidePerformanceFeedbackRequest",
"ProvidePerformanceFeedbackResponse",
"ProvidePerformanceFeedbackSuccessResponse",
"PublisherDivergence",
"PublisherProperties",
"PublisherPropertiesAll",
"PublisherPropertiesById",
Expand Down Expand Up @@ -351,6 +353,7 @@
"create_mcp_webhook_payload",
"create_test_agent",
"creative_agent",
"detect_publisher_properties_divergence",
"domain_matches",
"extract_webhook_result_data",
"fetch_adagents",
Expand Down
Loading
Loading