-
Notifications
You must be signed in to change notification settings - Fork 3k
feat(runtime): bridge user-input events and API to external GUI clients #2133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e660a52
4a6404d
2ab830c
089e57a
b656453
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -296,6 +296,25 @@ struct DecideApprovalResponse { | |||||||||||||||||||||
| delivered: bool, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #[derive(Debug, Deserialize)] | ||||||||||||||||||||||
| struct SubmitUserInputBody { | ||||||||||||||||||||||
| answers: Vec<UserInputAnswerBody>, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #[derive(Debug, Deserialize)] | ||||||||||||||||||||||
| struct UserInputAnswerBody { | ||||||||||||||||||||||
| id: String, | ||||||||||||||||||||||
| label: String, | ||||||||||||||||||||||
| value: String, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #[derive(Debug, Serialize)] | ||||||||||||||||||||||
| struct SubmitUserInputResponse { | ||||||||||||||||||||||
| ok: bool, | ||||||||||||||||||||||
| input_id: String, | ||||||||||||||||||||||
| delivered: bool, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #[derive(Debug, Serialize)] | ||||||||||||||||||||||
| struct RuntimeInfoResponse { | ||||||||||||||||||||||
| bind_host: String, | ||||||||||||||||||||||
|
|
@@ -500,6 +519,10 @@ pub fn build_router(state: RuntimeApiState) -> Router { | |||||||||||||||||||||
| .route("/v1/threads/{id}/compact", post(compact_thread)) | ||||||||||||||||||||||
| .route("/v1/threads/{id}/events", get(stream_thread_events)) | ||||||||||||||||||||||
| .route("/v1/approvals/{approval_id}", post(decide_approval)) | ||||||||||||||||||||||
| .route( | ||||||||||||||||||||||
| "/v1/user-input/{thread_id}/{input_id}", | ||||||||||||||||||||||
| post(submit_user_input), | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| .route("/v1/tasks", get(list_tasks).post(create_task)) | ||||||||||||||||||||||
| .route("/v1/tasks/{id}", get(get_task)) | ||||||||||||||||||||||
| .route("/v1/tasks/{id}/cancel", post(cancel_task)) | ||||||||||||||||||||||
|
|
@@ -984,6 +1007,34 @@ async fn decide_approval( | |||||||||||||||||||||
| })) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async fn submit_user_input( | ||||||||||||||||||||||
| State(state): State<RuntimeApiState>, | ||||||||||||||||||||||
| Path((thread_id, input_id)): Path<(String, String)>, | ||||||||||||||||||||||
| Json(req): Json<SubmitUserInputBody>, | ||||||||||||||||||||||
| ) -> Result<Json<SubmitUserInputResponse>, ApiError> { | ||||||||||||||||||||||
| use crate::tools::user_input::{UserInputAnswer, UserInputResponse}; | ||||||||||||||||||||||
| let answers: Vec<UserInputAnswer> = req | ||||||||||||||||||||||
| .answers | ||||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||||
| .map(|a| UserInputAnswer { | ||||||||||||||||||||||
| id: a.id, | ||||||||||||||||||||||
| label: a.label, | ||||||||||||||||||||||
| value: a.value, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| .collect(); | ||||||||||||||||||||||
| let response = UserInputResponse { answers }; | ||||||||||||||||||||||
| let delivered = state | ||||||||||||||||||||||
| .runtime_threads | ||||||||||||||||||||||
| .submit_user_input(&thread_id, &input_id, response) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .map_err(map_thread_err)?; | ||||||||||||||||||||||
| Ok(Json(SubmitUserInputResponse { | ||||||||||||||||||||||
| ok: true, | ||||||||||||||||||||||
| input_id, | ||||||||||||||||||||||
| delivered, | ||||||||||||||||||||||
| })) | ||||||||||||||||||||||
|
Comment on lines
+1031
to
+1035
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+1010
to
+1036
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This diverges from the approval flow, where |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async fn runtime_info(State(state): State<RuntimeApiState>) -> Json<RuntimeInfoResponse> { | ||||||||||||||||||||||
| Json(RuntimeInfoResponse { | ||||||||||||||||||||||
| bind_host: state.bind_host.clone(), | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -833,6 +833,30 @@ impl RuntimeThreadManager { | |||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| pub async fn submit_user_input( | ||||||||
| &self, | ||||||||
| thread_id: &str, | ||||||||
| input_id: &str, | ||||||||
| response: crate::tools::user_input::UserInputResponse, | ||||||||
| ) -> Result<bool> { | ||||||||
| let active = self.active.lock().await; | ||||||||
| let Some(state) = active.engines.get(thread_id) else { | ||||||||
| bail!("thread '{thread_id}' not found"); | ||||||||
| }; | ||||||||
| state.engine.submit_user_input(input_id, response).await?; | ||||||||
| Ok(true) | ||||||||
| } | ||||||||
|
|
||||||||
| #[allow(dead_code)] | ||||||||
| pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> { | ||||||||
|
Comment on lines
+850
to
+851
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| let active = self.active.lock().await; | ||||||||
| let Some(state) = active.engines.get(thread_id) else { | ||||||||
| bail!("thread '{thread_id}' not found"); | ||||||||
| }; | ||||||||
| state.engine.cancel_user_input(input_id).await?; | ||||||||
| Ok(true) | ||||||||
| } | ||||||||
|
|
||||||||
| #[allow(dead_code)] | ||||||||
| pub fn pending_approvals_count(&self) -> usize { | ||||||||
| self.pending_approvals | ||||||||
|
|
@@ -2782,6 +2806,19 @@ impl RuntimeThreadManager { | |||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
| EngineEvent::UserInputRequired { id, request } => { | ||||||||
| self.emit_event( | ||||||||
| &thread_id, | ||||||||
| Some(&turn_id), | ||||||||
| None, | ||||||||
| "user_input.required", | ||||||||
| json!({ | ||||||||
| "id": id, | ||||||||
| "request": request, | ||||||||
| }), | ||||||||
| ) | ||||||||
| .await?; | ||||||||
| } | ||||||||
| EngineEvent::Status { message } => { | ||||||||
| let item = TurnItemRecord { | ||||||||
| schema_version: CURRENT_RUNTIME_SCHEMA_VERSION, | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::time::timeout(USER_INPUT_TIMEOUT, ...)is constructed fresh on each loop iteration. When aUserInputDecisionarrives for a differenttool_id(the_ => continuebranch), the loop restarts and the 5-minute clock resets. This means the total wait is unbounded: any stream of unrelated decisions (e.g., a concurrent user-input tool call, or a stale channel message from a prior request) keeps deferring the timeout indefinitely, defeating the disconnected-GUI protection.The fix is to start the sleep outside the loop so it counts down from a single point in time, then select on it as a separate arm.