Skip to content

Commit 7a384db

Browse files
committed
feat: add per-chunk timeout for SSE streaming to prevent hangs
1 parent d201070 commit 7a384db

1 file changed

Lines changed: 28 additions & 2 deletions

File tree

src/cortex-engine/src/client/cortex.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
//! - Responses API (streaming SSE)
66
//! - Credit system with price verification
77
8+
use std::time::Duration;
9+
810
use async_trait::async_trait;
911
use eventsource_stream::Eventsource;
1012
use futures::StreamExt;
1113
use reqwest::Client;
1214
use serde::{Deserialize, Serialize};
1315
use tokio::sync::mpsc;
16+
use tokio::time::timeout;
1417
use tokio_stream::wrappers::ReceiverStream;
1518

1619
use super::{
@@ -22,6 +25,11 @@ use crate::error::{CortexError, Result};
2225

2326
const DEFAULT_CORTEX_URL: &str = "https://api.cortex.foundation";
2427

28+
/// Timeout in seconds for receiving individual SSE chunks during streaming.
29+
/// If no data is received within this duration, the connection is terminated
30+
/// to prevent indefinite hangs when connections stall mid-stream.
31+
const CHUNK_TIMEOUT_SECS: u64 = 60;
32+
2533
/// Pricing information for a model.
2634
#[derive(Debug, Clone, Serialize, Deserialize)]
2735
pub struct PricingInfo {
@@ -567,8 +575,26 @@ impl ModelClient for CortexClient {
567575
let mut stream = std::pin::pin!(stream);
568576
let mut accumulated_text = String::new();
569577
let mut usage = TokenUsage::default();
570-
571-
while let Some(event_result) = stream.next().await {
578+
let chunk_timeout = Duration::from_secs(CHUNK_TIMEOUT_SECS);
579+
580+
loop {
581+
// Apply per-chunk timeout to prevent indefinite hangs when connections stall
582+
let event_result = match timeout(chunk_timeout, stream.next()).await {
583+
Ok(Some(result)) => result,
584+
Ok(None) => break, // Stream ended normally
585+
Err(_) => {
586+
// Timeout elapsed - no data received within CHUNK_TIMEOUT_SECS
587+
let _ = tx
588+
.send(Err(CortexError::BackendError {
589+
message: format!(
590+
"SSE chunk timeout - no data received for {} seconds",
591+
CHUNK_TIMEOUT_SECS
592+
),
593+
}))
594+
.await;
595+
break;
596+
}
597+
};
572598
match event_result {
573599
Ok(event) => {
574600
if event.data.is_empty() || event.data == "[DONE]" {

0 commit comments

Comments
 (0)