Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,11 @@ def sync_turn(
agent_thinking=thinking,
)
if _is_verifier_feedback_prompt(user):
self._submit_verifier_feedback(user, assistant, ts_ms)
feedback_submitted = True
try:
self._submit_verifier_feedback(user, assistant, ts_ms)
feedback_submitted = True
except Exception as err:
logger.warning("MemOS: verifier feedback submit failed — %s", err)
except Exception as err:
if not self._is_transport_closed(err):
logger.warning("MemOS: sync_turn turn.end failed — %s", err)
Expand All @@ -853,8 +856,11 @@ def sync_turn(
agent_thinking=thinking,
)
if _is_verifier_feedback_prompt(user) and not feedback_submitted:
self._submit_verifier_feedback(user, assistant, ts_ms)
feedback_submitted = True
try:
self._submit_verifier_feedback(user, assistant, ts_ms)
feedback_submitted = True
except Exception as err:
logger.warning("MemOS: verifier feedback submit failed — %s", err)
except Exception:
logger.exception(
"MemOS: sync_turn failed after bridge reconnect; "
Expand Down Expand Up @@ -1603,6 +1609,10 @@ def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> N
self._open_session(session_id, timeout=timeout)

def _ensure_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> bool:
if self._bridge and not self._bridge.is_running():
with contextlib.suppress(Exception):
self._bridge.close()
self._bridge = None
if self._bridge:
return True
try:
Expand Down Expand Up @@ -1708,7 +1718,7 @@ def _turn_end(
}
if agent_thinking:
payload["agentThinking"] = agent_thinking
result = self._bridge.request("turn.end", payload)
result = self._bridge.request("turn.end", payload, timeout=75.0)
# Capture the trace ID for feedback submission
if result and isinstance(result, dict):
trace_ids = result.get("traceIds", [])
Expand Down Expand Up @@ -1743,7 +1753,7 @@ def _submit_verifier_feedback(
# Include the last trace ID if available
if self._last_trace_id:
payload["traceId"] = self._last_trace_id
self._bridge.request("feedback.submit", payload)
self._bridge.request("feedback.submit", payload, timeout=10.0)


# ─── Discovery entry points ───────────────────────────────────────────────
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import shutil
import subprocess
import threading
import time

from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -220,11 +221,24 @@ def close(self) -> None:
self._closed = True
with contextlib.suppress(Exception):
self._proc.stdin.close()
# DON'T wait() or kill() the bridge process. If it has an
# active viewer (HTTP server), it will stay alive as a daemon
# so the memory panel remains accessible between `hermes chat`
# sessions. If it's headless (viewer port was taken), it will
# notice stdin EOF and exit on its own.
# This client owns a non-daemon stdio bridge process. Let it
# drain briefly on stdin EOF, then terminate it so reconnect
# storms do not accumulate orphaned `bridge.cts --agent=hermes`
# workers. The long-lived viewer daemon is started elsewhere
# with `--daemon`, so killing this child does not remove the UI.
deadline = time.time() + 1.5
while self._proc.poll() is None and time.time() < deadline:
time.sleep(0.05)
if self._proc.poll() is None:
with contextlib.suppress(Exception):
self._proc.terminate()
try:
self._proc.wait(timeout=2.0)
except Exception:
with contextlib.suppress(Exception):
self._proc.kill()
with contextlib.suppress(Exception):
self._proc.wait(timeout=1.0)
# unblock any pending waiters
with self._lock:
for entry in list(self._pending.values()):
Expand All @@ -236,6 +250,9 @@ def close(self) -> None:
entry["event"].set()
self._pending.clear()

def is_running(self) -> bool:
return self._proc.poll() is None

# ─── Internals ──

def _read_loop(self) -> None:
Expand Down
40 changes: 39 additions & 1 deletion apps/memos-local-plugin/tests/python/test_bridge_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import json
import sys
import threading
import time
import unittest

from pathlib import Path
Expand Down Expand Up @@ -43,6 +44,8 @@ def __init__(self, *_args, **_kwargs) -> None:
self._stdin_lines: list[str] = []
self.stdout = _ServerStream()
self.stderr = io.StringIO()
self._terminated = False
self._killed = False

# Patch the write path so writes accumulate in `_stdin_lines`
# and the server can peek at incoming requests.
Expand All @@ -57,10 +60,18 @@ def _write(s: str) -> int:

# The client just needs wait/kill to exist; they are no-ops here.
def wait(self, timeout: float | None = None) -> int:
if timeout is not None and not self._terminated and not self._killed:
raise TimeoutError("fake process still running")
return 0

def poll(self) -> int | None:
return 0 if (self._terminated or self._killed) else None

def terminate(self) -> None:
self._terminated = True

def kill(self) -> None:
pass
self._killed = True


class _ServerStream(io.StringIO):
Expand Down Expand Up @@ -289,6 +300,21 @@ def test_close_is_idempotent(self) -> None:
client.close()
client.close() # second call must not raise

def test_close_terminates_stdio_bridge_process(self) -> None:
client = MemosBridgeClient(bridge_path="/tmp/bridge.cts")
assert self._fake is not None
started = time.time()
client.close()
self.assertLess(time.time() - started, 3.5)
self.assertTrue(self._fake._terminated or self._fake._killed)

def test_is_running_reflects_subprocess_state(self) -> None:
client = MemosBridgeClient(bridge_path="/tmp/bridge.cts")
assert self._fake is not None
self.assertTrue(client.is_running())
self._fake.terminate()
self.assertFalse(client.is_running())


class MemTensorProviderTests(unittest.TestCase):
"""Exercise `MemTensorProvider` against a mocked bridge."""
Expand Down Expand Up @@ -468,6 +494,9 @@ def test_sync_turn_transport_closed_logs_error_if_retry_fails(self) -> None:
"""Retry failures are surfaced explicitly instead of silent loss."""

class BrokenBridge:
def is_running(self):
return True

def close(self):
pass

Expand All @@ -477,6 +506,9 @@ def request(self, method, params=None, **_kwargs):
return {}

class RetryFailBridge:
def is_running(self):
return True

def request(self, method, params=None):
if method == "session.open":
return {"sessionId": (params or {}).get("sessionId", "sess")}
Expand Down Expand Up @@ -511,6 +543,9 @@ class BrokenBridge:
def __init__(self):
self.closed = False

def is_running(self):
return True

def close(self):
self.closed = True

Expand All @@ -523,6 +558,9 @@ class HealthyBridge:
def __init__(self):
self.calls = []

def is_running(self):
return True

def register_host_handler(self, _method, _handler):
return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self) -> None:
self.calls: list[tuple[str, dict]] = []
self.closed = False
self.host_handlers: dict[str, object] = {}
self.running = True

def register_host_handler(self, method: str, handler: object) -> None:
self.host_handlers[method] = handler
Expand Down Expand Up @@ -57,12 +58,17 @@ def request(self, method: str, params: dict | None = None, **_kwargs: object) ->

def close(self) -> None:
self.closed = True
self.running = False

def is_running(self) -> bool:
return self.running


class FailingSessionOpenBridge(FakeBridge):
def request(self, method: str, params: dict | None = None, **_kwargs: object) -> dict:
if method == "session.open":
self.closed = True
self.running = False
raise RuntimeError("session.open did not respond")
return super().request(method, params, **_kwargs)

Expand Down Expand Up @@ -155,6 +161,52 @@ def bridge_factory() -> FakeBridge:
self.assertEqual(turn_end["episodeId"], "episode-from-turn-start")
self.assertEqual(turn_end["toolCalls"][0]["name"], "read_file")

def test_sync_turn_recreates_dead_bridge_before_turn_end(self) -> None:
dead_bridge = FakeBridge()
dead_bridge.running = False
recovered_bridge = FakeBridge()

with (
patch("memos_provider.ensure_bridge_running", return_value=True),
patch("memos_provider.MemosBridgeClient", return_value=recovered_bridge),
):
provider = memos_provider.MemTensorProvider()
provider._bridge = dead_bridge
provider._session_id = "host-session"
provider._episode_id = "episode-old"
provider.sync_turn("检查 viewer", "viewer 正常")

self.assertTrue(dead_bridge.closed)
methods = [method for method, _params in recovered_bridge.calls]
self.assertEqual(methods, ["session.open", "turn.end"])

def test_sync_turn_ignores_feedback_submit_timeout(self) -> None:
class FeedbackFailBridge(FakeBridge):
def request(self, method: str, params: dict | None = None, **_kwargs: object) -> dict:
if method == "feedback.submit":
self.calls.append((method, params or {}))
raise RuntimeError("feedback timeout")
return super().request(method, params, **_kwargs)

bridge = FeedbackFailBridge()
with (
patch("memos_provider.ensure_bridge_running", return_value=True),
patch("memos_provider.MemosBridgeClient", return_value=bridge),
self.assertLogs("memos_provider", level="WARNING") as logs,
):
provider = memos_provider.MemTensorProvider()
provider.initialize("host-session")
provider.on_turn_start(1, "r <= -0.5 verifier feedback: 这次路径错了")
provider.prefetch("r <= -0.5 verifier feedback: 这次路径错了")
provider.sync_turn(
"r <= -0.5 verifier feedback: 这次路径错了",
"收到,下次避免这个路径。",
)

methods = [method for method, _params in bridge.calls]
self.assertEqual(methods, ["session.open", "turn.start", "turn.end", "feedback.submit"])
self.assertIn("verifier feedback submit failed", "\n".join(logs.output))

def test_delegation_recovers_when_initial_bridge_open_timed_out(self) -> None:
recovered_bridge = FakeBridge()
bridge_attempts = [FailingSessionOpenBridge(), recovered_bridge]
Expand Down