Commit 1123458

bokelley and claude authored
feat(server/idempotency): complete PgBackend for multi-worker durable replay (#555)
* feat(server/idempotency): complete PgBackend for multi-worker durable replay

  Replaces the NotImplementedError scaffold with a real psycopg3-async-pool implementation. Multi-worker AdCP sellers can now declare IdempotencySupported(supported=True) and actually dedupe across workers.

  * PgBackend(pool=...) — async pool, per-call connection acquisition.
  * await create_schema() — idempotent CREATE TABLE IF NOT EXISTS + CREATE INDEX. JSONB response, COLLATE "C" on text PK columns to prevent locale-driven cross-tenant collisions.
  * ON CONFLICT (scope_key, key) DO UPDATE … WHERE existing.expires_at <= now() is FIRST-WRITER-WINS under concurrent put — concurrent writers racing the same fresh slot cannot violate the cache invariant "same (scope, key) → same payload_hash".
  * Naive datetime from expires_at raises ValueError with a schema-drift hint instead of silent UTC coercion (fail-fast policy).
  * Validates table_name through the same ASCII identifier regex as the other PG backends.
  * Optional adcp[pg] extra; ImportError with install hint when missing.

  Security hardening alongside the backend:

  * IdempotencyStore._extract_scope_key rejects tenant_id / caller_identity values containing the U+001E scope separator. Without this, tenant="A\\x1eB" + principal="X" would collide with tenant="A" + principal="B\\x1eX" — multi-tenant isolation defeated.
  * Log lines use _scope_log_id(scope_key) — sha256-truncated identifier, not the raw principal id. PII / commercial identity data no longer lands in centralized log sinks verbatim.

  Docstring caveats covered: atomicity (v1 separate-tx commit; non-idempotent handler side effects need handler-level dedup or co-tx follow-on); schema bootstrap (CREATE TABLE IF NOT EXISTS no-ops on pre-existing wrong-shape tables — Alembic-first deployments must copy the DDL verbatim); JSON-safe contract; per-principal rate limiting at the auth tier and delete_expired sweeps.

  Tests:

  * 15 unit tests with mocked psycopg pool.
  * 9 conformance tests against real Postgres (skipped without ADCP_PG_TEST_URL): first-writer-wins concurrent put, expired-row overwrite, end-to-end IdempotencyStore replay, scope_key isolation.
  * 4 scope-separator-rejection unit tests.
  * CI workflow extended to run the new conformance suite.

  Closes #548.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(server/idempotency): fix conformance test for first-writer-wins put

  The previous test_put_overwrites_via_on_conflict asserted last-writer-wins behavior. The new ON CONFLICT … WHERE expires_at <= now() guard makes a sequential put against a non-expired slot a no-op. Update the test name + assertion to match the contract; the underlying SQL is correct.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
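The scope-separator collision described in the commit message can be reproduced in a few lines. The sketch below is illustrative only: `compose_scope_key`, `checked_scope_key`, and `scope_log_id` are hypothetical stand-ins, not the SDK's `_extract_scope_key` or `_scope_log_id`, and the 12-character truncation is an assumption (the commit only says "sha256-truncated").

```python
import hashlib

SCOPE_SEP = "\x1e"  # U+001E, the scope separator named in the commit message


def compose_scope_key(tenant_id: str, caller_identity: str) -> str:
    """Hypothetical composer: join the two parts with the separator."""
    return f"{tenant_id}{SCOPE_SEP}{caller_identity}"


def checked_scope_key(tenant_id: str, caller_identity: str) -> str:
    """Reject any part that already contains the separator, then compose."""
    for label, value in (("tenant_id", tenant_id), ("caller_identity", caller_identity)):
        if SCOPE_SEP in value:
            raise ValueError(f"{label} must not contain U+001E")
    return compose_scope_key(tenant_id, caller_identity)


def scope_log_id(scope_key: str) -> str:
    """Hypothetical log identifier: truncated SHA-256 hex digest.

    The length (12) is an assumption made for this sketch.
    """
    return hashlib.sha256(scope_key.encode("utf-8")).hexdigest()[:12]


# Without the check, two different (tenant, principal) pairs share one key.
collides = compose_scope_key("A\x1eB", "X") == compose_scope_key("A", "B\x1eX")
print("unchecked keys collide:", collides)

# With the check, the ambiguous input is refused before a key is built.
try:
    checked_scope_key("A\x1eB", "X")
except ValueError as exc:
    print("rejected:", exc)

# Log lines can carry the digest instead of the raw principal id.
print("log id:", scope_log_id(checked_scope_key("A", "B")))
```

Rejecting the separator at composition time keeps the key unambiguous without needing an escaping scheme, which appears to be the trade-off the commit chose.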
1 parent 6cf71cb commit 1123458

7 files changed

Lines changed: 868 additions & 65 deletions

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@ jobs:
           pytest tests/conformance/signing/test_pg_replay_store.py \
             tests/conformance/signing/test_pg_replay_store_e2e.py \
             tests/conformance/decisioning/test_pg_buyer_agent_registry.py \
+            tests/conformance/decisioning/test_pg_idempotency_backend.py \
             -v
 
   conventional-commits:

src/adcp/server/idempotency/__init__.py

Lines changed: 5 additions & 3 deletions
@@ -47,9 +47,11 @@ async def get_adcp_capabilities(self, params, context=None):
 
 - :class:`MemoryBackend` — in-process dict with TTL; use for tests and
   single-process reference implementations.
-- :class:`PgBackend` — scaffold for a SQLAlchemy/asyncpg-backed store that can
-  commit cache writes atomically with business writes. Implementation arrives
-  in a follow-up PR.
+- :class:`PgBackend` — Postgres-backed store for multi-worker durable
+  replay. Requires the ``adcp[pg]`` extra. ``await
+  backend.create_schema()`` once at boot; commits go through a fresh
+  pool connection (separate from the handler's transaction in v1 —
+  co-tx wiring is a v1.1 affordance).
 """
 
 from adcp.server.idempotency.backends import (

src/adcp/server/idempotency/backends.py

Lines changed: 244 additions & 55 deletions
@@ -5,9 +5,11 @@
 1. Retrieve a cached response by ``(principal_id, idempotency_key)``, honoring
    the seller's replay TTL.
 2. Atomically commit ``(payload_hash, response)`` on a fresh key. Atomicity
-   with the handler's business writes is the backend's choice — :class:`MemoryBackend`
-   makes no such guarantee; :class:`PgBackend` (follow-up) will when the handler
-   uses the same engine.
+   with the handler's business writes is the backend's choice —
+   :class:`MemoryBackend` makes no such guarantee; :class:`PgBackend` shares
+   a connection pool so adopters with the same Postgres can compose their
+   handler's transaction with the cache write (v1 commits in a separate
+   pool connection — co-tx wiring is a v1.1 affordance).
 
 Backends expose async methods. The in-process :class:`MemoryBackend` is
 synchronous under the hood but wrapped in ``async`` signatures so the store
@@ -17,12 +19,44 @@
 from __future__ import annotations
 
 import asyncio
+import json
+import re
 import time
 from abc import ABC, abstractmethod
 from collections.abc import Callable
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from typing import Any
 
+try:
+    import psycopg  # noqa: F401
+    import psycopg_pool  # noqa: F401
+
+    _PG_AVAILABLE = True
+except ImportError:
+    _PG_AVAILABLE = False
+
+_PG_INSTALL_HINT = (
+    "PgBackend requires psycopg3 and psycopg-pool. "
+    "Install the 'pg' extra: `pip install 'adcp[pg]'`."
+)
+
+# Byte-level ASCII identifier guard — same rationale as PgReplayStore /
+# PgWebhookDeliverySupervisor. ``str.islower()`` accepts non-ASCII Unicode
+# letters which would format verbatim into SQL as a different table than
+# configured.
+_SAFE_IDENTIFIER_RE = re.compile(r"^[a-z_][a-z0-9_]{0,62}$")
+
+DEFAULT_IDEMPOTENCY_TABLE = "adcp_idempotency"
+
+
+def _safe_identifier(name: str) -> str:
+    if not _SAFE_IDENTIFIER_RE.fullmatch(name):
+        raise ValueError(
+            f"Table name must match [a-z_][a-z0-9_]{{0,62}} (ASCII only), got {name!r}"
+        )
+    return name
+
 
 @dataclass(frozen=True)
 class CachedResponse:
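The comment on `_SAFE_IDENTIFIER_RE` in the hunk above contrasts the regex with `str.islower()`. A small standalone check, using the same pattern but a hypothetical helper name (`is_safe_identifier`), shows which inputs each test lets through. The sample table names are made up for illustration.

```python
import re

# Same pattern as _SAFE_IDENTIFIER_RE in the hunk above.
SAFE_IDENTIFIER_RE = re.compile(r"^[a-z_][a-z0-9_]{0,62}$")


def is_safe_identifier(name: str) -> bool:
    """Hypothetical boolean wrapper around the regex guard."""
    return SAFE_IDENTIFIER_RE.fullmatch(name) is not None


candidates = [
    "adcp_idempotency",       # plain ASCII identifier
    "ädcp_idempotency",       # non-ASCII lowercase letter
    "x; drop table y",        # lowercase, but not an identifier
    "a" * 63,                 # longest name the pattern allows
    "a" * 64,                 # one character too long
]

for name in candidates:
    # islower() only checks letter case, so it says nothing about the
    # character set or the length of the name.
    print(f"{name[:24]!r:28} islower={name.islower()!s:5} safe={is_safe_identifier(name)}")
```

The 63-character ceiling in the pattern lines up with PostgreSQL's default identifier length limit, so a name that passes the guard is not silently truncated by the server.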
@@ -150,72 +184,227 @@ async def _size(self) -> int:
 
 
 class PgBackend(IdempotencyBackend):
-    """PostgreSQL-backed store — **scaffold, not yet implemented**.
-
-    .. warning::
-        Calling ``PgBackend(...)`` raises ``NotImplementedError`` today. Use
-        :class:`MemoryBackend` for tests, or implement your own
-        :class:`IdempotencyBackend` subclass against the database of your
-        choice until this implementation lands. Tracked at
-        https://github.com/adcontextprotocol/adcp-client-python/issues/182.
-
-    **Design intent.** Share a transaction with the handler's business
-    writes so the cache entry commits atomically with side effects. Without
-    that, a crash between ``handler success`` and ``cache commit`` causes
-    the retry to re-execute the handler, duplicating side effects.
-
-    **Schema sketch for the implementer.**
+    """PostgreSQL-backed :class:`IdempotencyBackend`.
+
+    Multi-worker durable replay cache. Adopters running ≥2 processes wire
+    this in place of :class:`MemoryBackend` so a retry that lands on a
+    different worker still replays the cached response.
+
+    Example::
+
+        from psycopg_pool import AsyncConnectionPool
+        from adcp.server.idempotency import IdempotencyStore, PgBackend
+
+        pool = AsyncConnectionPool("postgresql://...", min_size=2, max_size=10)
+        backend = PgBackend(pool=pool)
+        await backend.create_schema()  # idempotent; safe to call on every boot
+
+        store = IdempotencyStore(backend=backend, ttl_seconds=86400)
+
+    **Atomicity caveat (v1).** ``put`` commits on a fresh pool connection —
+    the cache write is NOT in the same transaction as the handler's
+    business writes. A crash between handler success and cache commit
+    leaves the slot empty; the next retry re-executes the handler.
+    Idempotent handlers absorb this without harm. **Handlers with
+    non-idempotent side effects** (e.g., ``INSERT INTO media_buys``
+    without a unique constraint on the buyer's idempotency_key) need
+    either: (a) handler-level dedupe via a database unique constraint
+    that maps to the same key the SDK uses, or (b) the co-tx variant
+    once it ships. Co-tx — handler passes its own connection so the
+    cache write commits atomically with side effects — is planned as a
+    follow-on enhancement.
+
+    **Schema bootstrap caveat.** :meth:`create_schema` uses
+    ``CREATE TABLE IF NOT EXISTS`` — if a table with the same name but
+    a different shape already exists (Alembic migration drift, manual
+    DDL with ``response JSON`` instead of ``JSONB``, missing
+    ``COLLATE "C"``), this method is a no-op and the backend will run
+    against the wrong column types. If you manage the schema with
+    Alembic / dbmate, copy the DDL inside :meth:`create_schema`
+    verbatim into a migration revision — keep ``COLLATE "C"`` and
+    ``JSONB`` identical — and skip calling :meth:`create_schema` at
+    boot.
+
+    **Response payload contract.** :attr:`CachedResponse.response` is
+    serialized via ``json.dumps`` for the JSONB column. Values must be
+    JSON-safe — no ``datetime``, ``Decimal``, ``set``, or ``bytes``.
+    Coerce in your handler before returning.
+
+    **Cardinality / DoS.** This backend has no row cap; only TTL
+    bounds the table size. Per AdCP spec, per-principal rate limiting
+    at the auth tier is required — the backend trusts that. Schedule
+    :meth:`delete_expired` as a cron / pg_cron / app-loop sweep
+    (``get`` self-filters expired rows, but they accumulate on disk
+    until something deletes them).
+
+    **Schema.** Created idempotently by :meth:`create_schema`:
 
     .. code-block:: sql
 
-        CREATE TABLE adcp_idempotency (
-            scope_key TEXT COLLATE "C" NOT NULL,
-            key TEXT COLLATE "C" NOT NULL,
-            payload_hash TEXT NOT NULL,
-            response JSONB NOT NULL,
+        CREATE TABLE IF NOT EXISTS adcp_idempotency (
+            scope_key TEXT COLLATE "C" NOT NULL,
+            key TEXT COLLATE "C" NOT NULL,
+            payload_hash TEXT NOT NULL,
+            response JSONB NOT NULL,
             expires_at TIMESTAMPTZ NOT NULL,
             PRIMARY KEY (scope_key, key)
         );
-
-    Notes:
-
-    * ``COLLATE "C"`` (or ``CITEXT`` with a deliberate case policy) — avoid
-      the default locale collation on the identifier columns. On some
-      locales ``Principal-A`` and ``principal-a`` compare equal, which
-      would collapse distinct tenants into the same cache slot.
-    * ``scope_key`` is already composed from ``(tenant_id, caller_identity)``
-      by the store — Postgres sees it as an opaque string. Queries MUST
-      still filter on ``scope_key`` in the ``WHERE`` clause even with the
-      composite PK — row-level security (RLS) enforced via a policy like
-      ``USING (scope_key = current_setting('adcp.scope_key')::text)`` gives
-      belt-and-suspenders protection against accidental cross-tenant reads
-      in future handlers.
-    * ``get`` uses ``SELECT ... WHERE expires_at > now()``.
-    * ``put`` uses ``INSERT ... ON CONFLICT (scope_key, key) DO UPDATE``.
-    * Accept a SQLAlchemy/asyncpg session factory so the caller can thread
-      the handler's transaction through for atomic commit — the atomicity
-      guarantee is the whole reason to use a SQL backend.
+        CREATE INDEX IF NOT EXISTS adcp_idempotency_expires_idx
+            ON adcp_idempotency (expires_at);
+
+    ``COLLATE "C"`` on identifier columns avoids locale-driven equivalence
+    (``Principal-A`` ≡ ``principal-a`` under Turkish/locale-aware
+    collations) collapsing distinct tenants into the same cache slot.
+
+    :param pool: ``psycopg_pool.AsyncConnectionPool`` owned by the caller.
+        Each operation acquires a short-lived connection. We don't open,
+        own, or close the pool.
+    :param table_name: Override the default table name. Useful for
+        multi-tenant schema scoping. Default ``adcp_idempotency``.
+
+    :raises ImportError: when psycopg/psycopg-pool are not installed.
+        Install via the ``pg`` extra: ``pip install 'adcp[pg]'``.
+    :raises ValueError: when ``table_name`` is not a safe ASCII
+        identifier (``[a-z_][a-z0-9_]{0,62}``).
     """
 
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError(
-            "PgBackend is scaffolded but not yet implemented. Use MemoryBackend "
-            "for tests, or implement your own IdempotencyBackend subclass "
-            "against your database of choice until the PgBackend implementation "
-            "lands. Tracking: "
-            "https://github.com/adcontextprotocol/adcp-client-python/issues/182."
+    def __init__(
+        self,
+        *,
+        pool: Any,  # psycopg_pool.AsyncConnectionPool — Any avoids runtime psycopg import
+        table_name: str = DEFAULT_IDEMPOTENCY_TABLE,
+    ) -> None:
+        if not _PG_AVAILABLE:
+            raise ImportError(_PG_INSTALL_HINT)
+        self._pool = pool
+        self._table = _safe_identifier(table_name)
+
+        # Pre-format SQL once. Validated identifier so f-string interpolation
+        # is byte-safe; values always go through %s parameterization. Same
+        # convention as PgWebhookDeliverySupervisor / PgReplayStore.
+        t = self._table
+        self._sql_get = (
+            f"SELECT payload_hash, response, expires_at "  # noqa: S608
+            f"FROM {t} WHERE scope_key = %s AND key = %s AND expires_at > now()"
         )
+        # First-writer-wins under concurrent put. The store's pre-check
+        # ("slot is empty or expired") is NOT a lock — two workers can
+        # both see an empty slot and race into put. With a naive
+        # last-writer-wins ON CONFLICT, the second put would overwrite
+        # the first's payload_hash, violating the cache invariant
+        # "same (scope, key) → same hash". The WHERE on the UPDATE
+        # arm restricts the overwrite to actually-expired rows: a
+        # concurrent fresh write becomes a no-op, both callers
+        # observe an equivalent cached entry from the first writer.
+        self._sql_put = (
+            f"INSERT INTO {t} "  # noqa: S608
+            f"(scope_key, key, payload_hash, response, expires_at) "
+            f"VALUES (%s, %s, %s, %s::jsonb, %s) "
+            f"ON CONFLICT (scope_key, key) DO UPDATE SET "
+            f" payload_hash = EXCLUDED.payload_hash, "
+            f" response = EXCLUDED.response, "
+            f" expires_at = EXCLUDED.expires_at "
+            f"WHERE {t}.expires_at <= now()"
+        )
+        self._sql_delete_expired = f"DELETE FROM {t} WHERE expires_at <= %s"  # noqa: S608
+
+    async def create_schema(self) -> None:
+        """Bootstrap the table + index. Idempotent.
+
+        Safe to call on every app boot. Each DDL statement is executed
+        separately — psycopg does not split on ``;``.
+        """
+        t = self._table
+        statements = [
+            f"""CREATE TABLE IF NOT EXISTS {t} (
+                scope_key TEXT COLLATE "C" NOT NULL,
+                key TEXT COLLATE "C" NOT NULL,
+                payload_hash TEXT NOT NULL,
+                response JSONB NOT NULL,
+                expires_at TIMESTAMPTZ NOT NULL,
+                PRIMARY KEY (scope_key, key)
+            )""",
+            # Partial-free expiry index — cheap eviction sweep.
+            f"""CREATE INDEX IF NOT EXISTS {t}_expires_idx
+                ON {t} (expires_at)""",
+        ]
+        async with self._pool.connection() as conn:
+            for stmt in statements:
+                await conn.execute(stmt)
 
-    async def get(self, scope_key: str, key: str) -> CachedResponse | None:  # pragma: no cover
-        raise NotImplementedError
+    async def get(self, scope_key: str, key: str) -> CachedResponse | None:
+        """Read the cached entry, filtering expired rows in the WHERE clause.
+
+        Lazy expiry — expired rows stay on disk until ``delete_expired``
+        sweeps them. ``get`` self-filters via ``expires_at > now()`` so a
+        stale row never replays.
+        """
+        async with self._pool.connection() as conn:
+            cur = await conn.execute(self._sql_get, (scope_key, key))
+            row = await cur.fetchone()
+        if row is None:
+            return None
+        payload_hash, response, expires_at = row
+        return CachedResponse(
+            payload_hash=payload_hash,
+            response=response if isinstance(response, dict) else json.loads(response),
+            expires_at_epoch=_to_epoch(expires_at),
+        )
 
     async def put(
         self,
         scope_key: str,
         key: str,
         entry: CachedResponse,
-    ) -> None:  # pragma: no cover
-        raise NotImplementedError
+    ) -> None:
+        """Atomic upsert under ``(scope_key, key)``.
+
+        ``ON CONFLICT DO UPDATE`` because the store only calls ``put``
+        after verifying the slot is empty or expired — an overwrite in
+        that window is a legitimate retry of the write itself.
+        """
+        expires_at_dt = datetime.fromtimestamp(entry.expires_at_epoch, tz=timezone.utc)
+        async with self._pool.connection() as conn:
+            await conn.execute(
+                self._sql_put,
+                (
+                    scope_key,
+                    key,
+                    entry.payload_hash,
+                    json.dumps(entry.response),
+                    expires_at_dt,
+                ),
+            )
 
-    async def delete_expired(self, now_epoch: float | None = None) -> int:  # pragma: no cover
-        raise NotImplementedError
+    async def delete_expired(self, now_epoch: float | None = None) -> int:
+        """Best-effort sweep of expired entries. Returns rows removed."""
+        cutoff = now_epoch if now_epoch is not None else time.time()
+        cutoff_dt = datetime.fromtimestamp(cutoff, tz=timezone.utc)
+        async with self._pool.connection() as conn:
+            cur = await conn.execute(self._sql_delete_expired, (cutoff_dt,))
+            return cur.rowcount or 0
+
+
+def _to_epoch(dt: Any) -> float:
+    """Convert a psycopg-returned ``TIMESTAMPTZ`` to epoch seconds.
+
+    psycopg returns ``datetime`` for ``TIMESTAMPTZ`` columns. A
+    tz-naive datetime here means schema drift — adopters managing the
+    schema with Alembic / dbmate may have created the column as
+    ``TIMESTAMP WITHOUT TIME ZONE`` instead of ``TIMESTAMPTZ``. Per
+    project fail-fast policy, raise rather than silently coerce —
+    silent UTC defaults will produce wrong replay windows when the
+    server's local time is not UTC.
+    """
+    if not isinstance(dt, datetime):
+        return float(dt)
+    if dt.tzinfo is None:
+        raise ValueError(
+            "PgBackend received a naive datetime from expires_at. "
+            "This usually means the column was created as "
+            "TIMESTAMP WITHOUT TIME ZONE instead of TIMESTAMPTZ — "
+            "adopter Alembic migration drift from the SDK schema. "
+            "Recreate the column as TIMESTAMPTZ (see "
+            "PgBackend.create_schema for the canonical DDL)."
+        )
+    return float(dt.timestamp())
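The first-writer-wins guard in `_sql_put` (an `ON CONFLICT ... DO UPDATE` whose `WHERE` only matches expired rows) can be tried out without a Postgres instance. The sketch below is a rough stand-in using the standard-library `sqlite3` module, whose upsert syntax also accepts a `WHERE` on the update arm (SQLite 3.24 or newer). Everything here is an assumption made for illustration: the table is trimmed to three data columns, `expires_at` is a plain float, and the current time is passed in as a parameter instead of calling `now()`. It does not exercise real concurrency, only the sequential outcome of two puts against the same slot.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE adcp_idempotency (
        scope_key TEXT NOT NULL,
        key TEXT NOT NULL,
        payload_hash TEXT NOT NULL,
        expires_at REAL NOT NULL,
        PRIMARY KEY (scope_key, key)
    )"""
)

# Same shape as the Postgres statement: the UPDATE arm only fires when the
# existing row has already expired relative to the supplied "now".
PUT_SQL = """
INSERT INTO adcp_idempotency (scope_key, key, payload_hash, expires_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (scope_key, key) DO UPDATE SET
    payload_hash = excluded.payload_hash,
    expires_at = excluded.expires_at
WHERE adcp_idempotency.expires_at <= ?
"""


def put(scope_key: str, key: str, payload_hash: str, expires_at: float, now: float) -> None:
    conn.execute(PUT_SQL, (scope_key, key, payload_hash, expires_at, now))


def stored_hash(scope_key: str, key: str):
    row = conn.execute(
        "SELECT payload_hash FROM adcp_idempotency WHERE scope_key = ? AND key = ?",
        (scope_key, key),
    ).fetchone()
    return row[0] if row else None


# Two writers target the same fresh slot: the second put is a no-op.
put("t1", "k1", "hash-first", expires_at=100.0, now=10.0)
put("t1", "k1", "hash-second", expires_at=100.0, now=10.0)
fresh_result = stored_hash("t1", "k1")

# Once the slot has expired, a later put is allowed to overwrite it.
put("t1", "k1", "hash-third", expires_at=300.0, now=150.0)
expired_result = stored_hash("t1", "k1")

print(fresh_result, expired_result)
```

This matches the behavior the second commit in the squash describes: a sequential put against a non-expired slot leaves the stored row unchanged.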
