Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions crates/protocol/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,38 @@
use std::collections::BTreeMap;
use std::path::PathBuf;

use serde::{Deserialize, Serialize};
use serde_json::Value;

pub mod runtime {
use super::*;

pub const RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION: u32 = 1;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeEventEnvelope {
#[serde(default = "default_runtime_event_envelope_schema_version")]
pub schema_version: u32,
pub seq: u64,
pub event: String,
pub kind: String,
pub thread_id: String,
pub turn_id: Option<String>,
pub item_id: Option<String>,
pub timestamp: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub created_at: Option<String>,
Comment thread
greptile-apps[bot] marked this conversation as resolved.
pub payload: Value,
#[serde(default)]
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}

fn default_runtime_event_envelope_schema_version() -> u32 {
RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope<T> {
pub request_id: String,
Expand Down
112 changes: 111 additions & 1 deletion crates/protocol/tests/parity_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use codewhale_protocol::{EventFrame, ThreadListParams, ThreadRequest, ThreadResumeParams};
use codewhale_protocol::{
EventFrame, ThreadListParams, ThreadRequest, ThreadResumeParams,
runtime::{RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION, RuntimeEventEnvelope},
};
use serde_json::{Value, json};

#[test]
fn thread_resume_params_round_trip() {
Expand Down Expand Up @@ -48,3 +52,109 @@ fn event_frame_serialization_contains_expected_tag() {
let encoded = serde_json::to_string(&frame).expect("serialize frame");
assert!(encoded.contains("turn_complete"));
}

#[test]
fn runtime_event_envelope_roundtrip() {
let input = json!({
"schema_version": 1,
"seq": 12,
"event": "item.delta",
"kind": "item.delta",
"thread_id": "thr_123",
"turn_id": "turn_456",
"item_id": "item_789",
"timestamp": "2026-02-11T20:18:49.123Z",
"created_at": "2026-02-11T20:18:49.123Z",
"payload": { "delta": "ok", "kind": "agent_message" },
});
let envelope: RuntimeEventEnvelope =
serde_json::from_value(input).expect("deserialize runtime event envelope");
assert_eq!(envelope.schema_version, 1);
assert_eq!(envelope.seq, 12);
assert_eq!(envelope.event, "item.delta");
assert_eq!(envelope.kind, "item.delta");
assert_eq!(envelope.thread_id, "thr_123");

let encoded = serde_json::to_value(&envelope).expect("serialize runtime event envelope");
assert_eq!(encoded["event"], encoded["kind"]);
assert_eq!(encoded["schema_version"], 1);
assert_eq!(encoded["seq"], 12);
assert_eq!(encoded["thread_id"], "thr_123");
assert_eq!(encoded["turn_id"], "turn_456");
assert_eq!(encoded["item_id"], "item_789");
assert_eq!(encoded["timestamp"], "2026-02-11T20:18:49.123Z");
assert_eq!(encoded["created_at"], "2026-02-11T20:18:49.123Z");
assert_eq!(
encoded["payload"],
json!({ "delta": "ok", "kind": "agent_message" })
);
}

#[test]
fn runtime_event_envelope_defaults_to_api_schema_version() {
let input = json!({
"seq": 15,
"event": "thread.started",
"kind": "thread.started",
"thread_id": "thr_default_version",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": {},
});
let envelope: RuntimeEventEnvelope = serde_json::from_value(input)
.expect("deserialize runtime event envelope without schema version");

assert_eq!(
envelope.schema_version,
RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION
);
}

#[test]
fn runtime_event_envelope_thread_level_keeps_turn_and_item_ids() {
let input = json!({
"schema_version": 1,
"seq": 14,
"event": "thread.started",
"kind": "thread.started",
"thread_id": "thr_thread",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": { "thread": { "id": "thr_thread" } },
});
let envelope: RuntimeEventEnvelope = serde_json::from_value(input)
.expect("deserialize runtime event envelope without thread-level turn/item ids");
assert!(envelope.turn_id.is_none());
assert!(envelope.item_id.is_none());

let encoded = serde_json::to_value(envelope).expect("serialize runtime event envelope");
assert!(encoded.get("turn_id").is_some());
assert!(encoded.get("item_id").is_some());
assert!(encoded["turn_id"].is_null());
assert!(encoded["item_id"].is_null());
}

#[test]
fn runtime_event_envelope_preserves_unknown_fields() {
let input: Value = json!({
"schema_version": 1,
"seq": 13,
"event": "turn.completed",
"kind": "turn.completed",
"thread_id": "thr_unknown",
"timestamp": "2026-02-11T20:18:49.123Z",
"payload": {},
"forward_compatibility_hint": "v2-ready",
});
let envelope: RuntimeEventEnvelope = serde_json::from_value(input.clone())
.expect("deserialize runtime event envelope with unknown field");
assert!(envelope.extra.contains_key("forward_compatibility_hint"));

let encoded = serde_json::to_value(envelope).expect("serialize runtime event envelope");
assert_eq!(encoded["forward_compatibility_hint"], "v2-ready");
assert_eq!(encoded["schema_version"], 1);
assert_eq!(encoded["seq"], 13);
assert_eq!(encoded["event"], "turn.completed");
assert_eq!(encoded["kind"], "turn.completed");
assert_eq!(encoded["thread_id"], "thr_unknown");
assert!(encoded["turn_id"].is_null());
assert!(encoded["item_id"].is_null());
}
1 change: 1 addition & 0 deletions crates/tui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ path = "src/bin/deepseek_tui_legacy_shim.rs"
anyhow = "1.0.100"
arboard = "3.4"
codewhale-config = { path = "../config", version = "0.8.46" }
codewhale-protocol = { path = "../protocol", version = "0.8.46" }
codewhale-secrets = { path = "../secrets", version = "0.8.46" }
codewhale-tools = { path = "../tools", version = "0.8.46" }
schemaui = { version = "0.12.0", default-features = false, optional = true }
Expand Down
65 changes: 54 additions & 11 deletions crates/tui/src/runtime_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use chrono::Utc;
use codewhale_protocol::runtime::{RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION, RuntimeEventEnvelope};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use tokio::net::TcpListener;
Expand Down Expand Up @@ -1469,15 +1470,23 @@ async fn stream_turn(
}

fn runtime_event_payload(event: crate::runtime_threads::RuntimeEventRecord) -> serde_json::Value {
json!({
"seq": event.seq,
"timestamp": event.timestamp,
"thread_id": event.thread_id,
"turn_id": event.turn_id,
"item_id": event.item_id,
"event": event.event,
"payload": event.payload,
})
let event_name = event.event.clone();
let timestamp = event.timestamp.to_rfc3339();
let schema_version = RUNTIME_EVENT_ENVELOPE_SCHEMA_VERSION;
let envelope = RuntimeEventEnvelope {
schema_version,
seq: event.seq,
event: event_name.clone(),
kind: event_name,
thread_id: event.thread_id,
turn_id: event.turn_id,
item_id: event.item_id,
timestamp: timestamp.clone(),
created_at: Some(timestamp),
payload: event.payload,
extra: Default::default(),
};
serde_json::to_value(envelope).expect("serialize runtime event envelope")
}

fn map_compat_stream_event(event: &crate::runtime_threads::RuntimeEventRecord) -> Option<SseEvent> {
Expand Down Expand Up @@ -2622,6 +2631,30 @@ mod tests {
chunk_text.contains("event:"),
"expected SSE event chunk, got: {chunk_text}"
);
let (event_name, payload) = parse_sse_frame(&chunk_text)?;
assert_eq!(event_name, "thread.started");
assert!(
event_name.starts_with("item.")
|| event_name.starts_with("turn.")
|| event_name.starts_with("thread.")
|| event_name == "turn.completed"
|| event_name == "turn.started"
|| event_name == "thread.started",
"unexpected first event name: {event_name}"
);
Comment thread
greptile-apps[bot] marked this conversation as resolved.
assert_eq!(payload["event"], payload["kind"]);
assert!(payload.get("turn_id").is_some());
assert!(payload.get("item_id").is_some());
assert!(payload["turn_id"].is_null());
assert!(payload["item_id"].is_null());
assert_eq!(payload["thread_id"], thread_id);
assert!(
payload["schema_version"]
.as_u64()
.is_some_and(|version| version >= 1)
);
assert!(payload.get("seq").and_then(Value::as_u64).is_some());
assert!(payload["payload"].is_object() || payload["payload"].is_array());

handle.abort();
Ok(())
Expand Down Expand Up @@ -2712,7 +2745,15 @@ mod tests {
.await?
.error_for_status()?;
let frame_a = read_first_sse_frame(resp_a).await?;
let (_event_a, payload_a) = parse_sse_frame(&frame_a)?;
let (event_a, payload_a) = parse_sse_frame(&frame_a)?;
assert_eq!(event_a, "thread.started");
assert!(payload_a.get("turn_id").is_some());
assert!(payload_a.get("item_id").is_some());
assert!(payload_a["turn_id"].is_null());
assert!(payload_a["item_id"].is_null());
assert!(payload_a.get("schema_version").is_some());
assert_eq!(payload_a["event"], payload_a["kind"]);
assert_eq!(payload_a["thread_id"], thread_id);
let seq_a = payload_a
.get("seq")
.and_then(Value::as_u64)
Expand All @@ -2727,6 +2768,9 @@ mod tests {
.error_for_status()?;
let frame_b = read_first_sse_frame(resp_b).await?;
let (_event_b, payload_b) = parse_sse_frame(&frame_b)?;
assert!(payload_b.get("schema_version").is_some());
assert_eq!(payload_b["event"], payload_b["kind"]);
assert_eq!(payload_b["thread_id"], thread_id);
let seq_b = payload_b
.get("seq")
.and_then(Value::as_u64)
Expand All @@ -2735,7 +2779,6 @@ mod tests {
seq_b > seq_a,
"expected seq after cursor: {seq_b} <= {seq_a}"
);
assert_eq!(payload_b["thread_id"], thread_id);

handle.abort();
Ok(())
Expand Down
21 changes: 18 additions & 3 deletions docs/RUNTIME_API.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,23 +286,38 @@ Events are append-only with a global monotonic `seq` for replay/resume.

### SSE event stream

The SSE event payload shape:
The SSE event payload shape for `/v1/threads/{id}/events`:

```json
{
"schema_version": 1,
"seq": 42,
"timestamp": "2026-02-11T20:18:49.123Z",
"event": "item.delta",
"kind": "item.delta",
"thread_id": "thr_1234abcd",
"turn_id": "turn_5678efgh",
"item_id": "item_90ab12cd",
"event": "item.delta",
"timestamp": "2026-02-11T20:18:49.123Z",
"created_at": "2026-02-11T20:18:49.123Z",
"payload": {
"delta": "partial output",
"kind": "agent_message"
}
}
```

Compatibility notes:

- `schema_version` is the HTTP/SSE envelope schema version. It is independent of
the runtime store schema used for persisted thread/turn/event records.
- `event` remains the SSE event name in existing clients; it is preserved as-is.
- `kind` mirrors `event` in the stable envelope for typed clients.
- `thread.started`, `turn.started`, and `turn.completed` are emitted as SSE event
names exactly as before.
- `timestamp` remains the canonical event time for schema version 1. `created_at`
is an equivalent alias for clients that use `created_at` naming elsewhere; do
not require both fields to be present.

Common event names: `thread.started`, `thread.forked`, `turn.started`,
`turn.lifecycle`, `turn.steered`, `turn.interrupt_requested`,
`turn.completed`, `item.started`, `item.delta`, `item.completed`,
Expand Down
Loading