Skip to content

Commit 27093ec

Browse files
committed
fix(exec): add per-chunk streaming timeout to prevent indefinite hangs
1 parent c398212 commit 27093ec

1 file changed

Lines changed: 27 additions & 1 deletion

File tree

src/cortex-exec/src/runner.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ const DEFAULT_TIMEOUT_SECS: u64 = 600;
3232
/// Default timeout for a single LLM request (2 minutes).
3333
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120;
3434

35+
/// Per-chunk timeout during streaming responses (30 seconds).
36+
/// If no chunk arrives within this window the stream is abandoned with an error,
/// which prevents indefinite hangs when connections stall mid-stream.
37+
/// See cortex_common::http_client for timeout hierarchy documentation.
38+
const STREAMING_CHUNK_TIMEOUT_SECS: u64 = 30;
39+
3540
/// Maximum retries for transient errors.
3641
const MAX_RETRIES: usize = 3;
3742

@@ -555,7 +560,28 @@ impl ExecRunner {
555560
let mut partial_tool_calls: std::collections::HashMap<String, (String, String)> =
556561
std::collections::HashMap::new();
557562

558-
while let Some(event) = stream.next().await {
563+
loop {
564+
// Apply per-chunk timeout to prevent indefinite hangs when connections stall
565+
let event = match tokio::time::timeout(
566+
Duration::from_secs(STREAMING_CHUNK_TIMEOUT_SECS),
567+
stream.next(),
568+
)
569+
.await
570+
{
571+
Ok(Some(event)) => event,
572+
Ok(None) => break, // Stream ended normally
573+
Err(_) => {
574+
tracing::warn!(
575+
"Stream chunk timeout after {}s",
576+
STREAMING_CHUNK_TIMEOUT_SECS
577+
);
578+
return Err(CortexError::Provider(format!(
579+
"Streaming timeout: no response chunk received within {}s",
580+
STREAMING_CHUNK_TIMEOUT_SECS
581+
)));
582+
}
583+
};
584+
559585
match event? {
560586
ResponseEvent::Delta(delta) => {
561587
if self.options.streaming {

0 commit comments

Comments
 (0)