Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ working_dir = "/home/agent"
[pool]
max_sessions = 10
session_ttl_hours = 24
# Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min).
# prompt_hard_timeout_secs = 1800
# Liveness-check cadence (sec) for the recv loop; see #732. Default: 30.
# liveness_check_secs = 30

[markdown]
tables = "code" # "code" (default) | "bullets" | "off"
Expand Down
331 changes: 236 additions & 95 deletions src/acp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing::{debug, error, info, trace};

/// Pick the most permissive selectable permission option from ACP options.
fn pick_best_option(options: &[Value]) -> Option<String> {
Expand Down Expand Up @@ -149,6 +149,112 @@ fn build_agent_env(
(result, inherited)
}

/// Reader loop body: reads JSON-RPC messages from `reader`, auto-replies
/// `session/request_permission` via `writer`, resolves pending responses,
/// and forwards notifications + stale id-bearing messages to the active
/// subscriber. Extracted as a free generic function so unit tests can drive
/// it with `tokio::io::duplex()` halves instead of a real child process.
pub(crate) async fn run_reader_loop<R, W>(
reader: R,
writer: Arc<Mutex<W>>,
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcMessage>>>>,
notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>>,
) where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {}
Err(e) => {
error!("reader error: {e}");
break;
}
}
let msg: JsonRpcMessage = match serde_json::from_str(line.trim()) {
Ok(m) => m,
Err(_) => continue,
};
debug!(line = line.trim(), "acp_recv");

// Auto-reply session/request_permission
if msg.method.as_deref() == Some("session/request_permission") {
if let Some(id) = msg.id {
let title = msg
.params
.as_ref()
.and_then(|p| p.get("toolCall"))
.and_then(|t| t.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("?");

let outcome = build_permission_response(msg.params.as_ref());
info!(title, %outcome, "auto-respond permission");
let reply = JsonRpcResponse::new(id, outcome);
if let Ok(data) = serde_json::to_string(&reply) {
let mut w = writer.lock().await;
let _ = w.write_all(format!("{data}\n").as_bytes()).await;
let _ = w.flush().await;
}
}
continue;
}

// Response (has id) → resolve pending AND forward to subscriber
if let Some(id) = msg.id {
let mut map = pending.lock().await;
if let Some(tx) = map.remove(&id) {
// Forward to subscriber so they see the completion
let sub = notify_tx.lock().await;
if let Some(ntx) = sub.as_ref() {
// Clone the essential fields for the subscriber
let _ = ntx.send(JsonRpcMessage {
id: Some(id),
method: None,
result: msg.result.clone(),
error: msg.error.clone(),
params: None,
});
}
let _ = tx.send(msg);
continue;
}
// Stale id (#732): pending was already abandoned. Falls through
// to subscriber forwarding; the adapter recv loop filters by
// request_id so it can't leak into the next prompt.
trace!(request_id = id, "stale id-bearing message after abandon");
}

// Notification → forward to subscriber
let sub = notify_tx.lock().await;
if let Some(tx) = sub.as_ref() {
let _ = tx.send(msg);
}
}

// Connection closed — resolve all pending with error
let mut map = pending.lock().await;
for (_, tx) in map.drain() {
let _ = tx.send(JsonRpcMessage {
id: None,
method: None,
result: None,
error: Some(crate::acp::protocol::JsonRpcError {
code: -1,
message: "connection closed".into(),
}),
params: None,
});
}
// Close the notify channel so rx.recv() returns None
let mut sub = notify_tx.lock().await;
*sub = None;
}

impl AcpConnection {
pub async fn spawn(
command: &str,
Expand Down Expand Up @@ -254,99 +360,12 @@ impl AcpConnection {
let notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(None));

let reader_handle = {
let pending = pending.clone();
let notify_tx = notify_tx.clone();
let stdin_clone = stdin.clone();
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {}
Err(e) => {
error!("reader error: {e}");
break;
}
}
let msg: JsonRpcMessage = match serde_json::from_str(line.trim()) {
Ok(m) => m,
Err(_) => continue,
};
debug!(line = line.trim(), "acp_recv");

// Auto-reply session/request_permission
if msg.method.as_deref() == Some("session/request_permission") {
if let Some(id) = msg.id {
let title = msg
.params
.as_ref()
.and_then(|p| p.get("toolCall"))
.and_then(|t| t.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("?");

let outcome = build_permission_response(msg.params.as_ref());
info!(title, %outcome, "auto-respond permission");
let reply = JsonRpcResponse::new(id, outcome);
if let Ok(data) = serde_json::to_string(&reply) {
let mut w = stdin_clone.lock().await;
let _ = w.write_all(format!("{data}\n").as_bytes()).await;
let _ = w.flush().await;
}
}
continue;
}

// Response (has id) → resolve pending AND forward to subscriber
if let Some(id) = msg.id {
let mut map = pending.lock().await;
if let Some(tx) = map.remove(&id) {
// Forward to subscriber so they see the completion
let sub = notify_tx.lock().await;
if let Some(ntx) = sub.as_ref() {
// Clone the essential fields for the subscriber
let _ = ntx.send(JsonRpcMessage {
id: Some(id),
method: None,
result: msg.result.clone(),
error: msg.error.clone(),
params: None,
});
}
let _ = tx.send(msg);
continue;
}
}

// Notification → forward to subscriber
let sub = notify_tx.lock().await;
if let Some(tx) = sub.as_ref() {
let _ = tx.send(msg);
}
}

// Connection closed — resolve all pending with error
let mut map = pending.lock().await;
for (_, tx) in map.drain() {
let _ = tx.send(JsonRpcMessage {
id: None,
method: None,
result: None,
error: Some(crate::acp::protocol::JsonRpcError {
code: -1,
message: "connection closed".into(),
}),
params: None,
});
}
// Close the notify channel so rx.recv() returns None
let mut sub = notify_tx.lock().await;
*sub = None;
})
};
let reader_handle = tokio::spawn(run_reader_loop(
stdout,
stdin.clone(),
pending.clone(),
notify_tx.clone(),
));

Ok(Self {
_proc: proc,
Expand Down Expand Up @@ -557,6 +576,26 @@ impl AcpConnection {
self.last_active = Instant::now();
}

/// Drop the pending entry for `request_id` and best-effort send
/// `session/cancel` as a JSON-RPC notification (no id; per ACP spec the
/// agent does not reply). Errors are swallowed: the agent process may
/// already be dead, in which case the stdin write fails harmlessly.
/// See #732.
pub async fn abandon_request(&self, request_id: u64) {
self.pending.lock().await.remove(&request_id);
let Some(session_id) = self.acp_session_id.as_deref() else {
return;
};
let req = json!({
"jsonrpc": "2.0",
"method": "session/cancel",
"params": {"sessionId": session_id},
});
if let Ok(data) = serde_json::to_string(&req) {
let _ = self.send_raw(&data).await;
}
}

/// Return a clone of the stdin handle for lock-free cancel.
pub fn cancel_handle(&self) -> Arc<Mutex<ChildStdin>> {
Arc::clone(&self.stdin)
Expand Down Expand Up @@ -758,3 +797,105 @@ mod tests {
assert!(inherited.is_empty());
}
}

#[cfg(test)]
mod reader_loop_tests {
use super::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{duplex, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot, Mutex};

/// #732 stale-id path: when a response arrives for an id the broker has
/// already abandoned, the reader must (a) not crash, (b) leave `pending`
/// untouched, and (c) still forward the message to whoever is currently
/// subscribed — the adapter recv loop is responsible for filtering by
/// request_id so the stray response never leaks into the next prompt.
#[tokio::test]
async fn stale_id_response_is_forwarded_without_pending_entry() {
let (mut agent_stdout_writer, agent_stdout_reader) = duplex(8 * 1024);
let (agent_stdin_writer, _agent_stdin_reader) = duplex(8 * 1024);

let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(HashMap::new()));
let notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(None));

let (sub_tx, mut sub_rx) = mpsc::unbounded_channel();
*notify_tx.lock().await = Some(sub_tx);

let writer = Arc::new(Mutex::new(agent_stdin_writer));
let handle = tokio::spawn(run_reader_loop(
agent_stdout_reader,
writer,
pending.clone(),
notify_tx.clone(),
));

let stale = b"{\"jsonrpc\":\"2.0\",\"id\":42,\"result\":{\"stopReason\":\"ok\"}}\n";
agent_stdout_writer.write_all(stale).await.unwrap();
agent_stdout_writer.flush().await.unwrap();

let forwarded = tokio::time::timeout(
std::time::Duration::from_secs(2),
sub_rx.recv(),
)
.await
.expect("subscriber should receive stale message before timeout")
.expect("subscriber channel should not be closed");
assert_eq!(forwarded.id, Some(42));
assert!(pending.lock().await.is_empty());

drop(agent_stdout_writer);
handle.await.unwrap();
}

/// Matched-id path: when a response's id is in `pending`, the loop must
/// resolve the oneshot AND forward a copy to the subscriber so the
/// adapter's recv loop sees the completion. Guards against regressions
/// that would suppress the forward branch while keeping resolve.
#[tokio::test]
async fn matched_id_response_resolves_pending_and_forwards() {
let (mut agent_stdout_writer, agent_stdout_reader) = duplex(8 * 1024);
let (agent_stdin_writer, _agent_stdin_reader) = duplex(8 * 1024);

let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(HashMap::new()));
let notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(None));

let (resp_tx, resp_rx) = oneshot::channel();
pending.lock().await.insert(7, resp_tx);

let (sub_tx, mut sub_rx) = mpsc::unbounded_channel();
*notify_tx.lock().await = Some(sub_tx);

let writer = Arc::new(Mutex::new(agent_stdin_writer));
let handle = tokio::spawn(run_reader_loop(
agent_stdout_reader,
writer,
pending.clone(),
notify_tx.clone(),
));

let payload = b"{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":{\"stopReason\":\"end_turn\"}}\n";
agent_stdout_writer.write_all(payload).await.unwrap();
agent_stdout_writer.flush().await.unwrap();

let resolved = tokio::time::timeout(std::time::Duration::from_secs(2), resp_rx)
.await
.expect("oneshot should resolve")
.expect("oneshot should not be cancelled");
assert_eq!(resolved.id, Some(7));

let forwarded = tokio::time::timeout(std::time::Duration::from_secs(2), sub_rx.recv())
.await
.expect("subscriber should receive forwarded copy")
.expect("subscriber channel should not be closed");
assert_eq!(forwarded.id, Some(7));
assert!(pending.lock().await.is_empty());

drop(agent_stdout_writer);
handle.await.unwrap();
}
}
Loading
Loading