Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions crates/tui/src/core/engine/approval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
//! or whenever a tool requests live user input (`await_user_input`). Channels
//! and engine state stay private to the parent module.

use std::time::Duration;

use crate::core::events::Event;
use crate::tools::spec::ToolError;
use crate::tools::user_input::{UserInputRequest, UserInputResponse};

const USER_INPUT_TIMEOUT: Duration = Duration::from_secs(300);

use super::Engine;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -123,22 +127,43 @@ impl Engine {
format!("Request cancelled while awaiting user input{suffix}"),
));
}
decision = self.rx_user_input.recv() => {
let Some(decision) = decision else {
return Err(ToolError::execution_failed(
"User input channel closed".to_string(),
));
};
match decision {
UserInputDecision::Submitted { id, response } if id == tool_id => {
return Ok(response);
result = tokio::time::timeout(USER_INPUT_TIMEOUT, self.rx_user_input.recv()) => {
match result {
Ok(Some(decision)) => {
match decision {
UserInputDecision::Submitted { id, response } if id == tool_id => {
return Ok(response);
}
UserInputDecision::Cancelled { id } if id == tool_id => {
return Err(ToolError::execution_failed(
"User input cancelled".to_string(),
));
}
_ => continue,
}
}
UserInputDecision::Cancelled { id } if id == tool_id => {
Ok(None) => {
return Err(ToolError::execution_failed(
"User input cancelled".to_string(),
"User input channel closed".to_string(),
));
}
Err(_) => {
let _ = self
.tx_event
.send(Event::Status {
message: format!(
"User input timed out after {}s",
USER_INPUT_TIMEOUT.as_secs()
),
})
.await;
return Err(ToolError::execution_failed(
format!(
"User input timed out after {}s",
USER_INPUT_TIMEOUT.as_secs()
),
));
}
_ => continue,
}
}
Comment on lines +130 to 168
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Timeout resets on every mismatched decision

tokio::time::timeout(USER_INPUT_TIMEOUT, ...) is constructed fresh on each loop iteration. When a UserInputDecision arrives for a different tool_id (the _ => continue branch), the loop restarts and the 5-minute clock resets. This means the total wait is unbounded: any stream of unrelated decisions (e.g., a concurrent user-input tool call, or a stale channel message from a prior request) keeps deferring the timeout indefinitely, defeating the disconnected-GUI protection.

The fix is to start the sleep outside the loop so it counts down from a single point in time, then select on it as a separate arm.

Fix in Codex Fix in Claude Code Fix in Cursor

}
Expand Down
51 changes: 51 additions & 0 deletions crates/tui/src/runtime_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,25 @@ struct DecideApprovalResponse {
delivered: bool,
}

#[derive(Debug, Deserialize)]
struct SubmitUserInputBody {
answers: Vec<UserInputAnswerBody>,
}

#[derive(Debug, Deserialize)]
struct UserInputAnswerBody {
id: String,
label: String,
value: String,
}

#[derive(Debug, Serialize)]
struct SubmitUserInputResponse {
ok: bool,
input_id: String,
delivered: bool,
}

#[derive(Debug, Serialize)]
struct RuntimeInfoResponse {
bind_host: String,
Expand Down Expand Up @@ -500,6 +519,10 @@ pub fn build_router(state: RuntimeApiState) -> Router {
.route("/v1/threads/{id}/compact", post(compact_thread))
.route("/v1/threads/{id}/events", get(stream_thread_events))
.route("/v1/approvals/{approval_id}", post(decide_approval))
.route(
"/v1/user-input/{thread_id}/{input_id}",
post(submit_user_input),
)
.route("/v1/tasks", get(list_tasks).post(create_task))
.route("/v1/tasks/{id}", get(get_task))
.route("/v1/tasks/{id}/cancel", post(cancel_task))
Expand Down Expand Up @@ -984,6 +1007,34 @@ async fn decide_approval(
}))
}

async fn submit_user_input(
State(state): State<RuntimeApiState>,
Path((thread_id, input_id)): Path<(String, String)>,
Json(req): Json<SubmitUserInputBody>,
) -> Result<Json<SubmitUserInputResponse>, ApiError> {
use crate::tools::user_input::{UserInputAnswer, UserInputResponse};
let answers: Vec<UserInputAnswer> = req
.answers
.into_iter()
.map(|a| UserInputAnswer {
id: a.id,
label: a.label,
value: a.value,
})
.collect();
let response = UserInputResponse { answers };
let delivered = state
.runtime_threads
.submit_user_input(&thread_id, &input_id, response)
.await
.map_err(map_thread_err)?;
Ok(Json(SubmitUserInputResponse {
ok: true,
input_id,
delivered,
}))
Comment on lines +1031 to +1035
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The delivered field is structurally always true here — submit_user_input either returns Ok(true) or propagates an error (in which case the handler already returned early). This misleads API consumers who may interpret it as confirmation that the answer was actually consumed by a waiting tool.

Suggested change
Ok(Json(SubmitUserInputResponse {
ok: true,
input_id,
delivered,
}))
Ok(Json(SubmitUserInputResponse {
ok: true,
input_id,
delivered: true,
}))

Fix in Codex Fix in Claude Code Fix in Cursor

}
Comment on lines +1010 to +1036
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Stale or wrong input_id silently returns 200 OK

submit_user_input sends the decision onto the channel and always returns Ok(true). If a GUI POSTs with a wrong or already-expired input_id, the decision is silently discarded by the _ => continue branch in await_user_input — the caller gets {"ok":true,"delivered":true} even though no waiting tool received it.

This diverges from the approval flow, where deliver_external_approval checks whether a pending entry exists and returns false (surfaced as HTTP 404) when none is found. Without a similar pending-input registry, a GUI client has no way to detect that it POSTed too late or used the wrong ID, which can cause silent hangs on the GUI side.

Fix in Codex Fix in Claude Code Fix in Cursor


async fn runtime_info(State(state): State<RuntimeApiState>) -> Json<RuntimeInfoResponse> {
Json(RuntimeInfoResponse {
bind_host: state.bind_host.clone(),
Expand Down
37 changes: 37 additions & 0 deletions crates/tui/src/runtime_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,30 @@ impl RuntimeThreadManager {
}
}

pub async fn submit_user_input(
&self,
thread_id: &str,
input_id: &str,
response: crate::tools::user_input::UserInputResponse,
) -> Result<bool> {
let active = self.active.lock().await;
let Some(state) = active.engines.get(thread_id) else {
bail!("thread '{thread_id}' not found");
};
state.engine.submit_user_input(input_id, response).await?;
Ok(true)
}

#[allow(dead_code)]
pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> {
Comment on lines +850 to +851
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 cancel_user_input is dead code with no REST endpoint, meaning GUIs have no way to signal that the user dismissed the input dialog. This leaves the engine waiting the full 5 minutes on a cancelled interaction. Consider either exposing a DELETE /v1/user-input/{thread_id}/{input_id} endpoint or removing the #[allow(dead_code)] suppression until the endpoint is wired up.

Suggested change
#[allow(dead_code)]
pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> {
pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> {

Fix in Codex Fix in Claude Code Fix in Cursor

let active = self.active.lock().await;
let Some(state) = active.engines.get(thread_id) else {
bail!("thread '{thread_id}' not found");
};
state.engine.cancel_user_input(input_id).await?;
Ok(true)
}

#[allow(dead_code)]
pub fn pending_approvals_count(&self) -> usize {
self.pending_approvals
Expand Down Expand Up @@ -2782,6 +2806,19 @@ impl RuntimeThreadManager {
}
}
}
EngineEvent::UserInputRequired { id, request } => {
self.emit_event(
&thread_id,
Some(&turn_id),
None,
"user_input.required",
json!({
"id": id,
"request": request,
}),
)
.await?;
}
EngineEvent::Status { message } => {
let item = TurnItemRecord {
schema_version: CURRENT_RUNTIME_SCHEMA_VERSION,
Expand Down
Loading