Skip to content

Commit f48cd04

Browse files
bokelleyclaude
andcommitted
fix(adagents): address review on directory wrapper + divergence detector (#752)
Spec conformance and concurrency fixes from PR review: - Endpoint path corrected: /v1/agents/{agent_url}/publishers (drop /api/). - Response envelope now spec-strict: read `next_cursor` and `publishers` only (no `cursor`/`results` fallbacks); read `directory_indexed_at` required on non-empty pages; remove dead `total` field. - `AgentDirectoryLookup.cursor` renamed to `next_cursor`. - `AgentPublisherEntry.discovery_method` and `.status` now typed as the spec Literal enums (`DiscoveryMethod`, new `PublisherStatus`); a missing or out-of-enum value raises AdagentsValidationError instead of silently defaulting to `"adagents_authoritative"`/`"authorized"`. - `detect_publisher_properties_divergence` now caps concurrent child probes via `max_concurrency` (default 20) using `asyncio.Semaphore`. The `_probe` except clause now also catches `httpx.HTTPError`, `OSError`, `ValueError` so a single network failure no longer aborts the gather. - `sample_size` default flipped from `None` to `200`; pass `sample_size=None` to opt into a full sweep. - `directory_url` default now reads `AAO_PUBLISHER_DIRECTORY_URL` env var at call time via a sentinel, falling back to `"https://agenticadvertising.org"`. - Module-local `import asyncio` / `from urllib.parse import quote` moved to the top of the file. - Tests for the directory wrapper and divergence detector rewritten to use `respx.mock` with spec-shaped JSON payloads instead of bare `MagicMock(httpx.AsyncClient.get)`. New tests cover env-var defaulting, `directory_indexed_at` null-on-empty rule, enum rejection, default sample-size cap, `max_concurrency` peak-in-flight bound, and a selective `httpx.ConnectError` that must not poison sibling probes. - Public API snapshot regenerated to add the six new symbols plus `PublisherStatus`. (See follow-up spec issue, will be linked) for the `?include=properties` extension that would enable full set-diff in the divergence detector. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 62cf427 commit f48cd04

4 files changed

Lines changed: 600 additions & 222 deletions

File tree

src/adcp/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
DivergenceReport,
2222
EntryErrorKind,
2323
PublisherDivergence,
24+
PublisherStatus,
2425
detect_publisher_properties_divergence,
2526
domain_matches,
2627
fetch_adagents,
@@ -832,6 +833,7 @@ def get_adcp_version() -> str:
832833
"fetch_agent_authorizations",
833834
"fetch_agent_authorizations_from_directory",
834835
"PublisherDivergence",
836+
"PublisherStatus",
835837
"validate_adagents_domain",
836838
"validate_adagents_structure",
837839
"verify_agent_authorization",

src/adcp/adagents.py

Lines changed: 152 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,41 @@
88
for sales agents to verify they are authorized for specific properties.
99
"""
1010

11+
import asyncio
1112
import ipaddress
13+
import os
1214
import re
1315
from dataclasses import dataclass, field
1416
from datetime import datetime
1517
from typing import Any, Literal
16-
from urllib.parse import urlparse
18+
from urllib.parse import quote, urlparse
1719

1820
import httpx
1921

2022
from adcp.exceptions import AdagentsNotFoundError, AdagentsTimeoutError, AdagentsValidationError
2123
from adcp.validation import ValidationError, validate_adagents
2224

23-
DiscoveryMethod = Literal["direct", "authoritative_location", "ads_txt_managerdomain"]
25+
DiscoveryMethod = Literal[
26+
"direct",
27+
"authoritative_location",
28+
"adagents_authoritative",
29+
"ads_txt_managerdomain",
30+
]
31+
32+
# AAO directory ``status`` enum per agent-publishers.json schema.
33+
PublisherStatus = Literal["authorized", "revoked"]
34+
35+
# Sentinel used so ``directory_url=`` defaults are evaluated at call time
36+
# from ``AAO_PUBLISHER_DIRECTORY_URL`` rather than baked in at import.
37+
_DIRECTORY_URL_SENTINEL: Any = object()
38+
_DEFAULT_DIRECTORY_URL = "https://agenticadvertising.org"
39+
40+
41+
def _resolve_directory_url(directory_url: Any) -> str:
42+
"""Resolve directory base URL, consulting env var when the sentinel is passed."""
43+
if directory_url is _DIRECTORY_URL_SENTINEL:
44+
return os.environ.get("AAO_PUBLISHER_DIRECTORY_URL", _DEFAULT_DIRECTORY_URL)
45+
return str(directory_url)
2446

2547

2648
# authorization_type discriminator -> required selector field, per the AdCP
@@ -1345,7 +1367,6 @@ async def fetch_agent_authorizations(
13451367
- For production use with many domains, pass a shared httpx.AsyncClient
13461368
to enable connection pooling
13471369
"""
1348-
import asyncio
13491370

13501371
# Create tasks to fetch all adagents.json files in parallel
13511372
async def fetch_authorization_for_domain(
@@ -1382,6 +1403,12 @@ async def fetch_authorization_for_domain(
13821403
# ---------------------------------------------------------------------------
13831404

13841405

1406+
_VALID_DISCOVERY_METHODS: frozenset[str] = frozenset(
1407+
{"direct", "authoritative_location", "adagents_authoritative", "ads_txt_managerdomain"}
1408+
)
1409+
_VALID_PUBLISHER_STATUSES: frozenset[str] = frozenset({"authorized", "revoked"})
1410+
1411+
13851412
@dataclass
13861413
class AgentPublisherEntry:
13871414
"""A single publisher row from the AAO directory's agent-publishers endpoint.
@@ -1393,12 +1420,12 @@ class AgentPublisherEntry:
13931420
"""
13941421

13951422
publisher_domain: str
1396-
discovery_method: str
1423+
discovery_method: DiscoveryMethod
13971424
manager_domain: str | None
13981425
properties_authorized: int
13991426
properties_total: int
14001427
signing_keys_pinned: bool
1401-
status: str
1428+
status: PublisherStatus
14021429
last_verified_at: str | None
14031430

14041431

@@ -1407,22 +1434,23 @@ class AgentDirectoryLookup:
14071434
"""Envelope returned by :func:`fetch_agent_authorizations_from_directory`.
14081435
14091436
``publishers`` is the list of publisher entries for this page.
1410-
``cursor`` is ``None`` when there are no further pages.
1411-
``total`` is set when the server returns a total count.
1437+
``next_cursor`` is ``None`` when there are no further pages.
1438+
``directory_indexed_at`` is the server's index timestamp for this response
1439+
(``None`` only permitted on empty pages per spec NULL-on-empty rule).
14121440
"""
14131441

14141442
agent_url: str
14151443
publishers: list[AgentPublisherEntry]
1416-
cursor: str | None = None
1417-
total: int | None = None
1444+
directory_indexed_at: str | None = None
1445+
next_cursor: str | None = None
14181446

14191447

14201448
async def fetch_agent_authorizations_from_directory(
14211449
agent_url: str,
1422-
directory_url: str = "https://agenticadvertising.org",
1450+
directory_url: Any = _DIRECTORY_URL_SENTINEL,
14231451
*,
14241452
since: datetime | None = None,
1425-
status: list[str] | None = None,
1453+
status: list[PublisherStatus] | None = None,
14261454
cursor: str | None = None,
14271455
limit: int = 200,
14281456
timeout: float = 30.0,
@@ -1432,7 +1460,7 @@ async def fetch_agent_authorizations_from_directory(
14321460
14331461
Inverse of :func:`fetch_agent_authorizations` — instead of pulling from
14341462
individual publisher adagents.json files, this queries the directory's
1435-
``GET /api/v1/agents/{agent_url}/publishers`` index.
1463+
``GET /v1/agents/{agent_url}/publishers`` index.
14361464
14371465
Each returned :class:`AgentPublisherEntry` carries the same
14381466
``discovery_method`` / ``manager_domain`` provenance fields as
@@ -1441,41 +1469,49 @@ async def fetch_agent_authorizations_from_directory(
14411469
14421470
Args:
14431471
agent_url: The sales agent URL to look up (``%``-encoded in the path).
1444-
directory_url: Base URL of the AAO directory
1445-
(default ``"https://agenticadvertising.org"``).
1472+
directory_url: Base URL of the AAO directory. When omitted, defaults
1473+
to the ``AAO_PUBLISHER_DIRECTORY_URL`` environment variable, or
1474+
``"https://agenticadvertising.org"`` if that is unset. The default
1475+
is resolved on every call so test harnesses can monkeypatch the
1476+
environment.
14461477
since: Only return publishers whose authorization was last verified
14471478
after this timestamp. ``None`` returns all.
14481479
status: Filter by authorization status (default ``["authorized"]``).
14491480
Pass ``["authorized", "revoked"]`` to include revoked entries.
1450-
cursor: Pagination cursor from a previous call's ``cursor`` field.
1481+
cursor: Pagination cursor from a previous call's ``next_cursor`` field.
14511482
limit: Maximum entries per page (server-side cap may be lower).
14521483
timeout: Per-request timeout in seconds.
14531484
client: Optional ``httpx.AsyncClient`` for connection pooling.
14541485
The client is **not** closed by this function.
14551486
14561487
Returns:
14571488
:class:`AgentDirectoryLookup` with ``publishers`` for this page and
1458-
a ``cursor`` for the next page (``None`` when exhausted).
1489+
a ``next_cursor`` for the next page (``None`` when exhausted).
1490+
1491+
Raises:
1492+
AdagentsValidationError: If the response envelope is malformed, a
1493+
row is missing required fields, or a row carries an enum value
1494+
outside the spec.
14591495
14601496
Example::
14611497
14621498
lookup = await fetch_agent_authorizations_from_directory(
14631499
"https://interchange.io",
14641500
)
14651501
print(f"{len(lookup.publishers)} publishers on first page")
1466-
while lookup.cursor:
1502+
while lookup.next_cursor:
14671503
lookup = await fetch_agent_authorizations_from_directory(
14681504
"https://interchange.io",
1469-
cursor=lookup.cursor,
1505+
cursor=lookup.next_cursor,
14701506
)
14711507
"""
1472-
from urllib.parse import quote
1508+
resolved_directory_url = _resolve_directory_url(directory_url)
14731509

14741510
if status is None:
14751511
status = ["authorized"]
14761512

14771513
encoded_agent = quote(agent_url, safe="")
1478-
url = f"{directory_url.rstrip('/')}/api/v1/agents/{encoded_agent}/publishers"
1514+
url = f"{resolved_directory_url.rstrip('/')}/v1/agents/{encoded_agent}/publishers"
14791515

14801516
# Build params as a list of (key, value) tuples so multi-value status
14811517
# produces repeated keys (?status=authorized&status=revoked), not a
@@ -1501,33 +1537,68 @@ async def fetch_agent_authorizations_from_directory(
15011537
if not isinstance(data, dict):
15021538
raise AdagentsValidationError(
15031539
f"Directory returned unexpected JSON type {type(data).__name__!r} "
1504-
f"for /api/v1/agents/{{agent_url}}/publishers"
1540+
f"for /v1/agents/{{agent_url}}/publishers"
1541+
)
1542+
1543+
raw_rows = data.get("publishers")
1544+
if raw_rows is None:
1545+
raise AdagentsValidationError("Directory response is missing required 'publishers' array")
1546+
if not isinstance(raw_rows, list):
1547+
raise AdagentsValidationError(
1548+
f"Directory 'publishers' must be an array, got {type(raw_rows).__name__!r}"
15051549
)
15061550

15071551
publishers: list[AgentPublisherEntry] = []
1508-
raw_rows = data.get("publishers") or data.get("results") or []
1509-
for row in raw_rows:
1510-
domain = row.get("publisher_domain", "")
1511-
if not domain:
1512-
continue # skip malformed rows missing the required field
1552+
for index, row in enumerate(raw_rows):
1553+
if not isinstance(row, dict):
1554+
raise AdagentsValidationError(f"publishers[{index}] is not a JSON object")
1555+
domain = row.get("publisher_domain")
1556+
if not isinstance(domain, str) or not domain:
1557+
raise AdagentsValidationError(
1558+
f"publishers[{index}] is missing required 'publisher_domain'"
1559+
)
1560+
1561+
discovery_method = row.get("discovery_method")
1562+
if discovery_method not in _VALID_DISCOVERY_METHODS:
1563+
raise AdagentsValidationError(
1564+
f"publishers[{index}] has invalid discovery_method "
1565+
f"{discovery_method!r} (expected one of: "
1566+
f"{', '.join(sorted(_VALID_DISCOVERY_METHODS))})"
1567+
)
1568+
1569+
row_status = row.get("status")
1570+
if row_status not in _VALID_PUBLISHER_STATUSES:
1571+
raise AdagentsValidationError(
1572+
f"publishers[{index}] has invalid status {row_status!r} "
1573+
f"(expected one of: {', '.join(sorted(_VALID_PUBLISHER_STATUSES))})"
1574+
)
1575+
15131576
publishers.append(
15141577
AgentPublisherEntry(
15151578
publisher_domain=domain,
1516-
discovery_method=row.get("discovery_method", "adagents_authoritative"),
1579+
discovery_method=discovery_method,
15171580
manager_domain=row.get("manager_domain"),
15181581
properties_authorized=row.get("properties_authorized", 0),
15191582
properties_total=row.get("properties_total", 0),
15201583
signing_keys_pinned=bool(row.get("signing_keys_pinned", False)),
1521-
status=row.get("status", "authorized"),
1584+
status=row_status,
15221585
last_verified_at=row.get("last_verified_at"),
15231586
)
15241587
)
15251588

1589+
directory_indexed_at = data.get("directory_indexed_at")
1590+
# Per spec, directory_indexed_at is required except on empty pages where
1591+
# it may be NULL. Tolerate missing only when the page is empty.
1592+
if directory_indexed_at is None and publishers:
1593+
raise AdagentsValidationError(
1594+
"Directory response is missing required 'directory_indexed_at' " "on a non-empty page"
1595+
)
1596+
15261597
return AgentDirectoryLookup(
15271598
agent_url=agent_url,
15281599
publishers=publishers,
1529-
cursor=data.get("cursor") or data.get("next_cursor"),
1530-
total=data.get("total"),
1600+
directory_indexed_at=directory_indexed_at,
1601+
next_cursor=data.get("next_cursor"),
15311602
)
15321603

15331604

@@ -1574,9 +1645,10 @@ class PublisherDivergence:
15741645

15751646
async def detect_publisher_properties_divergence(
15761647
agent_url: str,
1577-
directory_url: str = "https://agenticadvertising.org",
1648+
directory_url: Any = _DIRECTORY_URL_SENTINEL,
15781649
*,
1579-
sample_size: int | None = None,
1650+
sample_size: int | None = 200,
1651+
max_concurrency: int = 20,
15801652
timeout: float = 30.0,
15811653
client: httpx.AsyncClient | None = None,
15821654
) -> DivergenceReport:
@@ -1603,18 +1675,24 @@ async def detect_publisher_properties_divergence(
16031675
directory endpoint will enable full set-diff once that parameter is
16041676
deployed.
16051677
1606-
**Cost warning — ``sample_size`` is mandatory for large networks.**
1678+
**Cost — ``sample_size`` defaults to 200; opt in to a full sweep.**
16071679
Running a full sweep against cafemedia's ~6,800 child publishers launches
1608-
~6,800 concurrent HTTP fetches. With a 30 s timeout each, total wall-clock
1609-
is bounded by the slowest fetch, but server-side rate limits may apply.
1610-
Pass ``sample_size=N`` to cap the sweep; the sample is taken from the
1611-
first page of directory results.
1680+
thousands of HTTP fetches. To guard against accidental fan-out the default
1681+
caps at 200; pass ``sample_size=None`` to scan every publisher the
1682+
directory returns. ``max_concurrency`` (default 20) bounds the number of
1683+
in-flight child fetches via an internal ``asyncio.Semaphore`` regardless
1684+
of ``sample_size``.
16121685
16131686
Args:
16141687
agent_url: The agent URL to check authorizations for.
1615-
directory_url: AAO directory base URL.
1616-
sample_size: Maximum number of publisher domains to probe. ``None``
1617-
sweeps all pages (full network — may be very slow).
1688+
directory_url: AAO directory base URL. Defaults to the
1689+
``AAO_PUBLISHER_DIRECTORY_URL`` env var or
1690+
``"https://agenticadvertising.org"``.
1691+
sample_size: Maximum number of publisher domains to probe (default
1692+
``200``). Pass ``None`` to opt into a full sweep across every
1693+
page of directory results.
1694+
max_concurrency: Maximum number of concurrent child adagents.json
1695+
fetches. Defaults to 20.
16181696
timeout: Per-request timeout for both directory and child fetches.
16191697
client: Optional shared ``httpx.AsyncClient``.
16201698
@@ -1643,7 +1721,10 @@ async def detect_publisher_properties_divergence(
16431721
f"(dir={entry.directory_properties_authorized}, "
16441722
f"federated={entry.federated_properties_found})")
16451723
"""
1646-
import asyncio
1724+
if max_concurrency < 1:
1725+
raise ValueError("max_concurrency must be at least 1")
1726+
1727+
resolved_directory_url = _resolve_directory_url(directory_url)
16471728

16481729
own_client = client is None
16491730
http = client or httpx.AsyncClient()
@@ -1655,7 +1736,7 @@ async def detect_publisher_properties_divergence(
16551736
while True:
16561737
page = await fetch_agent_authorizations_from_directory(
16571738
agent_url,
1658-
directory_url=directory_url,
1739+
directory_url=resolved_directory_url,
16591740
cursor=page_cursor,
16601741
timeout=timeout,
16611742
client=http,
@@ -1664,31 +1745,39 @@ async def detect_publisher_properties_divergence(
16641745
if sample_size is not None and len(all_entries) >= sample_size:
16651746
all_entries = all_entries[:sample_size]
16661747
break
1667-
page_cursor = page.cursor
1748+
page_cursor = page.next_cursor
16681749
if not page_cursor:
16691750
break
16701751

1752+
semaphore = asyncio.Semaphore(max_concurrency)
1753+
16711754
async def _probe(entry: AgentPublisherEntry) -> PublisherDivergence | None:
1672-
try:
1673-
data = await fetch_adagents(
1674-
entry.publisher_domain, timeout=timeout, client=http
1675-
)
1676-
federated_props = get_properties_by_agent(data, agent_url)
1677-
federated_ids = {
1678-
p.get("property_id")
1679-
for p in federated_props
1680-
if p.get("property_id")
1681-
}
1682-
except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError) as exc:
1683-
return PublisherDivergence(
1684-
publisher_domain=entry.publisher_domain,
1685-
directory_properties_authorized=entry.properties_authorized,
1686-
federated_properties_found=0,
1687-
# None = count-only mode; IDs unavailable from directory
1688-
missing_in_inline=None,
1689-
missing_in_federated=None,
1690-
child_fetch_error=str(exc),
1691-
)
1755+
async with semaphore:
1756+
try:
1757+
data = await fetch_adagents(
1758+
entry.publisher_domain, timeout=timeout, client=http
1759+
)
1760+
federated_props = get_properties_by_agent(data, agent_url)
1761+
federated_ids = {
1762+
p.get("property_id") for p in federated_props if p.get("property_id")
1763+
}
1764+
except (
1765+
AdagentsNotFoundError,
1766+
AdagentsValidationError,
1767+
AdagentsTimeoutError,
1768+
httpx.HTTPError,
1769+
OSError,
1770+
ValueError,
1771+
) as exc:
1772+
return PublisherDivergence(
1773+
publisher_domain=entry.publisher_domain,
1774+
directory_properties_authorized=entry.properties_authorized,
1775+
federated_properties_found=0,
1776+
# None = count-only mode; IDs unavailable from directory
1777+
missing_in_inline=None,
1778+
missing_in_federated=None,
1779+
child_fetch_error=str(exc) or type(exc).__name__,
1780+
)
16921781

16931782
fed_count = len(federated_ids)
16941783
# Count-only comparison: directory does not currently return

0 commit comments

Comments
 (0)