Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions clients/bridge_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
reject_pending_write,
validate_pending_write,
)
from .probe import (
ProbeOutcome,
arm_probe,
await_probe,
probe_registry_size,
resolve_probe,
)
from .risk import RiskLevel, risk_classify
from .text_relay import RelayResult, relay_text

Expand All @@ -74,4 +81,6 @@
"InterceptResult", "classify_tool", "intercept", "pending_summary",
# Dispatch / text relay
"pop_bridge_metadata", "RelayResult", "relay_text",
# Capability probe
"ProbeOutcome", "arm_probe", "await_probe", "probe_registry_size", "resolve_probe",
]
3 changes: 3 additions & 0 deletions clients/bridge_core/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class AuditEvent(str, Enum):
NEEDS_REVISION = "needs_revision"
CHAIN_VERIFIED = "chain_verified"
CHAIN_BROKEN = "chain_broken"
NARRATED_BUT_NOT_EXECUTED = "narrated_but_not_executed"
RING2_CAPABILITY_FAILED = "ring2_capability_failed"
RING2_CAPABILITY_VERIFIED = "ring2_capability_verified"


def append_audit_event(
Expand Down
10 changes: 10 additions & 0 deletions clients/bridge_core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class BridgeContext:
# (e.g. bridge "reflection" semantically maps to Stack "hypothesis")
layer_translation: dict[str, str] = field(default_factory=lambda: {"reflection": "hypothesis"})

# Ring 2 capability probe — DEFAULTS TO FALSE (detector mode).
#
# When False (default): a probe timeout records an audit event and sets a
# capability flag, but NEVER disables Ring 2 for this connection. The OpenAI
# bridge leaves this False; its Ring 2 dispatch path is byte-for-byte unchanged.
#
# When True (opt-in hard-gate): a probe timeout disables Ring 2 for this
# connection/session only. Global module state is never mutated.
require_ring2_probe: bool = False

# Convenience accessors

@property
Expand Down
19 changes: 18 additions & 1 deletion clients/bridge_core/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
from typing import Any

from .context import BridgeContext
from .pending_writes import Proposal, ValidationError, create_pending_write, list_pending_writes
from .pending_writes import (
Proposal,
ValidationError,
create_pending_write,
get_proposal_by_id,
list_pending_writes,
)
from .risk import risk_classify

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -153,6 +159,17 @@ def classify_tool(ctx: BridgeContext, tool_name: str, args: dict | None = None)
return {"tool": tool_name, "ring": 3, "blocked": True}


def verify_proposal(ctx: BridgeContext, proposal_id: str) -> dict:
"""
Verify whether a claimed proposal actually exists and its hash is intact.

Delegates to pending_writes.get_proposal_by_id. Returns the verification
dict directly — found=False for a missing proposal (the canonical signal
that a narrated-but-not-dispatched write never landed in the queue).
"""
return get_proposal_by_id(ctx, proposal_id)


def pending_summary(ctx: BridgeContext) -> str:
"""Quick human-readable summary of the substrate's pending queue."""
all_pending = list_pending_writes(ctx, status="pending")
Expand Down
54 changes: 54 additions & 0 deletions clients/bridge_core/pending_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,60 @@ def needs_revision_pending_write(
return proposal


def get_proposal_by_id(ctx: BridgeContext, proposal_id: str) -> dict:
"""
Load a proposal by id (full UUID or 8-char prefix) and verify its audit hash.

Returns a verification dict that is safe to return to read-only callers:
- On missing proposal: {"found": False, "proposal_id": <id>, "error": "not_found"}
- On success: {"found": True, "proposal_id": ..., "tool": ...,
"status": ..., "substrate": ..., "timestamp": ...,
"risk_level": ..., "audit_hash": ...,
"chain_valid": bool, "error": None | "hash_mismatch"}

chain_valid is True when the stored audit_hash matches the hash recomputed from
the creation-time snapshot (mutable lifecycle fields restored to their initial
values, matching the exact snapshot hashed in create_pending_write).
"""
try:
proposal, _path = _load_proposal(ctx, proposal_id)
except FileNotFoundError:
return {"found": False, "proposal_id": proposal_id, "error": "not_found"}

# Reconstruct the creation-time snapshot: same field exclusions as
# _precondition_check and create_pending_write use when computing audit_hash.
# The hash covers all fields except audit_hash, with lifecycle mutables
# restored to their creation-time values.
_MUTABLE = {
"status", "reviewed_by", "reviewed_at", "revision_notes",
"commit_result", "audit_hash",
}
d = proposal.to_dict()
creation_snapshot = {k: v for k, v in d.items() if k not in _MUTABLE}
creation_snapshot["status"] = "pending"
creation_snapshot["reviewed_by"] = None
creation_snapshot["reviewed_at"] = None
creation_snapshot["revision_notes"] = None
creation_snapshot["commit_result"] = None

recomputed = hash_pending_write(creation_snapshot, proposal.prev_hash)
chain_valid = recomputed == proposal.audit_hash
error = None if chain_valid else "hash_mismatch"

return {
"found": True,
"proposal_id": proposal.proposal_id,
"tool": proposal.tool,
"status": proposal.status,
"substrate": proposal.substrate,
"timestamp": proposal.timestamp,
"risk_level": proposal.risk_level,
"audit_hash": proposal.audit_hash,
"chain_valid": chain_valid,
"error": error,
}


def list_pending_writes(ctx: BridgeContext, status: str | None = None) -> list[dict]:
"""List proposals, optionally filtered by status. Returns summary dicts."""
ctx.pending_writes_dir.mkdir(parents=True, exist_ok=True)
Expand Down
114 changes: 114 additions & 0 deletions clients/bridge_core/probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from __future__ import annotations

"""
Generic per-connection Ring 2 capability probe.

A probe detects, at connect time, whether a given substrate connector
actually dispatches Ring 2 (write-class) tool calls to our SSE handler —
as opposed to narrating a write that never arrives.

Usage pattern
─────────────
1. At connect time (in the substrate's SSE handler):
probe_key = arm_probe(connection_id)

2. In the per-tool dispatch handler, when the sentinel tool arrives:
resolve_probe(connection_id)

3. Back in the connect handler, concurrently with serving tools:
outcome = await await_probe(connection_id, timeout=PROBE_TIMEOUT_SECONDS)
# "verified" | "failed"

Design constraints
──────────────────
- Pure and testable: no SSE, no MCP, no globals beyond the registry dict.
- No cross-connection leakage: each probe is keyed by a per-connection UUID.
- Guaranteed cleanup: await_probe removes the registry entry in a finally
block regardless of whether the Future resolved or timed out.
- Thread-safety: asyncio.Future is created on the running event loop; this
module must be used from a single asyncio event loop (standard for ASGI).
"""

import asyncio
import logging
from typing import Literal

logger = logging.getLogger(__name__)

# Registry: connection_id → asyncio.Future[None]
# Populated by arm_probe, resolved by resolve_probe, consumed + cleaned by await_probe.
_PROBE_REGISTRY: dict[str, asyncio.Future[None]] = {}

ProbeOutcome = Literal["verified", "failed"]


def arm_probe(probe_key: str) -> None:
"""
Register an awaitable Future for `probe_key`.

Should be called once per connection before any tool dispatch can
arrive. If a probe is already armed for the same key (should not
happen in normal operation), the existing Future is replaced —
this prevents a stale Future from blocking a new connection that
happens to reuse the same key.
"""
loop = asyncio.get_running_loop()
_PROBE_REGISTRY[probe_key] = loop.create_future()
logger.debug("probe: armed for key=%s", probe_key)


def resolve_probe(probe_key: str) -> bool:
"""
Signal that the sentinel tool arrived for `probe_key`.

Returns True if a Future was found and resolved; False if no probe
was armed for this key (e.g. probing not enabled for this connection).
Safe to call even when no probe is armed — sentinel handling code
can always call this without checking first.
"""
fut = _PROBE_REGISTRY.get(probe_key)
if fut is None:
logger.debug("probe: resolve called but no probe armed for key=%s", probe_key)
return False
if not fut.done():
fut.set_result(None)
logger.debug("probe: resolved for key=%s", probe_key)
return True


async def await_probe(probe_key: str, timeout: float) -> ProbeOutcome:
"""
Await the probe Future for up to `timeout` seconds.

Returns:
"verified" — sentinel arrived within the timeout window.
"failed" — asyncio.TimeoutError; sentinel never arrived.

ALWAYS removes the registry entry in a finally block, so no Future
leaks regardless of outcome. If no Future is registered for this
key (arm_probe was not called), returns "failed" immediately.
"""
fut = _PROBE_REGISTRY.get(probe_key)
if fut is None:
logger.warning(
"probe: await_probe called but no probe armed for key=%s — "
"returning 'failed' without timeout wait",
probe_key,
)
return "failed"

try:
await asyncio.wait_for(asyncio.shield(fut), timeout=timeout)
logger.debug("probe: verified for key=%s", probe_key)
return "verified"
except asyncio.TimeoutError:
logger.debug("probe: timeout for key=%s (%.1fs)", probe_key, timeout)
return "failed"
finally:
_PROBE_REGISTRY.pop(probe_key, None)
logger.debug("probe: registry cleaned for key=%s", probe_key)


def probe_registry_size() -> int:
"""Return the current number of armed probes. Exposed for testing only."""
return len(_PROBE_REGISTRY)
Loading
Loading