Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
449 changes: 449 additions & 0 deletions sdk/python/tests/app_server_harness.py

Large diffs are not rendered by default.

168 changes: 168 additions & 0 deletions sdk/python/tests/app_server_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
from __future__ import annotations

from collections.abc import AsyncIterator, Iterable, Iterator
from typing import Any

from app_server_harness import (
ev_assistant_message,
ev_completed,
ev_message_item_added,
ev_output_text_delta,
ev_response_created,
sse,
)
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
ItemCompletedNotification,
MessagePhase,
)
from openai_codex.models import Notification

# A tiny PNG image spelled out byte by byte. Rows are grouped by PNG chunk
# (length, type, data, CRC); the field labels are a reading of the bytes per
# the PNG specification and are not verified anywhere in this module.
TINY_PNG_BYTES = bytes(
    (
        # 8-byte PNG signature: \x89 P N G \r \n \x1a \n
        137, 80, 78, 71, 13, 10, 26, 10,
        # IHDR chunk header: data length 13, type "IHDR"
        0, 0, 0, 13, 73, 72, 68, 82,
        # IHDR data: width 1, height 1, then bit depth 8, colour type 6,
        # compression 0, filter 0, interlace 0
        0, 0, 0, 1, 0, 0, 0, 1, 8, 6, 0, 0, 0,
        # IHDR CRC
        31, 21, 196, 137,
        # IDAT chunk header: data length 11, type "IDAT"
        0, 0, 0, 11, 73, 68, 65, 84,
        # IDAT data (compressed image stream)
        120, 156, 99, 96, 0, 2, 0, 0, 5, 0, 1,
        # IDAT CRC
        122, 94, 171, 63,
        # IEND chunk: data length 0, type "IEND", CRC
        0, 0, 0, 0, 73, 69, 78, 68, 174, 66, 96, 130,
    )
)


def response_approval_policy(response: Any) -> str:
    """Return the approval policy recorded in a generated thread response.

    The response is dumped in JSON mode with field aliases enabled, and the
    value stored under the "approvalPolicy" key of that dump is returned.
    """
    serialized = response.model_dump(by_alias=True, mode="json")
    return serialized["approvalPolicy"]


def agent_message_texts(events: list[Notification]) -> list[str]:
    """Collect the text of each completed agent message found in ``events``.

    Only notifications whose payload is an ``ItemCompletedNotification`` are
    inspected. Among those, items whose root ``type`` is "agentMessage"
    contribute their ``text``; input order is preserved.
    """
    completed_roots = (
        notification.payload.item.root
        for notification in events
        if isinstance(notification.payload, ItemCompletedNotification)
    )
    return [root.text for root in completed_roots if root.type == "agentMessage"]


def agent_message_texts_from_items(items: Iterable[Any]) -> list[str]:
    """Collect agent-message text from an iterable of run result items.

    Each item is unwrapped through its ``root`` attribute; roots whose
    ``type`` is "agentMessage" contribute their ``text``, in input order.
    """
    roots = (entry.root for entry in items)
    return [root.text for root in roots if root.type == "agentMessage"]


def next_sync_delta(stream: Iterator[Notification]) -> str:
    """Consume ``stream`` up to and including the next agent-message delta.

    Returns the ``delta`` of the first notification whose payload is an
    ``AgentMessageDeltaNotification``; earlier notifications are discarded.

    Raises:
        AssertionError: if the stream is exhausted before such a payload
            is seen.
    """
    for notification in stream:
        if not isinstance(notification.payload, AgentMessageDeltaNotification):
            continue
        return notification.payload.delta
    raise AssertionError("stream completed before an agent-message delta")


async def next_async_delta(stream: AsyncIterator[Notification]) -> str:
    """Consume ``stream`` up to and including the next agent-message delta.

    Async counterpart of the sync helper: returns the ``delta`` of the first
    notification whose payload is an ``AgentMessageDeltaNotification``;
    earlier notifications are discarded.

    Raises:
        AssertionError: if the stream is exhausted before such a payload
            is seen.
    """
    async for notification in stream:
        if not isinstance(notification.payload, AgentMessageDeltaNotification):
            continue
        return notification.payload.delta
    raise AssertionError("stream completed before an agent-message delta")


def streaming_response(response_id: str, item_id: str, parts: list[str]) -> str:
    """Assemble the SSE body for a response streamed as text deltas.

    The event sequence is: response created, message item added, one
    output-text delta per entry of ``parts``, a final assistant message
    carrying the concatenation of ``parts``, then response completed. The
    sequence is handed to ``sse`` and its result is returned.
    """
    events = [
        ev_response_created(response_id),
        ev_message_item_added(item_id),
    ]
    events.extend(ev_output_text_delta(chunk) for chunk in parts)
    # The closing assistant message repeats the full text the deltas spelled out.
    events.append(ev_assistant_message(item_id, "".join(parts)))
    events.append(ev_completed(response_id))
    return sse(events)


def assistant_message_with_phase(
    item_id: str,
    text: str,
    phase: MessagePhase,
) -> dict[str, Any]:
    """Build an assistant message event whose item also records ``phase``.

    Starts from ``ev_assistant_message(item_id, text)`` and replaces its
    "item" entry with a copy that additionally maps "phase" to
    ``phase.value``.
    """
    event = ev_assistant_message(item_id, text)
    # Copy first so the item mapping produced by the harness is not mutated.
    item_with_phase = dict(event["item"])
    item_with_phase["phase"] = phase.value
    event["item"] = item_with_phase
    return event


def request_kind(request_path: str) -> str:
    """Map a captured request path to a short label for assertions.

    Paths ending in "/responses/compact" become "compact", paths ending in
    "/responses" become "responses", and any other path is returned as-is.
    """
    suffix_labels = (
        ("/responses/compact", "compact"),
        ("/responses", "responses"),
    )
    for suffix, label in suffix_labels:
        if request_path.endswith(suffix):
            return label
    return request_path
207 changes: 207 additions & 0 deletions sdk/python/tests/test_app_server_approvals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
from __future__ import annotations

import asyncio

from app_server_harness import AppServerHarness
from openai_codex import ApprovalMode, AsyncCodex, Codex
from openai_codex.generated.v2_all import AskForApprovalValue, ThreadResumeParams
from app_server_helpers import response_approval_policy


def test_thread_resume_inherits_deny_all_approval_mode(tmp_path) -> None:
    """A thread started with deny_all still reports `never` once resumed."""
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_assistant_message("source seeded", response_id="resume-mode")

        with Codex(config=harness.app_server_config()) as codex:
            origin = codex.thread_start(approval_mode=ApprovalMode.deny_all)
            seed_result = origin.run("seed the source rollout")
            resumed_thread = codex.thread_resume(origin.id)
            # Read the resumed thread's state via the private client so the
            # serialized approval policy can be inspected.
            resumed_state = codex._client.thread_resume(  # noqa: SLF001
                resumed_thread.id,
                ThreadResumeParams(thread_id=resumed_thread.id),
            )

            observed = {
                "final_response": seed_result.final_response,
                "resumed_policy": response_approval_policy(resumed_state),
            }
            expected = {
                "final_response": "source seeded",
                "resumed_policy": AskForApprovalValue.never.value,
            }
            assert observed == expected


def test_thread_fork_inherits_deny_all_approval_mode(tmp_path) -> None:
    """A fork made without an approval override keeps the source's policy.

    The source thread starts with deny_all; the fork must have a different
    id and its state must still report the `never` approval policy.
    """
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_assistant_message("source seeded", response_id="fork-mode")

        with Codex(config=harness.app_server_config()) as codex:
            origin = codex.thread_start(approval_mode=ApprovalMode.deny_all)
            seed_result = origin.run("seed the source rollout")
            fork = codex.thread_fork(origin.id)
            # Read the fork's state via the private client so the serialized
            # approval policy can be inspected.
            fork_state = codex._client.thread_resume(  # noqa: SLF001
                fork.id,
                ThreadResumeParams(thread_id=fork.id),
            )

            observed = {
                "final_response": seed_result.final_response,
                "forked_is_distinct": fork.id != origin.id,
                "forked_policy": response_approval_policy(fork_state),
            }
            expected = {
                "final_response": "source seeded",
                "forked_is_distinct": True,
                "forked_policy": AskForApprovalValue.never.value,
            }
            assert observed == expected


def test_thread_fork_can_override_approval_mode(tmp_path) -> None:
    """An explicit approval mode on fork replaces the source's policy.

    The source thread starts with deny_all and is forked with auto_review;
    the fork's state must report the `on_request` approval policy.
    """
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_assistant_message(
            "source seeded",
            response_id="fork-override-mode",
        )

        with Codex(config=harness.app_server_config()) as codex:
            origin = codex.thread_start(approval_mode=ApprovalMode.deny_all)
            seed_result = origin.run("seed the source rollout")
            fork = codex.thread_fork(
                origin.id,
                approval_mode=ApprovalMode.auto_review,
            )
            # Read the fork's state via the private client so the serialized
            # approval policy can be inspected.
            fork_state = codex._client.thread_resume(  # noqa: SLF001
                fork.id,
                ThreadResumeParams(thread_id=fork.id),
            )

            observed = {
                "final_response": seed_result.final_response,
                "forked_policy": response_approval_policy(fork_state),
            }
            expected = {
                "final_response": "source seeded",
                "forked_policy": AskForApprovalValue.on_request.value,
            }
            assert observed == expected


def test_turn_approval_mode_persists_until_next_turn(tmp_path) -> None:
    """An approval mode passed to one run carries over to the next run.

    The thread starts with no approval mode, the first run passes deny_all,
    and the second run passes none; the thread state must report `never`
    after both runs.
    """
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1")
        harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2")

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start()

            def read_thread_state():
                """Fetch the thread's current state via the private client."""
                return codex._client.thread_resume(  # noqa: SLF001
                    thread.id,
                    ThreadResumeParams(thread_id=thread.id),
                )

            override_result = thread.run(
                "deny this and later turns",
                approval_mode=ApprovalMode.deny_all,
            )
            state_after_override = read_thread_state()
            inherited_result = thread.run("inherit previous approval mode")
            state_after_omitted = read_thread_state()

            observed = {
                "after_turn_override": response_approval_policy(state_after_override),
                "after_omitted_turn": response_approval_policy(state_after_omitted),
                "final_responses": [
                    override_result.final_response,
                    inherited_result.final_response,
                ],
            }
            expected = {
                "after_turn_override": AskForApprovalValue.never.value,
                "after_omitted_turn": AskForApprovalValue.never.value,
                "final_responses": ["turn override", "turn inherited"],
            }
            assert observed == expected


def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None:
    """A run without an approval mode keeps the thread's stored policy.

    The thread starts with deny_all. After a run that passes no approval
    mode the state must still report `never`; after a run that passes
    auto_review it must report `on_request`.
    """
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_assistant_message("locked down", response_id="approval-1")
        harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2")

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start(approval_mode=ApprovalMode.deny_all)

            def read_thread_state():
                """Fetch the thread's current state via the private client."""
                return codex._client.thread_resume(  # noqa: SLF001
                    thread.id,
                    ThreadResumeParams(thread_id=thread.id),
                )

            default_result = thread.run("keep approvals denied")
            state_after_default = read_thread_state()
            override_result = thread.run(
                "allow auto review now",
                approval_mode=ApprovalMode.auto_review,
            )
            state_after_override = read_thread_state()

            observed = {
                "after_default_policy": response_approval_policy(state_after_default),
                "after_override_policy": response_approval_policy(state_after_override),
                "final_responses": [
                    default_result.final_response,
                    override_result.final_response,
                ],
            }
            expected = {
                "after_default_policy": AskForApprovalValue.never.value,
                "after_override_policy": AskForApprovalValue.on_request.value,
                "final_responses": ["locked down", "reviewable"],
            }
            assert observed == expected


def test_async_thread_run_approval_mode_persists_until_explicit_override(
    tmp_path,
) -> None:
    """Async variant: a run without an approval mode keeps the stored policy.

    The thread starts with deny_all. After a run that passes no approval
    mode the state must still report `never`; after a run that passes
    auto_review it must report `on_request`.
    """

    async def scenario() -> None:
        """Drive the async client and check the thread state after each run."""
        with AppServerHarness(tmp_path) as harness:
            harness.responses.enqueue_assistant_message(
                "async locked down",
                response_id="async-approval-1",
            )
            harness.responses.enqueue_assistant_message(
                "async reviewable",
                response_id="async-approval-2",
            )

            async with AsyncCodex(config=harness.app_server_config()) as codex:
                thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all)

                async def read_thread_state():
                    """Fetch the thread's current state via the private client."""
                    return await codex._client.thread_resume(  # noqa: SLF001
                        thread.id,
                        ThreadResumeParams(thread_id=thread.id),
                    )

                default_result = await thread.run("keep async approvals denied")
                state_after_default = await read_thread_state()
                override_result = await thread.run(
                    "allow async auto review now",
                    approval_mode=ApprovalMode.auto_review,
                )
                state_after_override = await read_thread_state()

                observed = {
                    "after_default_policy": response_approval_policy(state_after_default),
                    "after_override_policy": response_approval_policy(state_after_override),
                    "final_responses": [
                        default_result.final_response,
                        override_result.final_response,
                    ],
                }
                expected = {
                    "after_default_policy": AskForApprovalValue.never.value,
                    "after_override_policy": AskForApprovalValue.on_request.value,
                    "final_responses": ["async locked down", "async reviewable"],
                }
                assert observed == expected

    asyncio.run(scenario())
Loading
Loading