File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -432,16 +432,28 @@ async def _process_stream(self, response: httpx.Response) -> None:
432432 parser = SSEParser ()
433433 iterator = response .aiter_text ().__aiter__ ()
434434
435+ loop = asyncio .get_running_loop ()
436+ last_event_time = loop .time ()
437+
438+ # Use a short timeout (1s) to allow checking _closed frequently.
439+ # We track elapsed time separately for the real inactivity timeout.
440+ check_timeout = 1.0
441+
435442 while not self ._closed :
436443 try :
437- # Wait for the next chunk with inactivity timeout
438444 chunk = await asyncio .wait_for (
439445 iterator .__anext__ (),
440- timeout = self . _inactivity_timeout ,
446+ timeout = check_timeout ,
441447 )
448+ last_event_time = loop .time ()
442449 except asyncio .TimeoutError :
443- logger .debug ("SSE inactivity timeout, reconnecting..." )
444- break
450+ # Check if we've exceeded the inactivity timeout
451+ elapsed = loop .time () - last_event_time
452+ if elapsed > self ._inactivity_timeout :
453+ logger .debug ("SSE inactivity timeout, reconnecting..." )
454+ break
455+ # Otherwise, just loop and check _closed again
456+ continue
445457 except StopAsyncIteration :
446458 logger .debug ("SSE stream ended" )
447459 break
You can’t perform that action at this time.
0 commit comments