Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/api/codexGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export type TelegramStatus = {
mappedThreads: number
lastError: string
}
export type ThreadReadStateMap = Record<string, string>

async function callRpc<T>(method: string, params?: unknown): Promise<T> {
try {
Expand Down Expand Up @@ -603,6 +604,41 @@ function getErrorMessageFromPayload(payload: unknown, fallback: string): string

export type ThreadTitleCache = { titles: Record<string, string>; order: string[] }

function normalizeThreadReadStateMap(payload: unknown): ThreadReadStateMap {
if (!payload || typeof payload !== 'object' || Array.isArray(payload)) return {}

const next: ThreadReadStateMap = {}
for (const [threadId, readAtIso] of Object.entries(payload as Record<string, unknown>)) {
if (typeof threadId !== 'string' || threadId.length === 0) continue
if (typeof readAtIso !== 'string' || readAtIso.length === 0) continue
next[threadId] = readAtIso
}
return next
}

export async function getThreadReadState(): Promise<ThreadReadStateMap | null> {
try {
const response = await fetch('/codex-api/thread-read-state')
if (!response.ok) return null
const envelope = (await response.json()) as { data?: unknown }
return normalizeThreadReadStateMap(envelope.data)
} catch {
return null
}
}

export async function persistThreadReadState(state: ThreadReadStateMap): Promise<void> {
try {
await fetch('/codex-api/thread-read-state', {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ state }),
})
} catch {
// Best-effort persist
}
}

export async function getThreadTitleCache(): Promise<ThreadTitleCache> {
try {
const response = await fetch('/codex-api/thread-titles')
Expand Down
83 changes: 75 additions & 8 deletions src/composables/useDesktopState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import {
rollbackThread,
getThreadGroups,
getWorkspaceRootsState,
getThreadReadState,
setCodexSpeedMode,
setDefaultModel,
setWorkspaceRootsState,
persistThreadReadState,
getThreadTitleCache,
persistThreadTitle,
generateThreadTitle,
Expand Down Expand Up @@ -74,11 +76,53 @@ function loadReadStateMap(): Record<string, string> {
}
}

function compareReadStateIso(first: string | undefined, second: string | undefined): number {
const left = typeof first === 'string' ? first.trim() : ''
const right = typeof second === 'string' ? second.trim() : ''
if (!left && !right) return 0
if (!left) return -1
if (!right) return 1
return left.localeCompare(right)
}

function mergeReadStateMaps(...maps: Array<Record<string, string> | null | undefined>): Record<string, string> {
const merged: Record<string, string> = {}

for (const map of maps) {
if (!map) continue

for (const [threadId, readAtIso] of Object.entries(map)) {
if (!threadId || !readAtIso) continue
if (compareReadStateIso(readAtIso, merged[threadId]) > 0) {
merged[threadId] = readAtIso
}
}
}

return merged
}

function areReadStateMapsEqual(first: Record<string, string>, second: Record<string, string>): boolean {
const firstEntries = Object.entries(first)
const secondEntries = Object.entries(second)
if (firstEntries.length !== secondEntries.length) return false

for (const [threadId, readAtIso] of firstEntries) {
if (second[threadId] !== readAtIso) return false
}

return true
}

function saveReadStateMap(state: Record<string, string>): void {
if (typeof window === 'undefined') return
window.localStorage.setItem(READ_STATE_STORAGE_KEY, JSON.stringify(state))
}

function hasUnreadThreadUpdate(lastReadIso: string | undefined, updatedAtIso: string): boolean {
return compareReadStateIso(lastReadIso, updatedAtIso) < 0
}

function clamp(value: number, minValue: number, maxValue: number): number {
return Math.min(Math.max(value, minValue), maxValue)
}
Expand Down Expand Up @@ -1022,7 +1066,7 @@ export function useDesktopState() {
const isSelected = selectedThreadId.value === thread.id
const lastReadIso = readStateByThreadId.value[thread.id]
const unreadByEvent = eventUnreadByThreadId.value[thread.id] === true
const unread = !isSelected && !inProgress && (unreadByEvent || lastReadIso !== thread.updatedAtIso)
const unread = !isSelected && !inProgress && (unreadByEvent || hasUnreadThreadUpdate(lastReadIso, thread.updatedAtIso))

return {
...thread,
Expand All @@ -1034,6 +1078,12 @@ export function useDesktopState() {
projectGroups.value = mergeThreadGroups(projectGroups.value, flaggedGroups)
}

function commitReadState(nextState: Record<string, string>): void {
readStateByThreadId.value = nextState
saveReadStateMap(nextState)
void persistThreadReadState(nextState)
}

function insertOptimisticThread(threadId: string, cwd: string, firstMessageText: string): void {
const nowIso = new Date().toISOString()
const normalizedCwd = cwd.trim()
Expand Down Expand Up @@ -1078,8 +1128,7 @@ export function useDesktopState() {
const activeThreadIds = new Set(flatThreads.map((thread) => thread.id))
const nextReadState = pruneThreadStateMap(readStateByThreadId.value, activeThreadIds)
if (nextReadState !== readStateByThreadId.value) {
readStateByThreadId.value = nextReadState
saveReadStateMap(nextReadState)
commitReadState(nextReadState)
}
Comment on lines 1128 to 1132

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Shared read-state gets pruned 🐞 Bug ✓ Correctness

pruneThreadScopedState prunes readStateByThreadId to only the currently loaded threads and now
persists that pruned map to the shared store via commitReadState(). Since thread/list is limited to
100 threads, this deletes read timestamps for older threads from the shared store and can resurrect
unread dots when those threads reappear in the top 100.
Agent Prompt
## Issue description
`pruneThreadScopedState()` prunes `readStateByThreadId` to only the currently loaded thread IDs and then calls `commitReadState()`, which persists that pruned map to the shared store. Because `/thread/list` is capped at 100 threads, this deletes read timestamps for threads outside the current page and defeats the goal of stable unread indicators.

## Issue Context
The UI only fetches 100 threads (`thread/list` limit), but the shared store should retain read timestamps beyond the current in-memory thread list so threads that re-enter the top 100 don't look unread again.

## Fix Focus Areas
- src/composables/useDesktopState.ts[1127-1132]
- src/composables/useDesktopState.ts[1081-1085]
- src/api/codexGateway.ts[93-99]

## Suggested fix
- Split "update local read-state" from "persist to shared store":
  - Keep pruning for `localStorage`/memory if desired.
  - Do **not** call `persistThreadReadState()` when pruning; only persist on positive events (e.g., `markThreadAsRead`, or after merging shared state).
- Alternatively, if you must persist after pruning, first merge with the existing shared store (server-side merge is preferable; see separate finding) so missing keys are not deleted.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

const nextScrollState = pruneThreadStateMap(scrollStateByThreadId.value, activeThreadIds)
if (nextScrollState !== scrollStateByThreadId.value) {
Expand Down Expand Up @@ -1112,11 +1161,10 @@ export function useDesktopState() {
const thread = flattenThreads(sourceGroups.value).find((row) => row.id === threadId)
if (!thread) return

readStateByThreadId.value = {
...readStateByThreadId.value,
const nextReadState = mergeReadStateMaps(readStateByThreadId.value, {
[threadId]: thread.updatedAtIso,
}
saveReadStateMap(readStateByThreadId.value)
})
commitReadState(nextReadState)
if (eventUnreadByThreadId.value[threadId]) {
eventUnreadByThreadId.value = omitKey(eventUnreadByThreadId.value, threadId)
}
Expand Down Expand Up @@ -2140,6 +2188,21 @@ export function useDesktopState() {
}
}

async function syncThreadReadStateFromSharedStore(): Promise<void> {
const sharedState = await getThreadReadState()
if (sharedState === null) return

const mergedState = mergeReadStateMaps(sharedState, readStateByThreadId.value)
if (!areReadStateMapsEqual(readStateByThreadId.value, mergedState)) {
readStateByThreadId.value = mergedState
saveReadStateMap(mergedState)
}

if (!areReadStateMapsEqual(sharedState, mergedState)) {
void persistThreadReadState(mergedState)
}
}

async function requestThreadTitleGeneration(threadId: string, prompt: string, cwd: string | null): Promise<void> {
if (threadTitleById.value[threadId]) return
const trimmed = prompt.trim()
Expand All @@ -2162,7 +2225,11 @@ export function useDesktopState() {
}

try {
const [groups] = await Promise.all([getThreadGroups(), loadThreadTitleCacheIfNeeded()])
const [groups] = await Promise.all([
getThreadGroups(),
loadThreadTitleCacheIfNeeded(),
syncThreadReadStateFromSharedStore(),
])
await hydrateWorkspaceRootsStateIfNeeded(groups)

const nextProjectOrder = mergeProjectOrder(projectOrder.value, groups)
Expand Down
48 changes: 47 additions & 1 deletion src/server/codexAppServerBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ function trimThreadTitleCache(cache: ThreadTitleCache): ThreadTitleCache {
return { titles, order }
}

function normalizeThreadReadStateMap(value: unknown): Record<string, string> {
return normalizeStringRecord(value)
}

function mergeThreadTitleCaches(base: ThreadTitleCache, overlay: ThreadTitleCache): ThreadTitleCache {
const titles = { ...base.titles, ...overlay.titles }
const order: string[] = []
Expand Down Expand Up @@ -530,6 +534,31 @@ async function writeThreadTitleCache(cache: ThreadTitleCache): Promise<void> {
await writeFile(statePath, JSON.stringify(payload), 'utf8')
}

async function readThreadReadStateMap(): Promise<Record<string, string>> {
const statePath = getCodexGlobalStatePath()
try {
const raw = await readFile(statePath, 'utf8')
const payload = asRecord(JSON.parse(raw)) ?? {}
return normalizeThreadReadStateMap(payload['thread-read-state'])
} catch {
return {}
}
}

async function writeThreadReadStateMap(state: Record<string, string>): Promise<void> {
const statePath = getCodexGlobalStatePath()
let payload: Record<string, unknown> = {}
try {
const raw = await readFile(statePath, 'utf8')
payload = asRecord(JSON.parse(raw)) ?? {}
} catch {
payload = {}
}

payload['thread-read-state'] = normalizeThreadReadStateMap(state)
await writeFile(statePath, JSON.stringify(payload), 'utf8')
}
Comment on lines +548 to +560

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

3. Global state write races 🐞 Bug ⛯ Reliability

Multiple endpoints update the same ~/.codex/.codex-global-state.json using independent
read-modify-write cycles without serialization, so concurrent requests can clobber unrelated keys.
Adding frequent thread-read-state writes increases the likelihood of losing workspace roots or
thread title updates.
Agent Prompt
## Issue description
`~/.codex/.codex-global-state.json` is mutated by multiple request handlers (thread titles, workspace roots, and now thread read-state). Each handler performs an uncoordinated read-modify-write of the entire JSON file, allowing concurrent requests to overwrite each other and lose unrelated keys.

## Issue Context
This PR increases write frequency (mark-as-read + pruning persistence), making collisions more likely.

## Fix Focus Areas
- src/server/codexAppServerBridge.ts[401-402]
- src/server/codexAppServerBridge.ts[524-535]
- src/server/codexAppServerBridge.ts[548-560]
- src/server/codexAppServerBridge.ts[656-671]

## Suggested fix
- Introduce a single global-state write path guarded by a per-file async mutex/queue so writes are serialized.
- Perform atomic writes (write to a temp file then `rename`) to reduce partial-write corruption risk.
- Prefer an API like `updateGlobalState((draft) => { draft[key]=... })` that ensures all endpoint updates compose safely.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


function getSessionIndexFileSignature(stats: { mtimeMs: number; size: number }): string {
return `${String(stats.mtimeMs)}:${String(stats.size)}`
}
Expand Down Expand Up @@ -2132,6 +2161,12 @@ export function createCodexBridgeMiddleware(): CodexBridgeMiddleware {
return
}

if (req.method === 'GET' && url.pathname === '/codex-api/thread-read-state') {
const state = await readThreadReadStateMap()
setJson(res, 200, { data: state })
return
}

if (req.method === 'POST' && url.pathname === '/codex-api/thread-search') {
const payload = asRecord(await readJsonBody(req))
const query = typeof payload?.query === 'string' ? payload.query.trim() : ''
Expand Down Expand Up @@ -2181,11 +2216,22 @@ export function createCodexBridgeMiddleware(): CodexBridgeMiddleware {
return
}

if (req.method === 'PUT' && url.pathname === '/codex-api/thread-read-state') {
const payload = await readJsonBody(req)
const record = asRecord(payload)
if (!record) {
setJson(res, 400, { error: 'Invalid body: expected object' })
return
}
await writeThreadReadStateMap(normalizeThreadReadStateMap(record.state ?? record))
setJson(res, 200, { ok: true })
return
Comment on lines +2219 to +2228

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. Read-state put overwrites map 🐞 Bug ⛯ Reliability

PUT /codex-api/thread-read-state replaces the entire stored map with the request body instead of
merging with existing state. If multiple browsers/windows are open, a stale client can overwrite and
delete other threads’ read timestamps (lost updates).
Agent Prompt
## Issue description
The shared read-state store is updated with last-write-wins semantics: the server overwrites `thread-read-state` with whatever map the client sends. With two UIs open, a client that hasn't synced recently can erase keys written by the other client.

## Issue Context
Read-state is inherently multi-writer (refreshes, ports, browsers). The server must be robust to out-of-order/stale updates.

## Fix Focus Areas
- src/server/codexAppServerBridge.ts[2219-2228]
- src/server/codexAppServerBridge.ts[537-560]

## Suggested fix
- On PUT, implement a merge with the existing persisted map instead of replacement:
  - `const existing = await readThreadReadStateMap()`
  - `const incoming = normalizeThreadReadStateMap(record.state ?? record)`
  - For each `threadId`, keep the max(readAtIso) by timestamp ordering (ISO lexical compare is OK for `toISOString()`-style strings; otherwise parse and compare).
  - Write the merged result.
- Avoid deleting keys that are absent from the incoming payload (treat PUT as upsert/merge, or change to PATCH semantics).
- Optionally return the merged map (or a version/etag) so clients can converge quickly.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

}

if (req.method === 'GET' && url.pathname === '/codex-api/telegram/status') {
setJson(res, 200, { data: telegramBridge.getStatus() })
return
}

if (req.method === 'GET' && url.pathname === '/codex-api/events') {
res.statusCode = 200
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
Expand Down