Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions codex-rs/exec/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,33 @@ pub enum Color {
#[default]
Auto,
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;

#[test]
fn resume_parses_prompt_after_global_flags() {
let cli = Cli::parse_from([
"codex-exec",
"resume",
"--last",
"--json",
"--model",
"gpt-5.2-codex",
"--dangerously-bypass-approvals-and-sandbox",
"--skip-git-repo-check",
"echo resume-with-global-flags-after-subcommand",
]);

let Some(Command::Resume(args)) = cli.command else {
panic!("expected resume command");
};
assert_eq!(
args.session_id.as_deref(),
Some("echo resume-with-global-flags-after-subcommand")
);
assert_eq!(args.prompt, None);
}
}
128 changes: 94 additions & 34 deletions codex-rs/exec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use serde_json::Value;
use std::collections::HashSet;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use supports_color::Stream;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;

Expand All @@ -72,6 +76,13 @@ enum InitialOperation {
},
}

#[derive(Clone)]
struct ThreadEventEnvelope {
thread_id: codex_protocol::ThreadId,
thread: Arc<codex_core::CodexThread>,
event: Event,
}

pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
Expand Down Expand Up @@ -326,19 +337,19 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
true,
config.cli_auth_credentials_store_mode,
);
let thread_manager = ThreadManager::new(
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::Exec,
);
));
let default_model = thread_manager
.get_models_manager()
.get_default_model(&config.model, &config, RefreshStrategy::OnlineIfUncached)
.await;

// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewThread {
thread_id: _,
thread_id: primary_thread_id,
thread,
session_configured,
} = if let Some(ExecCommand::Resume(args)) = command.as_ref() {
Expand Down Expand Up @@ -420,40 +431,47 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any

info!("Codex initialized with event: {session_configured:?}");

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ThreadEventEnvelope>();
let attached_threads = Arc::new(Mutex::new(HashSet::from([primary_thread_id])));
spawn_thread_listener(primary_thread_id, thread.clone(), tx.clone());

{
let thread = thread.clone();
tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any in-flight task.
thread.submit(Op::Interrupt).await.ok();
}
});
}

{
let thread_manager = Arc::clone(&thread_manager);
let attached_threads = Arc::clone(&attached_threads);
let tx = tx.clone();
let mut thread_created_rx = thread_manager.subscribe_thread_created();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any in‑flight task.
thread.submit(Op::Interrupt).await.ok();

// Exit the inner loop and return to the main input prompt. The codex
// will emit a `TurnInterrupted` (Error) event which is drained later.
break;
}
res = thread.next_event() => match res {
Ok(event) => {
debug!("Received event: {event:?}");

let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(e) = tx.send(event) {
error!("Error sending event: {e:?}");
break;
match thread_created_rx.recv().await {
Ok(thread_id) => {
if attached_threads.lock().await.contains(&thread_id) {
continue;
}
match thread_manager.get_thread(thread_id).await {
Ok(thread) => {
attached_threads.lock().await.insert(thread_id);
spawn_thread_listener(thread_id, thread, tx.clone());
}
if is_shutdown_complete {
info!("Received shutdown event, exiting event loop.");
break;
Err(err) => {
warn!("failed to attach listener for thread {thread_id}: {err}")
}
},
Err(e) => {
error!("Error receiving event: {e:?}");
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
Expand Down Expand Up @@ -492,7 +510,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
while let Some(envelope) = rx.recv().await {
let ThreadEventEnvelope {
thread_id,
thread,
event,
} = envelope;
if let EventMsg::ElicitationRequest(ev) = &event.msg {
// Automatically cancel elicitation requests in exec mode.
thread
Expand All @@ -506,15 +529,16 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
let shutdown = event_processor.process_event(event);
if thread_id != primary_thread_id && matches!(shutdown, CodexStatus::InitiateShutdown) {
continue;
}
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
thread.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
}
CodexStatus::Shutdown => break,
}
}
event_processor.print_final_output();
Expand All @@ -525,6 +549,42 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
Ok(())
}

fn spawn_thread_listener(
thread_id: codex_protocol::ThreadId,
thread: Arc<codex_core::CodexThread>,
tx: tokio::sync::mpsc::UnboundedSender<ThreadEventEnvelope>,
) {
tokio::spawn(async move {
loop {
match thread.next_event().await {
Ok(event) => {
debug!("Received event: {event:?}");

let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(err) = tx.send(ThreadEventEnvelope {
thread_id,
thread: Arc::clone(&thread),
event,
}) {
error!("Error sending event: {err:?}");
break;
}
if is_shutdown_complete {
info!(
"Received shutdown event for thread {thread_id}, exiting event loop."
);
break;
}
}
Err(err) => {
error!("Error receiving event: {err:?}");
break;
}
}
}
});
}

async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,
Expand Down
34 changes: 34 additions & 0 deletions codex-rs/exec/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,37 @@ fn main() -> anyhow::Result<()> {
Ok(())
})
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;

#[test]
fn top_cli_parses_resume_prompt_after_config_flag() {
let cli = TopCli::parse_from([
"codex-exec",
"resume",
"--last",
"--json",
"--model",
"gpt-5.2-codex",
"--config",
"reasoning_level=xhigh",
"--dangerously-bypass-approvals-and-sandbox",
"--skip-git-repo-check",
"echo resume-with-global-flags-after-subcommand",
]);

let Some(codex_exec::Command::Resume(args)) = cli.inner.command else {
panic!("expected resume command");
};
assert_eq!(
args.session_id.as_deref(),
Some("echo resume-with-global-flags-after-subcommand")
);
assert_eq!(args.prompt, None);
assert_eq!(cli.config_overrides.raw_overrides.len(), 1);
assert_eq!(cli.config_overrides.raw_overrides[0], "reasoning_level=xhigh");
}
}
Loading