@@ -55,18 +55,36 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
5555 lastId = id ;
5656
5757 if ( fields && fields . length >= 2 ) {
58- if ( fields [ 1 ] === END_SENTINEL && i === entries . length - 1 ) {
59- controller . close ( ) ;
60- return ;
58+ // Extract the data field from the Redis entry
59+ // Fields format: ["field1", "value1", "field2", "value2", ...]
60+ let data : string | null = null ;
61+
62+ for ( let j = 0 ; j < fields . length ; j += 2 ) {
63+ if ( fields [ j ] === "data" ) {
64+ data = fields [ j + 1 ] ;
65+ break ;
66+ }
6167 }
6268
63- if ( fields [ 1 ] !== END_SENTINEL ) {
64- controller . enqueue ( fields [ 1 ] ) ;
69+ // Handle legacy entries that don't have field names (just data at index 1)
70+ if ( data === null && fields . length >= 2 ) {
71+ data = fields [ 1 ] ;
6572 }
6673
67- if ( signal . aborted ) {
68- controller . close ( ) ;
69- return ;
74+ if ( data ) {
75+ if ( data === END_SENTINEL && i === entries . length - 1 ) {
76+ controller . close ( ) ;
77+ return ;
78+ }
79+
80+ if ( data !== END_SENTINEL ) {
81+ controller . enqueue ( data ) ;
82+ }
83+
84+ if ( signal . aborted ) {
85+ controller . close ( ) ;
86+ return ;
87+ }
7088 }
7189 }
7290 }
@@ -127,10 +145,14 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
127145 async ingestData (
128146 stream : ReadableStream < Uint8Array > ,
129147 runId : string ,
130- streamId : string
148+ streamId : string ,
149+ resumeFromChunk ?: number
131150 ) : Promise < Response > {
132151 const redis = new Redis ( this . options . redis ?? { } ) ;
133152 const streamKey = `stream:${ runId } :${ streamId } ` ;
153+ const startChunk = resumeFromChunk ?? 0 ;
154+ // Start counting from the resume point, not from 0
155+ let currentChunkIndex = startChunk ;
134156
135157 async function cleanup ( ) {
136158 try {
@@ -151,9 +173,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
151173 break ;
152174 }
153175
154- logger . debug ( "[RedisRealtimeStreams][ingestData] Reading data" , {
176+ // Write each chunk with its index
177+ logger . debug ( "[RedisRealtimeStreams][ingestData] Writing chunk" , {
155178 streamKey,
156179 runId,
180+ chunkIndex : currentChunkIndex ,
181+ resumeFromChunk : startChunk ,
157182 value,
158183 } ) ;
159184
@@ -163,9 +188,13 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
163188 "~" ,
164189 String ( env . REALTIME_STREAM_MAX_LENGTH ) ,
165190 "*" ,
191+ "chunkIndex" ,
192+ currentChunkIndex . toString ( ) ,
166193 "data" ,
167194 value
168195 ) ;
196+
197+ currentChunkIndex ++ ;
169198 }
170199
171200 // Send the END_SENTINEL and set TTL with a pipeline.
@@ -200,4 +229,50 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
200229 await cleanup ( ) ;
201230 }
202231 }
232+
233+ async getLastChunkIndex ( runId : string , streamId : string ) : Promise < number > {
234+ const redis = new Redis ( this . options . redis ?? { } ) ;
235+ const streamKey = `stream:${ runId } :${ streamId } ` ;
236+
237+ try {
238+ // Get the last entry from the stream using XREVRANGE
239+ const entries = await redis . xrevrange ( streamKey , "+" , "-" , "COUNT" , 1 ) ;
240+
241+ if ( ! entries || entries . length === 0 ) {
242+ // No entries in stream, return -1 to indicate no chunks received
243+ return - 1 ;
244+ }
245+
246+ const [ _id , fields ] = entries [ 0 ] ;
247+
248+ // Find the chunkIndex field
249+ for ( let i = 0 ; i < fields . length ; i += 2 ) {
250+ if ( fields [ i ] === "chunkIndex" ) {
251+ const chunkIndex = parseInt ( fields [ i + 1 ] , 10 ) ;
252+ logger . debug ( "[RedisRealtimeStreams][getLastChunkIndex] Found last chunk" , {
253+ streamKey,
254+ chunkIndex,
255+ } ) ;
256+ return chunkIndex ;
257+ }
258+ }
259+
260+ // If no chunkIndex field found (legacy entries), return -1
261+ logger . warn ( "[RedisRealtimeStreams][getLastChunkIndex] No chunkIndex found in entry" , {
262+ streamKey,
263+ } ) ;
264+ return - 1 ;
265+ } catch ( error ) {
266+ logger . error ( "[RedisRealtimeStreams][getLastChunkIndex] Error getting last chunk:" , {
267+ error,
268+ streamKey,
269+ } ) ;
270+ // Return -1 to indicate we don't know what the server has
271+ return - 1 ;
272+ } finally {
273+ await redis . quit ( ) . catch ( ( err ) => {
274+ logger . error ( "[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:" , { err } ) ;
275+ } ) ;
276+ }
277+ }
203278}
0 commit comments