File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -36,16 +36,17 @@ async def run(self):
3636 try :
3737 self .log .info ('Opening stream from %s' , stream_url )
3838
39- request_options = {}
40- if self .options .get ("readTimeoutMs" ) is not None :
41- request_options ["timeout" ] = aiohttp .ClientTimeout (
42- sock_read = millis_to_seconds (self .options .get ("readTimeoutMs" ))
43- )
44-
39+ # Always set total=None to disable aiohttp's default 5-minute total
40+ # timeout, which would silently kill long-lived SSE connections.
4541 connect = AsyncConnectStrategy .http (
4642 url = stream_url ,
4743 headers = self .options .get ("headers" ),
48- aiohttp_request_options = request_options if request_options else None ,
44+ aiohttp_request_options = {
45+ "timeout" : aiohttp .ClientTimeout (
46+ total = None ,
47+ sock_read = millis_to_seconds (self .options .get ("readTimeoutMs" )),
48+ )
49+ },
4950 )
5051 sse = AsyncSSEClient (
5152 connect ,
Original file line number Diff line number Diff line change @@ -134,7 +134,8 @@ async def _all_generator(self):
134134 if result is not None :
135135 yield result
136136
137- lines = _AsyncBufferedLineReader .lines_from (self .__connection_result .stream )
137+ current_result = self .__connection_result
138+ lines = _AsyncBufferedLineReader .lines_from (current_result .stream )
138139 reader = _AsyncSSEReader (lines , self .__last_event_id , None )
139140 error : Optional [Exception ] = None
140141 try :
@@ -143,14 +144,14 @@ async def _all_generator(self):
143144 yield ec
144145 if self .__interrupted :
145146 break
146- self .__connection_result = None
147147 except Exception as e :
148148 if self .__closed :
149149 return
150150 error = e
151- self .__connection_result = None
152151 finally :
153152 self .__last_event_id = reader .last_event_id
153+ await current_result .close ()
154+ self .__connection_result = None
154155
155156 self ._compute_next_retry_delay ()
156157 fail_or_continue , self .__current_error_strategy = (
You can’t perform that action at this time.
0 commit comments