Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- `trusted_endpoints`: registered URLs may now contain FastAPI-style path placeholders. `{id}` matches exactly one path segment, `{rest:path}` matches any subtree. Plain URLs without `{` keep exact-match semantics — no migration needed for existing rows. Both `is_trusted_endpoint` and the snapshot tamper-check inside `evaluate_handoff` honor the new syntax. Closes #14.

## 0.2.0

- Added `provably.configure_indexing(enable_indexing: bool)`: one-call bootstrap (`initialize_runtime` + `init_interceptor` + `enable` / `disable`) for sender agents.
Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,31 @@ URLs are normalized (lowercase scheme + host, default ports collapsed, trailing
slash dropped) before any read or write so that `https://API.EXAMPLE.COM/x/`
and `https://api.example.com/x` collide on the same row.

#### Path-pattern entries

Concrete URLs match exactly. To authorize a family of URLs with a single entry —
useful for templated routes like `/customers/{id}` or runtime-generated ids —
register the URL with FastAPI-style (`{name}`) placeholders:

| Placeholder | Matches | Example |
|---|---|---|
| `{name}` | exactly one path segment (no `/`) | `https://api.example.com/customers/{id}` matches `…/customers/42` but **not** `…/customers/42/orders` |
| `{name:path}` | any subtree (including `/` separators) | `https://api.example.com/customers/{rest:path}` matches both `…/customers/42` and `…/customers/42/orders` |

The placeholder name (`id`, `rest`, …) is purely descriptive and does not affect
matching. Plain URLs without `{` characters keep exact-match semantics — no
behavior change for existing entries.

```sql
-- Register a templated route once instead of enumerating every concrete id
INSERT INTO trusted_endpoints (org_id, normalized_url, display_label, entry_type)
VALUES ('my-org', 'https://api.example.com/customers/{id}', 'Customers (by id)', 'endpoint');
```

`is_trusted_endpoint` and the snapshot tamper-check inside `evaluate_handoff`
both honor the same matching rules, so a claim against `…/customers/42` will
pass both gates when only the templated entry is registered.

## Public API

All public symbols are re-exported from the top-level `provably` namespace. See
Expand Down
89 changes: 86 additions & 3 deletions src/provably/trusted_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import re
from functools import lru_cache
from typing import TYPE_CHECKING
from urllib.parse import urlparse

Expand All @@ -12,6 +14,58 @@

_DDL_DONE = False

# ---------------------------------------------------------------------------
# Pattern matching
#
# A registered URL may contain FastAPI-style path placeholders so a single
# entry can authorize a family of concrete URLs:
#
# {name} — matches one path segment (no '/'). E.g. /customers/{id} matches
# /customers/123 but NOT /customers/123/orders.
# {name:path} — matches any subtree, including '/' separators. E.g.
# /customers/{rest:path} matches both /customers/123 and
# /customers/123/orders.
#
# Plain URLs (no '{' character) keep exact-match semantics — no behavior change for
# existing entries.
# ---------------------------------------------------------------------------

_PLACEHOLDER_RE = re.compile(r"\{[^}/]+(?::path)?\}")


@lru_cache(maxsize=512)
def _compile_pattern(registered: str) -> re.Pattern[str] | None:
"""Compile a registered URL into a regex if it has placeholders, else return None.

Cache keeps regex compilation off the hot per-request path.
"""
if "{" not in registered:
return None
parts: list[str] = []
cursor = 0
has_placeholder = False
for match in _PLACEHOLDER_RE.finditer(registered):
parts.append(re.escape(registered[cursor : match.start()]))
is_path = ":path" in match.group(0)
parts.append(".+?" if is_path else "[^/]+?")
cursor = match.end()
has_placeholder = True
if not has_placeholder:
return None
parts.append(re.escape(registered[cursor:]))
try:
return re.compile(f"^{''.join(parts)}$")
except re.error:
return None


def _matches_registered(claim_url: str, registered: str) -> bool:
    """Report whether ``claim_url`` is covered by the ``registered`` entry.

    An identical string is always a hit. Otherwise ``registered`` is compiled
    as a placeholder pattern and ``claim_url`` is matched against it; entries
    that do not compile to a pattern can only match by equality.
    """
    if claim_url != registered:
        compiled = _compile_pattern(registered)
        if compiled is None:
            return False
        return compiled.match(claim_url) is not None
    return True


def normalize_url_for_trust(url: str) -> str:
"""Return the canonical form of ``url`` used for trust look-ups.
Expand Down Expand Up @@ -74,14 +128,21 @@ def ensure_trusted_endpoints_table(conn: psycopg2.extensions.connection) -> None


def is_trusted_endpoint(url: str, org_id: str, conn: psycopg2.extensions.connection) -> bool:
"""Return whether ``url`` is currently allowlisted for ``org_id``; normalizes URL before look-up."""
"""Return whether ``url`` is currently allowlisted for ``org_id``.

Two-phase lookup: exact match first (fast path, single indexed query), then a
pattern-match scan over only the rows containing ``{`` in their ``normalized_url``.
Plain URLs without placeholders never enter the slow path, so existing exact-match
registries see no perf regression.
"""
if not url or not org_id:
return False
norm = normalize_url_for_trust(url)
if not norm:
return False
_ensure_trusted_table(conn)
with conn.cursor() as cur:
# Fast path: exact match.
cur.execute(
"""
SELECT 1 FROM trusted_endpoints
Expand All @@ -90,7 +151,21 @@ def is_trusted_endpoint(url: str, org_id: str, conn: psycopg2.extensions.connect
""",
(org_id, norm),
)
return cur.fetchone() is not None
if cur.fetchone() is not None:
return True
# Slow path: pattern entries only.
cur.execute(
"""
SELECT normalized_url FROM trusted_endpoints
WHERE org_id = %s AND entry_type = 'endpoint' AND revoked_at IS NULL
AND normalized_url LIKE '%%{%%'
""",
(org_id,),
)
for (registered,) in cur.fetchall():
if _matches_registered(norm, str(registered or "")):
return True
return False


def list_trusted_endpoints(
Expand Down Expand Up @@ -208,7 +283,15 @@ def check_claim_endpoints_are_trusted(

registry = {n for url in hp.trusted_endpoint_registry if (n := normalize_url_for_trust(str(url)))}
if registry:
missing = list(dict.fromkeys(u for u in claim_urls if u not in registry))
pattern_entries = [r for r in registry if "{" in r]
missing: list[str] = []
for claim_url in claim_urls:
if claim_url in registry:
continue
if any(_matches_registered(claim_url, entry) for entry in pattern_entries):
continue
missing.append(claim_url)
missing = list(dict.fromkeys(missing))
if missing:
raise ValueError(f"handoff has endpoints missing from trusted snapshot: {', '.join(missing)}")

Expand Down
135 changes: 135 additions & 0 deletions tests/unit/test_trusted_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import pytest

from provably.trusted_endpoints import (
_compile_pattern,
_matches_registered,
is_trusted_endpoint,
list_trusted_endpoints,
normalize_url_for_trust,
Expand Down Expand Up @@ -46,6 +48,139 @@ def test_is_trusted_queries_normalized_row(monkeypatch: pytest.MonkeyPatch) -> N
assert args[1][1] == "https://x.com/a"


# ---------------------------------------------------------------------------
# Pattern matching ({name} and {name:path} placeholders)
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
    "registered",
    [
        "https://api.example.com/customers",
        "https://api.example.com/customers/123",
        "https://example.com",
    ],
)
def test_compile_pattern_returns_none_for_plain_urls(registered: str) -> None:
    """URLs without any ``{`` placeholder never produce a compiled pattern."""
    compiled = _compile_pattern(registered)
    assert compiled is None


def test_pattern_single_segment_matches_one_path_segment() -> None:
    """``{id}`` covers exactly one non-empty path segment under the registered prefix."""
    compiled = _compile_pattern("https://api.example.com/customers/{id}")
    assert compiled is not None

    accepted = (
        "https://api.example.com/customers/123",
        "https://api.example.com/customers/abc-DEF",
    )
    for url in accepted:
        assert compiled.match(url) is not None

    rejected = (
        # Additional path segments must not be swallowed.
        "https://api.example.com/customers/123/orders",
        # A different prefix must not match.
        "https://api.example.com/clients/123",
        # The bare prefix without an id segment must not match.
        "https://api.example.com/customers/",
    )
    for url in rejected:
        assert compiled.match(url) is None


def test_pattern_path_placeholder_matches_subtree() -> None:
    """``{rest:path}`` covers one or many segments but stays anchored at the prefix."""
    compiled = _compile_pattern("https://api.example.com/customers/{rest:path}")
    assert compiled is not None

    for url in (
        "https://api.example.com/customers/123",
        "https://api.example.com/customers/123/orders/456",
    ):
        assert compiled.match(url) is not None

    # A URL under a different prefix is still rejected.
    outside_prefix = compiled.match("https://api.example.com/clients/123")
    assert outside_prefix is None


def test_pattern_multiple_placeholders() -> None:
    """Several ``{name}`` placeholders in one entry each cover a single segment."""
    compiled = _compile_pattern("https://api.example.com/customers/{cust}/orders/{order}")
    assert compiled is not None

    exact_depth = compiled.match("https://api.example.com/customers/c1/orders/o9")
    assert exact_depth is not None

    too_deep = compiled.match("https://api.example.com/customers/c1/orders/o9/items/x")
    assert too_deep is None


def test_matches_registered_falls_back_to_exact() -> None:
    """Without placeholders, only an identical URL counts as a match."""
    same = _matches_registered("https://x.com/a", "https://x.com/a")
    different = _matches_registered("https://x.com/a", "https://x.com/b")
    assert same is True
    assert different is False


def test_matches_registered_uses_pattern_when_present() -> None:
    """A ``{id}`` entry accepts one trailing segment and rejects a deeper path."""
    entry = "https://x.com/customers/{id}"
    one_segment = _matches_registered("https://x.com/customers/9", entry)
    two_segments = _matches_registered("https://x.com/customers/9/orders", entry)
    assert one_segment is True
    assert two_segments is False


def test_is_trusted_endpoint_matches_pattern_entry(monkeypatch: pytest.MonkeyPatch) -> None:
    """A claim URL covered by a registered ``{id}`` entry is trusted via the pattern scan."""
    monkeypatch.setattr("provably.trusted_endpoints._ensure_trusted_table", lambda _c: None)

    cursor = MagicMock()
    # The exact-match query finds nothing; the pattern-entry query yields one row.
    cursor.fetchone.return_value = None
    cursor.fetchall.return_value = [("https://api.example.com/customers/{id}",)]

    connection = MagicMock()
    connection.cursor.return_value.__enter__ = lambda *_: cursor
    connection.cursor.return_value.__exit__ = lambda *_: None

    trusted = is_trusted_endpoint("https://api.example.com/customers/42", "org-1", connection)

    assert trusted is True
    # One execute for the exact look-up, one for the pattern entries.
    assert cursor.execute.call_count == 2


def test_is_trusted_endpoint_rejects_nonmatching_pattern(monkeypatch: pytest.MonkeyPatch) -> None:
    """A claim one segment deeper than the registered ``{id}`` entry is not trusted."""
    monkeypatch.setattr("provably.trusted_endpoints._ensure_trusted_table", lambda _c: None)

    cursor = MagicMock()
    cursor.fetchone.return_value = None
    # Only /customers/{id} is registered; the claim below targets a deeper path.
    cursor.fetchall.return_value = [("https://api.example.com/customers/{id}",)]

    connection = MagicMock()
    connection.cursor.return_value.__enter__ = lambda *_: cursor
    connection.cursor.return_value.__exit__ = lambda *_: None

    trusted = is_trusted_endpoint("https://api.example.com/customers/42/orders", "org-1", connection)
    assert trusted is False


def test_snapshot_check_accepts_pattern_match(monkeypatch: pytest.MonkeyPatch) -> None:
    """The snapshot tamper-check accepts a claim URL covered by a pattern entry."""
    from provably.handoff.types import HandoffClaim, HandoffPayload
    from provably.trusted_endpoints import check_claim_endpoints_are_trusted

    # The live DB check has its own tests; here it is stubbed to always return
    # True so only the snapshot comparison decides the outcome.
    monkeypatch.setattr("provably.trusted_endpoints.is_trusted_endpoint", lambda *_a, **_kw: True)
    monkeypatch.setattr("psycopg2.connect", lambda *_a, **_kw: MagicMock())

    claim = HandoffClaim(
        action_name="get_customer",
        request_payload={"url": "https://api.example.com/customers/42", "method": "GET"},
    )
    payload = HandoffPayload(
        provably_org_id="org-1",
        trusted_endpoint_registry=["https://api.example.com/customers/{id}"],
        claims=[claim],
    )

    # Must not raise: the pattern entry covers the concrete URL.
    check_claim_endpoints_are_trusted(payload, postgres_url="postgresql://x")


def test_snapshot_check_rejects_url_outside_pattern(monkeypatch: pytest.MonkeyPatch) -> None:
    """The snapshot tamper-check raises when a claim URL falls outside the pattern entry."""
    from provably.handoff.types import HandoffClaim, HandoffPayload
    from provably.trusted_endpoints import check_claim_endpoints_are_trusted

    monkeypatch.setattr("provably.trusted_endpoints.is_trusted_endpoint", lambda *_a, **_kw: True)
    monkeypatch.setattr("psycopg2.connect", lambda *_a, **_kw: MagicMock())

    # The claim URL is one segment deeper than {id} permits.
    claim = HandoffClaim(
        action_name="get_orders",
        request_payload={"url": "https://api.example.com/customers/42/orders", "method": "GET"},
    )
    payload = HandoffPayload(
        provably_org_id="org-1",
        trusted_endpoint_registry=["https://api.example.com/customers/{id}"],
        claims=[claim],
    )

    with pytest.raises(ValueError, match="missing from trusted snapshot"):
        check_claim_endpoints_are_trusted(payload, postgres_url="postgresql://x")


def test_list_trusted_endpoints_excludes_given_urls(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("provably.trusted_endpoints._ensure_trusted_table", lambda _c: None)
conn = MagicMock()
Expand Down