Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 102 additions & 18 deletions src/opencode_agent_hub/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ def _wait_for_coordinator_activity(
class InjectionTask:
session_id: str
text: str
agent: str | None = None


@dataclass
Expand Down Expand Up @@ -1867,6 +1868,13 @@ def start_coordinator() -> bool:
COORDINATOR_SESSION_ID = session_id
ORIENTED_SESSIONS.add(session_id)

# Log the coordinator's agent if we can detect it
coordinator_agent_name = get_agent_from_session_messages(session_id)
if coordinator_agent_name:
log.info(f"Coordinator is using agent: {coordinator_agent_name}")
else:
log.info("Coordinator agent not yet detected (will be logged after first message)")

# Register coordinator as an agent so other agents can message it
coordinator_agent = {
"id": "coordinator",
Expand Down Expand Up @@ -1972,14 +1980,15 @@ def notify_coordinator_new_agent(agent_id: str, directory: str) -> None:

before_ms = int(time.time() * 1000)
notification = f"NEW_AGENT: {agent_id} at {directory}"
inject_message(COORDINATOR_SESSION_ID, notification)
# Notification to coordinator should use coordinator's own agent
inject_message(COORDINATOR_SESSION_ID, notification, agent="coordinator")
log.info(f"Notified coordinator of new agent: {agent_id}")

if not _wait_for_coordinator_activity(COORDINATOR_SESSION_ID, before_ms, timeout_seconds=5):
log.warning(
f"Coordinator showed no activity after NEW_AGENT {agent_id}; retrying notification once"
)
inject_message(COORDINATOR_SESSION_ID, notification)
inject_message(COORDINATOR_SESSION_ID, notification, agent="coordinator")


def poll_coordinator_cost() -> None:
Expand Down Expand Up @@ -2012,6 +2021,14 @@ def poll_coordinator_cost() -> None:
if not isinstance(messages, list):
return

# Log the coordinator's current agent from most recent message
if messages:
most_recent = messages[-1] # Messages are in chronological order
info = most_recent.get("info", {})
agent = info.get("agent")
if agent:
log.debug(f"Coordinator using agent: {agent}")

# Sum token usage from all assistant messages
total_input = 0
total_output = 0
Expand Down Expand Up @@ -2180,16 +2197,24 @@ def find_sessions_for_agent(agent: dict, sessions: list[dict]) -> list[dict]:
return [matching[0]]


def inject_message_sync(session_id: str, text: str) -> bool:
def inject_message_sync(session_id: str, text: str, agent: str | None = None) -> bool:
"""Inject message into OpenCode session (synchronous, with retries).

Uses /prompt_async endpoint which triggers LLM invocation even when idle.
The /message endpoint with noReply:false only adds to context without
actually invoking the LLM when the session is idle.

Args:
session_id: Target session ID
text: Message text to inject
agent: Optional agent ID to use for handling this message. When provided,
OpenCode will use this agent instead of the default.
"""
payload = {
payload: dict[str, Any] = {
"parts": [{"type": "text", "text": text}],
}
if agent:
payload["agent"] = agent

for attempt in range(INJECTION_RETRIES):
try:
Expand All @@ -2202,7 +2227,10 @@ def inject_message_sync(session_id: str, text: str) -> bool:
)
# prompt_async returns 204 No Content on success
if resp.status_code in (200, 204):
log.info(f"Injected message into session {session_id[:8]}... (prompt_async)")
agent_info = f" [agent={agent}]" if agent else ""
log.info(
f"Injected message into session {session_id[:8]}... (prompt_async){agent_info}"
)
metrics.inc("agent_hub_injections_total")
return True
else:
Expand All @@ -2219,9 +2247,15 @@ def inject_message_sync(session_id: str, text: str) -> bool:
return False


def inject_message(session_id: str, text: str) -> None:
"""Queue message for async injection (non-blocking)."""
_injection_queue.put(InjectionTask(session_id=session_id, text=text))
def inject_message(session_id: str, text: str, agent: str | None = None) -> None:
"""Queue message for async injection (non-blocking).

Args:
session_id: Target session ID
text: Message text to inject
agent: Optional agent ID to use for handling this message
"""
_injection_queue.put(InjectionTask(session_id=session_id, text=text, agent=agent))


def injection_worker(shutdown_event: threading.Event) -> None:
Expand All @@ -2233,7 +2267,7 @@ def injection_worker(shutdown_event: threading.Event) -> None:
continue

try:
inject_message_sync(task.session_id, task.text)
inject_message_sync(task.session_id, task.text, task.agent)
except Exception as e:
log.error(f"Injection worker error: {e}")
finally:
Expand Down Expand Up @@ -2371,6 +2405,47 @@ def generate_agent_id_for_session(session: dict[str, Any]) -> str:
return f"session-{session_id[:12]}" if session_id else "unknown-session"


def get_agent_from_session_messages(session_id: str) -> str | None:
"""Get the current agent for a session from the most recent message.

Queries the OpenCode SQLite database to find the agent used in the
most recent message for this session. This reflects the actual running
agent, not just the configured default.

Args:
session_id: The session ID to look up

Returns:
The agent name (e.g., "kimi", "claude", " coordinator") or None
"""
if not session_id or not OPENCODE_DB_PATH.exists():
return None

try:
conn = sqlite3.connect(f"file:{OPENCODE_DB_PATH}?mode=ro", uri=True, timeout=5)
conn.row_factory = sqlite3.Row
try:
# Get the most recent message for this session
row = conn.execute(
"SELECT data FROM message WHERE session_id = ? ORDER BY time_created DESC LIMIT 1",
(session_id,),
).fetchone()

if row and row["data"]:
msg_data = json.loads(row["data"])
# Agent is stored in the message data
agent = msg_data.get("agent")
if isinstance(agent, str) and agent:
log.debug(f"Detected agent '{agent}' from session {session_id[:12]}")
return agent
finally:
conn.close()
except (sqlite3.Error, json.JSONDecodeError, OSError) as e:
log.debug(f"Failed to get agent from session messages: {e}")

return None


def get_or_create_agent_for_session(
session: dict[str, Any], agents: dict[str, dict[str, Any]]
) -> dict[str, Any]:
Expand All @@ -2380,20 +2455,23 @@ def get_or_create_agent_for_session(
identity per session, allowing multiple TUI sessions in the same directory
to have separate agent identities.

The agent ID is derived from the session's slug (if available) or session ID,
ensuring uniqueness across all sessions.
The agent ID is derived from the session's actual running agent (queried
from the SQLite database), or falls back to the session slug/ID.
"""
session_id = cast(str, session.get("id", ""))
directory = cast(str, session.get("directory", ""))

# Check if we already have a mapping for this session
if session_id in SESSION_AGENTS:
agent_id = SESSION_AGENTS[session_id]["agentId"]
if agent_id in agents:
return agents[agent_id]

# Generate unique agent ID from session
agent_id = generate_agent_id_for_session(session)
directory = cast(str, session.get("directory", ""))

# Try to get the actual agent from the session's most recent message
detected_agent = get_agent_from_session_messages(session_id)
# Use detected agent, or fall back to session slug/ID
agent_id = detected_agent or generate_agent_id_for_session(session)

# Handle conflicts by appending session ID fragment
if agent_id in agents and agents[agent_id].get("sessionId") != session_id:
Expand Down Expand Up @@ -2519,7 +2597,8 @@ def orient_session(

# Inject minimal orientation to the agent
orientation = format_orientation(agent, all_agents)
inject_message(session_id, orientation)
# Orientation comes from the daemon, use the target agent's ID as context
inject_message(session_id, orientation, agent=agent_id)

# Notify coordinator of new agent (coordinator will reach out to capture task)
notify_coordinator_new_agent(cast(str, agent_id), cast(str, directory))
Expand Down Expand Up @@ -2593,7 +2672,7 @@ def check_orientation_retries(agents: dict[str, dict[str, Any]]) -> None:
agent or {"id": agent_id, "projectPath": ""},
agents,
)
inject_message(session_id, orientation)
inject_message(session_id, orientation, agent=agent_id)
pending["retries"] = retries + 1
pending["oriented_at"] = now # Reset timer for next retry window
metrics.inc("agent_hub_orientation_retries_total")
Expand Down Expand Up @@ -2795,9 +2874,13 @@ def process_message_file(path: Path, agents: dict[str, dict[str, Any]]) -> None:
)
if matching_sessions:
notification = format_notification(msg, cast(str, agent["id"]))
# Use the target agent's ID so the message is handled by their configured agent
target_agent_id = cast(str, agent["id"])
for session in matching_sessions:
log.info(f"Injecting message into session {session['id']} for agent {agent['id']}")
inject_message(cast(str, session["id"]), notification)
log.info(
f"Injecting message into session {session['id']} for agent {target_agent_id}"
)
inject_message(cast(str, session["id"]), notification, agent=target_agent_id)
delivered = True
else:
log.info(f"No session found for agent {agent['id']}")
Expand Down Expand Up @@ -3115,6 +3198,7 @@ def main() -> None:
log.info(f"Watching agents: {AGENTS_DIR}")
log.info(f"OpenCode API: {OPENCODE_URL}")
log.info(f"Message TTL: {MESSAGE_TTL_SECONDS}s, GC interval: {GC_INTERVAL_SECONDS}s")
log.info("Agent detection: enabled (detects agent from session messages)")
if COORDINATOR_ENABLED:
log.info(f"Coordinator: enabled, dir={COORDINATOR_DIR}")
else:
Expand Down
Loading
Loading