Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/adcp/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from __future__ import annotations

from adcp.testing.decisioning import build_asgi_app, make_request_context
from adcp.testing.decisioning import build_asgi_app, build_test_client, make_request_context
from adcp.testing.test_helpers import (
CREATIVE_AGENT_CONFIG,
TEST_AGENT_A2A_CONFIG,
Expand All @@ -39,6 +39,7 @@

__all__ = [
"build_asgi_app",
"build_test_client",
"make_request_context",
"test_agent",
"test_agent_a2a",
Expand Down
110 changes: 107 additions & 3 deletions src/adcp/testing/decisioning.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Test helpers for the v6 DecisioningPlatform framework.

Two adopter-facing helpers that close gaps surfaced by the salesagent
Three adopter-facing helpers that close gaps surfaced by the salesagent
v3.12 → 4.x migration:

* :func:`make_request_context` — build a
Expand All @@ -16,18 +16,29 @@
default ``auto_emit_completion_webhooks=False`` skips the F12 boot
gate that otherwise refuses to start a sales platform without a
webhook sender wired.

* :func:`build_test_client` — async context manager that combines
:func:`build_asgi_app`, ``asgi_lifespan.LifespanManager``, and
``httpx.AsyncClient`` into a single ``async with`` block. Requires
``asgi-lifespan`` (included in ``adcp[dev]``).
"""

from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from adcp.decisioning.context import RequestContext
from adcp.decisioning.types import Account

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Mapping, Sequence
from datetime import datetime

import httpx

from adcp.decisioning import (
AuthInfo,
BuyerAgent,
Expand Down Expand Up @@ -124,6 +135,7 @@ def build_asgi_app(
name: str | None = None,
advertise_all: bool = False,
auto_emit_completion_webhooks: bool = False,
allowed_hosts: Sequence[str] | None = None,
**factory_kwargs: Any,
) -> Any:
"""Build a Starlette ASGI app for in-process integration tests.
Expand All @@ -150,6 +162,12 @@ def build_asgi_app(
:param auto_emit_completion_webhooks: Forwarded to
:func:`create_adcp_server_from_platform`. Default ``False`` for
test ergonomics — production :func:`serve` defaults to ``True``.
:param allowed_hosts: Host header values the MCP transport-security
layer will accept. ``None`` → FastMCP's loopback-only default
(``localhost``, ``127.0.0.1``, ``[::1]``). Pass the hostname
embedded in your ``base_url`` when using a non-loopback test
address (e.g. ``["test"]`` for ``base_url="http://test"``).
:func:`build_test_client` sets this automatically.
:param factory_kwargs: Forwarded to
:func:`create_adcp_server_from_platform` (executor, registry,
webhook_sender, etc.).
Expand All @@ -168,8 +186,94 @@ def build_asgi_app(
**factory_kwargs,
)
server_name = name or type(platform).__name__
mcp = create_mcp_server(handler, name=server_name, advertise_all=advertise_all)
mcp = create_mcp_server(
handler,
name=server_name,
advertise_all=advertise_all,
allowed_hosts=allowed_hosts,
)
return mcp.streamable_http_app()


__all__ = ["build_asgi_app", "make_request_context"]
@asynccontextmanager
async def build_test_client(
platform: DecisioningPlatform,
*,
base_url: str = "http://test",
name: str | None = None,
advertise_all: bool = False,
auto_emit_completion_webhooks: bool = False,
follow_redirects: bool = True,
headers: Mapping[str, str] | None = None,
**factory_kwargs: Any,
) -> AsyncIterator[httpx.AsyncClient]:
"""Async context manager yielding an ``httpx.AsyncClient`` wired against
the platform's ASGI app via ``httpx.ASGITransport`` + ``LifespanManager``.

Collapses the four-line boilerplate that every in-process integration test
previously needed — ``build_asgi_app`` + ``LifespanManager`` +
``httpx.AsyncClient`` — into a single ``async with`` block::

async with build_test_client(platform) as client:
resp = await client.post("/mcp/", json=...)

The context manager starts the ASGI lifespan on entry and shuts down both
the client and the lifespan manager on exit. ``build_test_client(...)``
itself is an ``AbstractAsyncContextManager[httpx.AsyncClient]``; the
yielded object is a plain ``httpx.AsyncClient``.

Requires ``asgi-lifespan`` (included in ``adcp[dev]``). Raises
:class:`ImportError` with an actionable message if it is not installed.

:param platform: The :class:`DecisioningPlatform` instance under test.
:param base_url: Base URL for all requests. Default ``"http://test"``.
The hostname is extracted and added to the transport-security
``allowed_hosts`` list automatically — no manual wiring needed.
:param name: Server name forwarded to :func:`build_asgi_app`.
:param advertise_all: Forwarded to :func:`build_asgi_app`.
:param auto_emit_completion_webhooks: Forwarded to :func:`build_asgi_app`.
:param follow_redirects: Forwarded to ``httpx.AsyncClient``. Default
``True`` — FastMCP's streamable-HTTP endpoint can issue a 307
redirect (``/mcp`` → ``/mcp/``) and callers shouldn't have to
handle it manually.
:param headers: Default headers attached to every request. Useful for
auth tests: ``headers={"x-adcp-auth": "tok_..."}``. ``None`` →
no default headers.
:param factory_kwargs: Forwarded to
:func:`create_adcp_server_from_platform` via :func:`build_asgi_app`
(executor, registry, webhook_sender, etc.).
"""
try:
from asgi_lifespan import LifespanManager
except ImportError as exc:
raise ImportError(
"asgi-lifespan is required for build_test_client. "
"Install it with: pip install 'adcp[dev]'"
) from exc

import httpx as _httpx

hostname = urlparse(base_url).hostname or "localhost"
# validate_capabilities_response_shape (called by create_adcp_server_from_platform)
# uses asyncio.run(), which raises if a loop is already running. Run the sync
# builder in a thread so it gets a clean loop.
app = await asyncio.to_thread(
build_asgi_app,
platform,
name=name,
advertise_all=advertise_all,
auto_emit_completion_webhooks=auto_emit_completion_webhooks,
allowed_hosts=[hostname],
**factory_kwargs,
)
async with LifespanManager(app):
async with _httpx.AsyncClient(
transport=_httpx.ASGITransport(app=app),
base_url=base_url,
headers=headers,
follow_redirects=follow_redirects,
) as client:
yield client


__all__ = ["build_asgi_app", "build_test_client", "make_request_context"]
98 changes: 97 additions & 1 deletion tests/test_testing_decisioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from datetime import datetime, timezone

import httpx
import pytest

from adcp.decisioning import (
Expand All @@ -13,7 +14,7 @@
)
from adcp.decisioning.context import RequestContext
from adcp.decisioning.types import Account
from adcp.testing import build_asgi_app, make_request_context
from adcp.testing import build_asgi_app, build_test_client, make_request_context

# ---- make_request_context ----

Expand Down Expand Up @@ -179,3 +180,98 @@ class _BrokenPlatform(DecisioningPlatform):

with pytest.raises(AdcpError):
build_asgi_app(_BrokenPlatform())


# ---- build_asgi_app: allowed_hosts ----


def test_build_asgi_app_forwards_allowed_hosts() -> None:
"""``allowed_hosts=`` reaches ``create_mcp_server`` — construction
succeeds and the app is a callable."""
platform = _SalesPlatformWithMethods()
app = build_asgi_app(platform, allowed_hosts=["test"])
assert callable(app)


# ---- build_test_client ----


async def test_build_test_client_yields_httpx_async_client() -> None:
"""The context manager yields an ``httpx.AsyncClient`` instance."""
platform = _SalesPlatformWithMethods()
async with build_test_client(platform) as client:
assert isinstance(client, httpx.AsyncClient)


async def test_build_test_client_default_base_url() -> None:
"""Default ``base_url="http://test"`` is used when not overridden."""
platform = _SalesPlatformWithMethods()
async with build_test_client(platform) as client:
assert str(client.base_url) == "http://test"


async def test_build_test_client_custom_base_url() -> None:
"""``base_url`` override is forwarded to the client."""
platform = _SalesPlatformWithMethods()
async with build_test_client(platform, base_url="http://localhost") as client:
assert str(client.base_url) == "http://localhost"


async def test_build_test_client_can_make_request() -> None:
"""The yielded client can actually reach the mounted MCP endpoint."""
platform = _SalesPlatformWithMethods()
async with build_test_client(platform) as client:
resp = await client.post(
"/mcp/",
json={
"jsonrpc": "2.0",
"id": 0,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": {"name": "test", "version": "1.0"},
},
},
headers={
"content-type": "application/json",
"accept": "application/json, text/event-stream",
},
)
assert resp.status_code == 200


async def test_build_test_client_headers_kwarg() -> None:
"""Default ``headers=`` are attached to the client — not silently dropped."""
platform = _SalesPlatformWithMethods()
async with build_test_client(
platform, headers={"x-custom": "value"}
) as client:
assert "x-custom" in dict(client.headers)


async def test_build_test_client_follow_redirects_default_true() -> None:
"""``follow_redirects`` defaults to ``True`` on the yielded client."""
platform = _SalesPlatformWithMethods()
async with build_test_client(platform) as client:
assert client.follow_redirects is True


async def test_build_test_client_follow_redirects_override() -> None:
"""``follow_redirects=False`` is respected."""
platform = _SalesPlatformWithMethods()
async with build_test_client(platform, follow_redirects=False) as client:
assert client.follow_redirects is False


def test_build_test_client_raises_import_error_without_asgi_lifespan() -> None:
"""Missing ``asgi-lifespan`` raises ``ImportError`` with an actionable message."""
import sys
import unittest.mock

platform = _SalesPlatformWithMethods()
with unittest.mock.patch.dict(sys.modules, {"asgi_lifespan": None}):
with pytest.raises(ImportError, match="asgi-lifespan is required"):
import asyncio

asyncio.run(build_test_client(platform).__aenter__())
Loading