Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 123 additions & 31 deletions codex-rs/exec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ struct ExecRunArgs {
stderr_with_ansi: bool,
}

#[derive(Debug, PartialEq, Eq)]
enum ResumeTarget {
StartNewThread,
ResumePath(PathBuf),
}

fn exec_root_span() -> tracing::Span {
info_span!(
"codex.exec",
Expand Down Expand Up @@ -548,11 +554,11 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
})?;

// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let (primary_thread_id, fallback_session_configured) =
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
let resume_path = resolve_resume_path(&config, args).await?;

if let Some(path) = resume_path {
let (primary_thread_id, fallback_session_configured) = if let Some(ExecCommand::Resume(args)) =
command.as_ref()
{
match resolve_resume_target(&config, args).await? {
ResumeTarget::ResumePath(path) => {
let response: ThreadResumeResponse = send_request_with_response(
&client,
ClientRequest::ThreadResume {
Expand All @@ -566,7 +572,8 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
let session_configured = session_configured_from_thread_resume_response(&response)
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
} else {
}
ResumeTarget::StartNewThread => {
let response: ThreadStartResponse = send_request_with_response(
&client,
ClientRequest::ThreadStart {
Expand All @@ -581,21 +588,22 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
}
} else {
let response: ThreadStartResponse = send_request_with_response(
&client,
ClientRequest::ThreadStart {
request_id: request_ids.next(),
params: thread_start_params_from_config(&config),
},
"thread/start",
)
.await
.map_err(anyhow::Error::msg)?;
let session_configured = session_configured_from_thread_start_response(&response)
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
};
}
} else {
let response: ThreadStartResponse = send_request_with_response(
&client,
ClientRequest::ThreadStart {
request_id: request_ids.next(),
params: thread_start_params_from_config(&config),
},
"thread/start",
)
.await
.map_err(anyhow::Error::msg)?;
let session_configured =
session_configured_from_thread_start_response(&response).map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
};

let primary_thread_id_for_span = primary_thread_id.to_string();
let mut buffered_events = VecDeque::new();
Expand Down Expand Up @@ -1407,10 +1415,10 @@ fn local_external_chatgpt_tokens(
})
}

async fn resolve_resume_path(
async fn resolve_resume_target(
config: &Config,
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
) -> anyhow::Result<ResumeTarget> {
if args.last {
let default_provider_filter = vec![config.model_provider_id.clone()];
let filter_cwd = if args.all {
Expand All @@ -1430,22 +1438,26 @@ async fn resolve_resume_path(
)
.await
{
Ok(path) => Ok(path),
Ok(Some(path)) => Ok(ResumeTarget::ResumePath(path)),
Ok(None) => Ok(ResumeTarget::StartNewThread),
Err(e) => {
error!("Error listing threads: {e}");
Ok(None)
Ok(ResumeTarget::StartNewThread)
}
}
} else if let Some(id_str) = args.session_id.as_deref() {
if Uuid::parse_str(id_str).is_ok() {
let path = find_thread_path_by_id_str(&config.codex_home, id_str).await?;
Ok(path)
let path = if Uuid::parse_str(id_str).is_ok() {
find_thread_path_by_id_str(&config.codex_home, id_str).await?
} else {
let path = find_thread_path_by_name_str(&config.codex_home, id_str).await?;
Ok(path)
find_thread_path_by_name_str(&config.codex_home, id_str).await?
};

match path {
Some(path) => Ok(ResumeTarget::ResumePath(path)),
None => anyhow::bail!("Session not found: {id_str}"),
}
} else {
Ok(None)
Ok(ResumeTarget::StartNewThread)
}
}

Expand Down Expand Up @@ -1628,6 +1640,8 @@ mod tests {
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
use pretty_assertions::assert_eq;
use std::io::Write;
use std::path::Path;
use tempfile::tempdir;
use tracing_opentelemetry::OpenTelemetrySpanExt;

Expand All @@ -1637,6 +1651,43 @@ mod tests {
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer))
}

fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf {
let sessions = codex_home.join("sessions/2024/01/01");
std::fs::create_dir_all(&sessions).expect("create rollout directory");

let file = sessions.join(format!("rollout-2024-01-01T00-00-00-{id}.jsonl"));
let mut rollout = std::fs::File::create(&file).expect("create rollout file");
writeln!(
rollout,
"{}",
serde_json::json!({
"timestamp": "2024-01-01T00:00:00.000Z",
"type": "session_meta",
"payload": {
"id": id,
"timestamp": "2024-01-01T00:00:00Z",
"cwd": ".",
"originator": "test",
"cli_version": "test",
"model_provider": "test-provider"
}
})
)
.expect("write rollout");

file
}

fn make_resume_args(session_id: Option<&str>, last: bool) -> crate::cli::ResumeArgs {
crate::cli::ResumeArgs {
session_id: session_id.map(str::to_string),
last,
all: false,
images: Vec::new(),
prompt: None,
}
}

#[test]
fn exec_defaults_analytics_to_enabled() {
assert_eq!(DEFAULT_ANALYTICS_ENABLED, true);
Expand Down Expand Up @@ -1723,6 +1774,47 @@ mod tests {
assert_eq!(request, expected);
}

#[tokio::test]
async fn resolve_resume_target_errors_for_missing_explicit_session_id() {
let codex_home = tempdir().expect("create temp codex home");
let cwd = tempdir().expect("create temp cwd");
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(cwd.path().to_path_buf()))
.build()
.await
.expect("build default config");
let missing_id = "00000000-0000-0000-0000-000000000000";
let args = make_resume_args(Some(missing_id), false);

let err = resolve_resume_target(&config, &args)
.await
.expect_err("missing explicit session id should error");

assert_eq!(err.to_string(), format!("Session not found: {missing_id}"));
}

#[tokio::test]
async fn resolve_resume_target_returns_existing_explicit_session_path() {
let codex_home = tempdir().expect("create temp codex home");
let cwd = tempdir().expect("create temp cwd");
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(cwd.path().to_path_buf()))
.build()
.await
.expect("build default config");
let session_id = Uuid::new_v4();
let expected = write_minimal_rollout_with_id(codex_home.path(), session_id);
let args = make_resume_args(Some(&session_id.to_string()), false);

let target = resolve_resume_target(&config, &args)
.await
.expect("existing explicit session id should resolve");

assert_eq!(target, ResumeTarget::ResumePath(expected));
}

#[test]
fn decode_prompt_bytes_strips_utf8_bom() {
let input = [0xEF, 0xBB, 0xBF, b'h', b'i', b'\n'];
Expand Down
131 changes: 131 additions & 0 deletions codex-rs/exec/tests/suite/resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,37 @@ fn last_user_image_count(path: &std::path::Path) -> usize {
last_count
}

fn session_rollout_count(home_path: &std::path::Path) -> usize {
let sessions_dir = home_path.join("sessions");
if !sessions_dir.exists() {
return 0;
}

WalkDir::new(sessions_dir)
.into_iter()
.filter_map(Result::ok)
.filter(|entry| entry.file_type().is_file())
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".jsonl"))
.count()
}

fn extract_thread_started_id_from_jsonl(stdout: &[u8]) -> Option<String> {
let stdout = std::str::from_utf8(stdout).ok()?;
for line in stdout.lines() {
if line.trim().is_empty() {
continue;
}
let item: Value = serde_json::from_str(line).ok()?;
if item.get("type").and_then(|value| value.as_str()) == Some("thread.started") {
return item
.get("thread_id")
.and_then(|value| value.as_str())
.map(str::to_string);
}
}
None
}

fn exec_fixture() -> anyhow::Result<std::path::PathBuf> {
Ok(find_resource!("tests/fixtures/cli_responses_fixture.sse")?)
}
Expand Down Expand Up @@ -559,3 +590,103 @@ fn exec_resume_accepts_images_after_subcommand() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn exec_resume_by_missing_id_fails_without_starting_new_session() -> anyhow::Result<()> {
let test = test_codex_exec();
let fixture = exec_fixture()?;
let repo_root = exec_repo_root()?;
let missing_id = "00000000-0000-0000-0000-000000000000";
let marker = format!("resume-missing-id-{}", Uuid::new_v4());
let prompt = format!("echo {marker}");

let output = test
.cmd()
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("resume")
.arg(missing_id)
.arg(&prompt)
.output()
.context("resume by missing id should return an error")?;

assert!(
!output.status.success(),
"resume by missing id unexpectedly succeeded: {output:?}"
);

let stderr = String::from_utf8(output.stderr)?;
assert!(
stderr.contains(&format!("Session not found: {missing_id}")),
"stderr missing not-found message: {stderr}"
);
assert_eq!(session_rollout_count(test.home_path()), 0);
assert!(
find_session_file_containing_marker(&test.home_path().join("sessions"), &marker).is_none(),
"resume by missing id should not create a new session rollout"
);

Ok(())
}

#[test]
fn exec_resume_by_ephemeral_id_fails_without_starting_new_session() -> anyhow::Result<()> {
let test = test_codex_exec();
let fixture = exec_fixture()?;
let repo_root = exec_repo_root()?;
let seed_output = test
.cmd()
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
.arg("--ephemeral")
.arg("--json")
.arg("-C")
.arg(&repo_root)
.arg("echo ephemeral-seed")
.output()
.context("ephemeral seed run should succeed")?;

assert!(
seed_output.status.success(),
"ephemeral seed failed: {seed_output:?}"
);
let ephemeral_session_id = extract_thread_started_id_from_jsonl(&seed_output.stdout)
.context("missing thread.started id in ephemeral json output")?;

let marker = format!("resume-ephemeral-id-{}", Uuid::new_v4());
let prompt = format!("echo {marker}");
let output = test
.cmd()
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("resume")
.arg(&ephemeral_session_id)
.arg(&prompt)
.output()
.context("resume by ephemeral id should return an error")?;

assert!(
!output.status.success(),
"resume by ephemeral id unexpectedly succeeded: {output:?}"
);

let stderr = String::from_utf8(output.stderr)?;
assert!(
stderr.contains(&format!("Session not found: {ephemeral_session_id}")),
"stderr missing ephemeral not-found message: {stderr}"
);
assert_eq!(session_rollout_count(test.home_path()), 0);
assert!(
find_session_file_containing_marker(&test.home_path().join("sessions"), &marker).is_none(),
"resume by ephemeral id should not create a new session rollout"
);

Ok(())
}