|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Minimal example app using CodexClient. |
| 3 | +
|
| 4 | +Usage: |
| 5 | + python examples/basic_conversation.py "Add a smoke test" |
| 6 | +
|
| 7 | +Flags: |
| 8 | + --model MODEL Model slug (default: gpt-5) |
| 9 | + --sandbox {read-only,workspace-write,danger-full-access} |
| 10 | + --approval {untrusted,on-failure,on-request,never} |
| 11 | + --auto-approve Auto‑approve exec/patch requests |
| 12 | + --exit-on-complete Exit after the first completed turn (default: prompt for another turn) |
| 13 | + --allow-apply-patch Enable the apply patch tool (disabled by default) |
| 14 | +
|
| 15 | +This script starts a stateful conversation, submits one user turn, then streams |
| 16 | +events to stdout. For approval requests, it either auto‑approves (when the flag |
| 17 | +is set) or prompts interactively. |
| 18 | +""" |
| 19 | + |
| 20 | +from __future__ import annotations |
| 21 | + |
| 22 | +import argparse |
| 23 | +import sys |
| 24 | +from typing import Any, cast |
| 25 | + |
| 26 | +from codex import CodexClient, CodexConfig |
| 27 | +from codex.config import ApprovalPolicy, SandboxMode |
| 28 | +from codex.protocol.types import ReviewDecision |
| 29 | + |
| 30 | + |
| 31 | +def _etype(ev: Any) -> str: |
| 32 | + msg = getattr(ev, "msg", None) |
| 33 | + root = getattr(msg, "root", None) |
| 34 | + if root is not None and hasattr(root, "type"): |
| 35 | + val = root.type |
| 36 | + return cast(str, val) if isinstance(val, str) else "unknown" |
| 37 | + if isinstance(msg, dict): |
| 38 | + return cast("str", msg.get("type", "unknown")) |
| 39 | + return getattr(msg, "type", "unknown") or "unknown" |
| 40 | + |
| 41 | + |
| 42 | +def parse_args(argv: list[str]) -> argparse.Namespace: |
| 43 | + p = argparse.ArgumentParser(description="CodexClient example") |
| 44 | + p.add_argument("prompt", help="User prompt to send as a turn") |
| 45 | + p.add_argument("--model", default="gpt-5") |
| 46 | + p.add_argument( |
| 47 | + "--sandbox", |
| 48 | + choices=["read-only", "workspace-write", "danger-full-access"], |
| 49 | + default="workspace-write", |
| 50 | + ) |
| 51 | + p.add_argument( |
| 52 | + "--approval", |
| 53 | + choices=["untrusted", "on-failure", "on-request", "never"], |
| 54 | + default="on-request", |
| 55 | + ) |
| 56 | + p.add_argument("--auto-approve", action="store_true") |
| 57 | + p.add_argument("--exit-on-complete", action="store_true") |
| 58 | + p.add_argument("--allow-apply-patch", action="store_true") |
| 59 | + return p.parse_args(argv) |
| 60 | + |
| 61 | + |
| 62 | +def main(argv: list[str]) -> int: |
| 63 | + args = parse_args(argv) |
| 64 | + |
| 65 | + # Map to enums for clarity (strings would also work) |
| 66 | + sandbox_map = { |
| 67 | + "read-only": SandboxMode.READ_ONLY, |
| 68 | + "workspace-write": SandboxMode.WORKSPACE_WRITE, |
| 69 | + "danger-full-access": SandboxMode.DANGER_FULL_ACCESS, |
| 70 | + } |
| 71 | + approval_map = { |
| 72 | + "untrusted": ApprovalPolicy.UNTRUSTED, |
| 73 | + "on-failure": ApprovalPolicy.ON_FAILURE, |
| 74 | + "on-request": ApprovalPolicy.ON_REQUEST, |
| 75 | + "never": ApprovalPolicy.NEVER, |
| 76 | + } |
| 77 | + |
| 78 | + cfg = CodexConfig( |
| 79 | + model=args.model, |
| 80 | + include_apply_patch_tool=bool(args.allow_apply_patch), # default: disabled |
| 81 | + ) |
| 82 | + client = CodexClient(config=cfg) |
| 83 | + conv = client.start_conversation() |
| 84 | + |
| 85 | + # Send the initial turn with per‑turn overrides. |
| 86 | + conv.submit_user_turn( |
| 87 | + args.prompt, |
| 88 | + sandbox_mode=sandbox_map[args.sandbox], |
| 89 | + approval_policy=approval_map[args.approval], |
| 90 | + ) |
| 91 | + |
| 92 | + print("[codex] streaming events...", flush=True) |
| 93 | + for ev in conv: |
| 94 | + et = _etype(ev) |
| 95 | + |
| 96 | + if et == "session_configured": |
| 97 | + model = getattr(getattr(ev.msg, "root", ev.msg), "model", "?") |
| 98 | + print(f"session configured (model={model})") |
| 99 | + |
| 100 | + elif et == "agent_message": |
| 101 | + msg = getattr(getattr(ev.msg, "root", ev.msg), "message", "") |
| 102 | + print(f"assistant: {msg}") |
| 103 | + |
| 104 | + elif et == "agent_message_delta": |
| 105 | + delta = getattr(getattr(ev.msg, "root", ev.msg), "delta", "") |
| 106 | + print(delta, end="", flush=True) |
| 107 | + |
| 108 | + # Tool-call logging: MCP tools, exec, web search, and patch apply |
| 109 | + elif et == "mcp_tool_call_begin": |
| 110 | + root = getattr(ev.msg, "root", ev.msg) |
| 111 | + inv = getattr(root, "invocation", None) |
| 112 | + server = getattr(inv, "server", "?") |
| 113 | + tool = getattr(inv, "tool", "?") |
| 114 | + mcp_args = getattr(inv, "arguments", None) |
| 115 | + call_id = getattr(root, "call_id", "?") |
| 116 | + print(f"[tool] mcp begin id={call_id} {server}:{tool} args={mcp_args}") |
| 117 | + |
| 118 | + elif et == "mcp_tool_call_end": |
| 119 | + root = getattr(ev.msg, "root", ev.msg) |
| 120 | + inv = getattr(root, "invocation", None) |
| 121 | + server = getattr(inv, "server", "?") |
| 122 | + tool = getattr(inv, "tool", "?") |
| 123 | + call_id = getattr(root, "call_id", "?") |
| 124 | + duration = getattr(root, "duration", None) |
| 125 | + res = getattr(root, "result", None) |
| 126 | + ok = getattr(res, "Ok", None) |
| 127 | + err = getattr(res, "Err", None) |
| 128 | + if err is not None: |
| 129 | + print( |
| 130 | + f"[tool] mcp end id={call_id} {server}:{tool} ERROR: {err} (duration={duration})" |
| 131 | + ) |
| 132 | + else: |
| 133 | + # Avoid printing large payloads; just show a short summary if present |
| 134 | + out_type = getattr(ok, "type", None) if ok is not None else None |
| 135 | + print( |
| 136 | + f"[tool] mcp end id={call_id} {server}:{tool} ok type={out_type} (duration={duration})" |
| 137 | + ) |
| 138 | + |
| 139 | + elif et == "exec_command_begin": |
| 140 | + root = getattr(ev.msg, "root", ev.msg) |
| 141 | + cmd = getattr(root, "command", []) |
| 142 | + cwd = getattr(root, "cwd", "") |
| 143 | + call_id = getattr(root, "call_id", "?") |
| 144 | + print(f"[tool] exec begin id={call_id} cwd={cwd} cmd={' '.join(cmd)}") |
| 145 | + |
| 146 | + elif et == "exec_command_end": |
| 147 | + root = getattr(ev.msg, "root", ev.msg) |
| 148 | + call_id = getattr(root, "call_id", "?") |
| 149 | + exit_code = getattr(root, "exit_code", "?") |
| 150 | + duration = getattr(root, "duration", None) |
| 151 | + stdout = getattr(root, "stdout", "") or "" |
| 152 | + stderr = getattr(root, "stderr", "") or "" |
| 153 | + print( |
| 154 | + f"[tool] exec end id={call_id} exit={exit_code} duration={duration} stdout={len(stdout)}B stderr={len(stderr)}B" |
| 155 | + ) |
| 156 | + |
| 157 | + elif et == "web_search_begin": |
| 158 | + call_id = getattr(getattr(ev.msg, "root", ev.msg), "call_id", "?") |
| 159 | + print(f"[tool] web_search begin id={call_id}") |
| 160 | + |
| 161 | + elif et == "web_search_end": |
| 162 | + root = getattr(ev.msg, "root", ev.msg) |
| 163 | + call_id = getattr(root, "call_id", "?") |
| 164 | + query = getattr(root, "query", "?") |
| 165 | + print(f"[tool] web_search end id={call_id} query={query}") |
| 166 | + |
| 167 | + elif et == "patch_apply_begin": |
| 168 | + root = getattr(ev.msg, "root", ev.msg) |
| 169 | + call_id = getattr(root, "call_id", "?") |
| 170 | + auto = getattr(root, "auto_approved", False) |
| 171 | + changes = getattr(root, "changes", {}) or {} |
| 172 | + print( |
| 173 | + f"[tool] patch begin id={call_id} files={len(changes)} auto_approved={bool(auto)}" |
| 174 | + ) |
| 175 | + |
| 176 | + elif et == "patch_apply_end": |
| 177 | + root = getattr(ev.msg, "root", ev.msg) |
| 178 | + call_id = getattr(root, "call_id", "?") |
| 179 | + success = getattr(root, "success", False) |
| 180 | + print(f"[tool] patch end id={call_id} success={bool(success)}") |
| 181 | + |
| 182 | + elif et == "exec_approval_request": |
| 183 | + root = getattr(ev.msg, "root", ev.msg) |
| 184 | + cmd = getattr(root, "command", []) |
| 185 | + cwd = getattr(root, "cwd", "") |
| 186 | + print("\n[approval request] exec:", cmd, "in", cwd) |
| 187 | + if args.auto_approve or _prompt_yes_no("Approve exec? [y/N] "): |
| 188 | + conv.approve_exec(ev.id, ReviewDecision.approved) |
| 189 | + else: |
| 190 | + conv.approve_exec(ev.id, ReviewDecision.denied) |
| 191 | + |
| 192 | + elif et == "apply_patch_approval_request": |
| 193 | + print("\n[approval request] apply_patch") |
| 194 | + if args.auto_approve or _prompt_yes_no("Approve patch? [y/N] "): |
| 195 | + conv.approve_patch(ev.id, ReviewDecision.approved) |
| 196 | + else: |
| 197 | + conv.approve_patch(ev.id, ReviewDecision.denied) |
| 198 | + |
| 199 | + elif et == "task_complete": |
| 200 | + last = getattr(getattr(ev.msg, "root", ev.msg), "last_agent_message", None) |
| 201 | + if last: |
| 202 | + print("\n[task complete]", last) |
| 203 | + # Either exit immediately or allow the user to continue the conversation |
| 204 | + if args.exit_on_complete: |
| 205 | + conv.shutdown() |
| 206 | + else: |
| 207 | + try: |
| 208 | + nxt = input("\nAnother prompt (blank to quit): ").strip() |
| 209 | + except (EOFError, KeyboardInterrupt): |
| 210 | + nxt = "" |
| 211 | + if nxt: |
| 212 | + conv.submit_user_turn(nxt) |
| 213 | + else: |
| 214 | + conv.shutdown() |
| 215 | + |
| 216 | + elif et == "shutdown_complete": |
| 217 | + print("[codex] shutdown complete") |
| 218 | + break |
| 219 | + |
| 220 | + elif et == "stream_error": |
| 221 | + root = getattr(ev.msg, "root", ev.msg) |
| 222 | + print("[stream error]", getattr(root, "message", "?")) |
| 223 | + |
| 224 | + return 0 |
| 225 | + |
| 226 | + |
| 227 | +def _prompt_yes_no(prompt: str) -> bool: |
| 228 | + try: |
| 229 | + ans = input(prompt).strip().lower() |
| 230 | + return ans in {"y", "yes"} |
| 231 | + except (EOFError, KeyboardInterrupt): |
| 232 | + return False |
| 233 | + |
| 234 | + |
| 235 | +if __name__ == "__main__": # pragma: no cover |
| 236 | + raise SystemExit(main(sys.argv[1:])) |
0 commit comments