Skip to content

Commit 494e476

Browse files
committed
fix(cli): resolve message queue race conditions and add recovery mechanisms
- Fix React concurrent mode race condition by reading from ref before setState - Remove buggy startStreaming line that conflated busy/paused states - Add resetEarlyReturnState helper for DRY queue state resets - Set isChainInProgressRef synchronously to prevent race conditions - Add 5-minute watchdog timer to recover from stuck queue locks - Simplify queuePaused to only reflect user-initiated pause state - Add unit tests for early return queue state reset scenarios - Delete unused use-committed-value.ts file - Update tests to use real resetEarlyReturnState instead of mock
1 parent 7b81a4a commit 494e476

File tree

4 files changed

+427
-64
lines changed

4 files changed

+427
-64
lines changed

cli/src/hooks/helpers/__tests__/send-message.test.ts

Lines changed: 250 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ ensureEnv()
2828

2929
const { useChatStore } = await import('../../../state/chat-store')
3030
const { createStreamController } = await import('../../stream-state')
31-
const { setupStreamingContext, handleRunError, finalizeQueueState } = await import(
31+
const { setupStreamingContext, handleRunError, finalizeQueueState, resetEarlyReturnState } = await import(
3232
'../send-message'
3333
)
3434
const { createBatchedMessageUpdater } = await import(
@@ -113,7 +113,7 @@ describe('setupStreamingContext', () => {
113113
// Verify stream status reset
114114
expect(streamStatus).toBe('idle')
115115

116-
// Verify queue processing enabled (no isQueuePausedRef)
116+
// Verify queue processing enabled (no pause ref)
117117
expect(canProcessQueue).toBe(true)
118118

119119
// Verify chain in progress reset
@@ -170,7 +170,7 @@ describe('setupStreamingContext', () => {
170170
// Trigger abort
171171
abortController.abort()
172172

173-
// When queue is paused, canProcessQueue should be false
173+
// When queue was paused before streaming, canProcessQueue should be false
174174
expect(canProcessQueue).toBe(false)
175175
})
176176

@@ -374,7 +374,7 @@ describe('finalizeQueueState', () => {
374374
isQueuePausedRef,
375375
})
376376

377-
// When queue is paused, canProcessQueue should be false
377+
// When queue was paused before streaming, canProcessQueue should be false
378378
expect(canProcessQueue).toBe(false)
379379
})
380380
})
@@ -583,7 +583,7 @@ describe('handleRunError', () => {
583583
isQueuePausedRef,
584584
})
585585

586-
// When queue is paused, canProcessQueue should be false
586+
// When queue was paused before streaming, canProcessQueue should be false
587587
expect(canProcessQueue).toBe(false)
588588
})
589589

@@ -718,3 +718,248 @@ describe('handleRunError', () => {
718718
expect(timerController.stopCalls).toContain('error')
719719
})
720720
})
721+
722+
/**
723+
* Tests for early return queue state reset in sendMessage.
724+
* These test the resetEarlyReturnState helper used across multiple early return paths:
725+
* - prepareUserMessage exception
726+
* - validation failure (success: false)
727+
* - validation exception
728+
*/
729+
describe('resetEarlyReturnState', () => {
730+
describe('prepareUserMessage exception path', () => {
731+
test('resets chain in progress to false', () => {
732+
let chainInProgress = true
733+
734+
resetEarlyReturnState({
735+
updateChainInProgress: (value) => { chainInProgress = value },
736+
setCanProcessQueue: () => {},
737+
})
738+
739+
expect(chainInProgress).toBe(false)
740+
})
741+
742+
test('sets canProcessQueue to true when queue is not paused', () => {
743+
let canProcessQueue = false
744+
const isQueuePausedRef = { current: false }
745+
746+
resetEarlyReturnState({
747+
updateChainInProgress: () => {},
748+
setCanProcessQueue: (can) => { canProcessQueue = can },
749+
isQueuePausedRef,
750+
})
751+
752+
expect(canProcessQueue).toBe(true)
753+
})
754+
755+
test('sets canProcessQueue to false when queue is paused', () => {
756+
let canProcessQueue = true
757+
const isQueuePausedRef = { current: true }
758+
759+
resetEarlyReturnState({
760+
updateChainInProgress: () => {},
761+
setCanProcessQueue: (can) => { canProcessQueue = can },
762+
isQueuePausedRef,
763+
})
764+
765+
expect(canProcessQueue).toBe(false)
766+
})
767+
768+
test('resets isProcessingQueueRef to false', () => {
769+
const isProcessingQueueRef = { current: true }
770+
771+
resetEarlyReturnState({
772+
updateChainInProgress: () => {},
773+
setCanProcessQueue: () => {},
774+
isProcessingQueueRef,
775+
})
776+
777+
expect(isProcessingQueueRef.current).toBe(false)
778+
})
779+
780+
test('handles missing isProcessingQueueRef gracefully', () => {
781+
// Should not throw when isProcessingQueueRef is undefined
782+
expect(() => {
783+
resetEarlyReturnState({
784+
updateChainInProgress: () => {},
785+
setCanProcessQueue: () => {},
786+
})
787+
}).not.toThrow()
788+
})
789+
790+
test('handles missing isQueuePausedRef gracefully (defaults to canProcessQueue=true)', () => {
791+
let canProcessQueue = false
792+
793+
resetEarlyReturnState({
794+
updateChainInProgress: () => {},
795+
setCanProcessQueue: (can) => { canProcessQueue = can },
796+
// No isQueuePausedRef - should default to !undefined = true
797+
})
798+
799+
expect(canProcessQueue).toBe(true)
800+
})
801+
})
802+
803+
describe('validation failure path (success: false)', () => {
804+
test('resets all queue state correctly when processing queued message', () => {
805+
let chainInProgress = true
806+
let canProcessQueue = false
807+
const isProcessingQueueRef = { current: true }
808+
const isQueuePausedRef = { current: false }
809+
810+
resetEarlyReturnState({
811+
updateChainInProgress: (value) => { chainInProgress = value },
812+
setCanProcessQueue: (can) => { canProcessQueue = can },
813+
isProcessingQueueRef,
814+
isQueuePausedRef,
815+
})
816+
817+
expect(chainInProgress).toBe(false)
818+
expect(canProcessQueue).toBe(true)
819+
expect(isProcessingQueueRef.current).toBe(false)
820+
})
821+
822+
test('respects queue paused state after validation failure', () => {
823+
let chainInProgress = true
824+
let canProcessQueue = true
825+
const isProcessingQueueRef = { current: true }
826+
const isQueuePausedRef = { current: true }
827+
828+
resetEarlyReturnState({
829+
updateChainInProgress: (value) => { chainInProgress = value },
830+
setCanProcessQueue: (can) => { canProcessQueue = can },
831+
isProcessingQueueRef,
832+
isQueuePausedRef,
833+
})
834+
835+
expect(chainInProgress).toBe(false)
836+
expect(canProcessQueue).toBe(false) // Queue was paused, should stay paused
837+
expect(isProcessingQueueRef.current).toBe(false)
838+
})
839+
})
840+
841+
describe('validation exception path', () => {
842+
test('resets all queue state correctly when validation throws', () => {
843+
let chainInProgress = true
844+
let canProcessQueue = false
845+
const isProcessingQueueRef = { current: true }
846+
const isQueuePausedRef = { current: false }
847+
848+
// Simulating what happens after catching validation exception
849+
resetEarlyReturnState({
850+
updateChainInProgress: (value) => { chainInProgress = value },
851+
setCanProcessQueue: (can) => { canProcessQueue = can },
852+
isProcessingQueueRef,
853+
isQueuePausedRef,
854+
})
855+
856+
expect(chainInProgress).toBe(false)
857+
expect(canProcessQueue).toBe(true)
858+
expect(isProcessingQueueRef.current).toBe(false)
859+
})
860+
861+
test('preserves queue pause state when validation throws', () => {
862+
let canProcessQueue = true
863+
const isQueuePausedRef = { current: true }
864+
const isProcessingQueueRef = { current: true }
865+
866+
resetEarlyReturnState({
867+
updateChainInProgress: () => {},
868+
setCanProcessQueue: (can) => { canProcessQueue = can },
869+
isProcessingQueueRef,
870+
isQueuePausedRef,
871+
})
872+
873+
// Queue was explicitly paused before, should remain paused after error
874+
expect(canProcessQueue).toBe(false)
875+
// But processing lock should be released to allow manual resume
876+
expect(isProcessingQueueRef.current).toBe(false)
877+
})
878+
})
879+
880+
describe('complete early return scenarios', () => {
881+
test('queue can process next message after prepareUserMessage exception', () => {
882+
// Scenario: Message was being processed from queue, prepareUserMessage throws
883+
let chainInProgress = true
884+
let canProcessQueue = false
885+
const isProcessingQueueRef = { current: true }
886+
const isQueuePausedRef = { current: false }
887+
888+
// After exception, reset is called
889+
resetEarlyReturnState({
890+
updateChainInProgress: (value) => { chainInProgress = value },
891+
setCanProcessQueue: (can) => { canProcessQueue = can },
892+
isProcessingQueueRef,
893+
isQueuePausedRef,
894+
})
895+
896+
// Queue should be able to process next message
897+
expect(chainInProgress).toBe(false)
898+
expect(canProcessQueue).toBe(true)
899+
expect(isProcessingQueueRef.current).toBe(false)
900+
})
901+
902+
test('queue can process next message after validation returns success=false', () => {
903+
// Scenario: Message was being processed, validation returns failure
904+
let chainInProgress = true
905+
let canProcessQueue = false
906+
const isProcessingQueueRef = { current: true }
907+
const isQueuePausedRef = { current: false }
908+
909+
resetEarlyReturnState({
910+
updateChainInProgress: (value) => { chainInProgress = value },
911+
setCanProcessQueue: (can) => { canProcessQueue = can },
912+
isProcessingQueueRef,
913+
isQueuePausedRef,
914+
})
915+
916+
// All locks released, queue can continue
917+
expect(chainInProgress).toBe(false)
918+
expect(canProcessQueue).toBe(true)
919+
expect(isProcessingQueueRef.current).toBe(false)
920+
})
921+
922+
test('queue can process next message after validation throws exception', () => {
923+
// Scenario: Message was being processed, validation throws
924+
let chainInProgress = true
925+
let canProcessQueue = false
926+
const isProcessingQueueRef = { current: true }
927+
const isQueuePausedRef = { current: false }
928+
929+
resetEarlyReturnState({
930+
updateChainInProgress: (value) => { chainInProgress = value },
931+
setCanProcessQueue: (can) => { canProcessQueue = can },
932+
isProcessingQueueRef,
933+
isQueuePausedRef,
934+
})
935+
936+
// All locks released, queue can continue
937+
expect(chainInProgress).toBe(false)
938+
expect(canProcessQueue).toBe(true)
939+
expect(isProcessingQueueRef.current).toBe(false)
940+
})
941+
942+
test('queue remains blocked after error if user had paused it', () => {
943+
// Scenario: User paused queue, then an error occurred
944+
// Queue should remain paused after error recovery
945+
let chainInProgress = true
946+
let canProcessQueue = true
947+
const isProcessingQueueRef = { current: true }
948+
const isQueuePausedRef = { current: true } // User explicitly paused
949+
950+
resetEarlyReturnState({
951+
updateChainInProgress: (value) => { chainInProgress = value },
952+
setCanProcessQueue: (can) => { canProcessQueue = can },
953+
isProcessingQueueRef,
954+
isQueuePausedRef,
955+
})
956+
957+
// Chain is no longer in progress
958+
expect(chainInProgress).toBe(false)
959+
// But queue should remain blocked because user paused it
960+
expect(canProcessQueue).toBe(false)
961+
// Processing lock is released though
962+
expect(isProcessingQueueRef.current).toBe(false)
963+
})
964+
})
965+
})

cli/src/hooks/helpers/send-message.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,29 @@ import type { StreamStatus } from '../use-message-queue'
3535
import type { MessageContent, RunState } from '@codebuff/sdk'
3636
import type { MutableRefObject, SetStateAction } from 'react'
3737

38+
/** Resets queue state on early return (before streaming starts). */
39+
export type ResetEarlyReturnStateParams = {
40+
setCanProcessQueue: (can: boolean) => void
41+
updateChainInProgress: (value: boolean) => void
42+
isProcessingQueueRef?: MutableRefObject<boolean>
43+
isQueuePausedRef?: MutableRefObject<boolean>
44+
}
45+
46+
export const resetEarlyReturnState = (params: ResetEarlyReturnStateParams): void => {
47+
const {
48+
setCanProcessQueue,
49+
updateChainInProgress,
50+
isProcessingQueueRef,
51+
isQueuePausedRef,
52+
} = params
53+
54+
updateChainInProgress(false)
55+
setCanProcessQueue(!isQueuePausedRef?.current)
56+
if (isProcessingQueueRef) {
57+
isProcessingQueueRef.current = false
58+
}
59+
}
60+
3861
/** Resets queue state after streaming completes, aborts, or errors. */
3962
export type FinalizeQueueStateParams = {
4063
setStreamStatus: (status: StreamStatus) => void
@@ -164,7 +187,7 @@ export const prepareUserMessage = async (params: {
164187
next = postUserMessage(next)
165188
}
166189
if (next.length > 100) {
167-
return next.slice(-100)
190+
next = next.slice(-100)
168191
}
169192
return next
170193
})

0 commit comments

Comments
 (0)