feat(server/idempotency): complete PgBackend for multi-worker durable replay#555
Merged
Conversation
… replay Replaces the NotImplementedError scaffold with a real psycopg3-async-pool implementation. Multi-worker AdCP sellers can now declare IdempotencySupported(supported=True) and actually dedupe across workers. * PgBackend(pool=...) — async pool, per-call connection acquisition. * await create_schema() — idempotent CREATE TABLE IF NOT EXISTS + CREATE INDEX. JSONB response, COLLATE "C" on text PK columns to prevent locale-driven cross-tenant collisions. * ON CONFLICT (scope_key, key) DO UPDATE … WHERE existing.expires_at <= now() is FIRST-WRITER-WINS under concurrent put — concurrent writers racing the same fresh slot cannot violate the cache invariant "same (scope, key) → same payload_hash". * Naive datetime from expires_at raises ValueError with a schema-drift hint instead of silent UTC coercion (fail-fast policy). * Validates table_name through the same ASCII identifier regex as the other PG backends. * Optional adcp[pg] extra; ImportError with install hint when missing. Security hardening alongside the backend: * IdempotencyStore._extract_scope_key rejects tenant_id / caller_identity values containing the U+001E scope separator. Without this, tenant="A\\x1eB" + principal="X" would collide with tenant="A" + principal="B\\x1eX" — multi-tenant isolation defeated. * Log lines use _scope_log_id(scope_key) — sha256-truncated identifier, not the raw principal id. PII / commercial identity data no longer lands in centralized log sinks verbatim. Docstring caveats covered: atomicity (v1 separate-tx commit; non- idempotent handler side effects need handler-level dedup or co-tx follow-on); schema bootstrap (CREATE TABLE IF NOT EXISTS no-ops on pre-existing wrong-shape tables — Alembic-first deployments must copy the DDL verbatim); JSON-safe contract; per-principal rate limiting at the auth tier and delete_expired sweeps. Tests: * 15 unit tests with mocked psycopg pool. * 9 conformance tests against real Postgres (skipped without ADCP_PG_TEST_URL): first-writer-wins concurrent put, expired-row overwrite, end-to-end IdempotencyStore replay, scope_key isolation. * 4 scope-separator-rejection unit tests. * CI workflow extended to run the new conformance suite. Closes #548. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous test_put_overwrites_via_on_conflict asserted last-writer-wins behavior. The new ON CONFLICT … WHERE expires_at <= now() guard makes a sequential put against a non-expired slot a no-op. Update the test name + assertion to match the contract; the underlying SQL is correct. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes #548. Replaces the
NotImplementedErrorscaffold with a real psycopg3-async-pool-backedPgBackend. Multi-worker AdCP sellers can now declareIdempotencySupported(supported=True)and actually dedupe across workers.PgBackend(pool=...)— async pool, per-call connection acquisition (matchesPgWebhookDeliverySupervisor/PgReplayStore)await create_schema()— idempotent CREATE TABLE IF NOT EXISTS + INDEX. JSONB response,COLLATE "C"on PK text columnsON CONFLICT … DO UPDATE … WHERE existing.expires_at <= now()— concurrent writers racing the same fresh slot can't violate the cache invariant "same (scope, key) → same payload_hash"ValueErrorwith schema-drift hint (fail-fast over silent UTC coercion)table_name;adcp[pg]extra;ImportErrorwhen missingSecurity hardening alongside the backend
IdempotencyStore._extract_scope_keyrejectstenant_id/caller_identitycontaining the U+001E scope separator. Without this,tenant="A\x1eB"+principal="X"collides withtenant="A"+principal="B\x1eX"— multi-tenant isolation defeated_scope_log_id(scope_key)— sha256-truncated identifier instead of the raw principal id. PII no longer lands in centralized log sinksDocstring caveats
CREATE TABLE IF NOT EXISTSno-ops on a pre-existing wrong-shape table — Alembic-first deployments must copy the DDL verbatimjson.dumps; no datetime/Decimal/set/bytesdelete_expired()as a sweepTest plan
tests/test_pg_idempotency_backend.pyADCP_PG_TEST_URL): first-writer-wins concurrent put, expired-row overwrite, scope isolation, end-to-endIdempotencyStorereplay throughPgBackendtests/test_server_idempotency.pyruff check+mypycleanReviewed by
Pre-PR review by
code-reviewer,security-reviewer,python-expert. Material concerns addressed:ON CONFLICT DO UPDATErace (python-expert) — addedWHERE expires_at <= now()for first-writer-wins (was last-writer-wins)\x1eseparator confusion (security-expert) — added validator in_extract_scope_key, fail-closed on either tenant or principal containing the separator_scope_log_idreplaces rawscope_keyin WARNING/DEBUG lines🤖 Generated with Claude Code