Skip to content

Commit e767ff9

Browse files
committed
fix(streaming): add buffer size limit to StreamProcessor
1 parent c398212 commit e767ff9

1 file changed

Lines changed: 9 additions & 1 deletion

File tree

src/cortex-engine/src/streaming.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ use futures::Stream;
1515
use serde::{Deserialize, Serialize};
1616
use tokio::sync::mpsc;
1717

18+
/// Maximum number of events to buffer before dropping old ones.
19+
/// Prevents unbounded memory growth if drain_events() is not called regularly.
20+
const MAX_BUFFER_SIZE: usize = 10_000;
21+
1822
/// Token usage for streaming.
1923
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
2024
pub struct StreamTokenUsage {
@@ -213,7 +217,7 @@ impl StreamProcessor {
213217
Self {
214218
state: StreamState::Idle,
215219
content: StreamContent::new(),
216-
buffer: VecDeque::new(),
220+
buffer: VecDeque::with_capacity(1024), // Pre-allocate reasonable capacity
217221
start_time: None,
218222
first_token_time: None,
219223
last_event_time: None,
@@ -284,6 +288,10 @@ impl StreamProcessor {
284288
}
285289
}
286290

291+
// Enforce buffer size limit to prevent unbounded memory growth
292+
if self.buffer.len() >= MAX_BUFFER_SIZE {
293+
self.buffer.pop_front();
294+
}
287295
self.buffer.push_back(event);
288296
}
289297

0 commit comments

Comments
 (0)