88from ._api import DeliveryError , Instant , LivenessError , Priority , SendError
99from ._api import Publisher , Topic , ResponseStream , Response
1010from ._header import MsgBeHeader , MsgRelHeader , RspBeHeader , RspRelHeader
11- from ._node import ACK_BASELINE_DEFAULT_TIMEOUT , NodeImpl , SESSION_LIFETIME , TopicImpl
11+ from ._node import ACK_BASELINE_DEFAULT_TIMEOUT , NodeImpl , PublishTracker , SESSION_LIFETIME , TopicImpl
1212from ._transport import TransportArrival
1313
1414_logger = logging .getLogger (__name__ )
@@ -93,33 +93,15 @@ async def __call__(
9393 raise SendError ("Publisher closed" )
9494
9595 tag = self ._topic .next_tag ()
96- lage = self ._topic .lage (Instant .now ().s )
97-
98- hdr : MsgBeHeader | MsgRelHeader
99- if reliable :
100- hdr = MsgRelHeader (
101- topic_log_age = lage ,
102- topic_evictions = self ._topic .evictions ,
103- topic_hash = self ._topic .hash ,
104- tag = tag ,
105- )
106- else :
107- hdr = MsgBeHeader (
108- topic_log_age = lage ,
109- topic_evictions = self ._topic .evictions ,
110- topic_hash = self ._topic .hash ,
111- tag = tag ,
112- )
113-
114- data = hdr .serialize () + bytes (message )
115- writer = self ._topic .ensure_writer ()
96+ payload = bytes (message )
11697
11798 if not reliable :
118- await writer (deadline , self ._priority , data )
99+ writer = self ._topic .ensure_writer ()
100+ await writer (deadline , self ._priority , self ._serialize_message (tag , payload , reliable = False ))
119101 _logger .debug ("Published BE tag=%d topic='%s'" , tag , self ._topic .name )
120102 return
121103
122- await self ._reliable_publish (deadline , tag , data )
104+ await self ._reliable_publish (deadline , tag , payload )
123105
124106 async def request (
125107 self ,
@@ -131,15 +113,7 @@ async def request(
131113 raise SendError ("Publisher closed" )
132114
133115 tag = self ._topic .next_tag ()
134- lage = self ._topic .lage (Instant .now ().s )
135-
136- hdr = MsgRelHeader (
137- topic_log_age = lage ,
138- topic_evictions = self ._topic .evictions ,
139- topic_hash = self ._topic .hash ,
140- tag = tag ,
141- )
142- data = hdr .serialize () + bytes (message )
116+ payload = bytes (message )
143117
144118 # Create response stream before publishing so it's ready to receive.
145119 stream = ResponseStreamImpl (
@@ -150,81 +124,160 @@ async def request(
150124 )
151125 self ._topic .request_futures [tag ] = stream
152126
153- task = self ._node .loop .create_task (self ._request_publish (delivery_deadline , tag , data , stream ))
127+ tracker = self ._prepare_reliable_publish_tracker (tag , delivery_deadline .ns , payload )
128+ try :
129+ initial_window = await self ._reliable_publish_start (delivery_deadline , tag , payload , tracker )
130+ except BaseException :
131+ self ._topic .request_futures .pop (tag , None )
132+ self ._release_reliable_publish_tracker (tag , tracker )
133+ raise
134+
135+ task = self ._node .loop .create_task (
136+ self ._request_publish (delivery_deadline , tag , payload , stream , tracker , initial_window )
137+ )
154138 stream .set_publish_task (task )
155139 return stream
156140
157141 async def _request_publish (
158142 self ,
159143 deadline : Instant ,
160144 tag : int ,
161- data : bytes ,
145+ payload : bytes ,
162146 stream : ResponseStreamImpl ,
147+ tracker : PublishTracker ,
148+ initial_window : tuple [int , bool ],
163149 ) -> None :
164150 try :
165- await self ._reliable_publish (deadline , tag , data )
151+ await self ._reliable_publish_continue (deadline , tag , payload , tracker , initial_window )
166152 except asyncio .CancelledError :
167153 raise
168154 except BaseException as ex :
169155 stream .on_publish_error (ex )
156+ finally :
157+ self ._release_reliable_publish_tracker (tag , tracker )
170158
171159 @staticmethod
172160 def _ack_is_last_attempt (current_ack_deadline_ns : int , current_ack_timeout : float , total_deadline_ns : int ) -> bool :
173161 next_ack_timeout_ns = round (current_ack_timeout * 2 * 1e9 )
174162 remaining_budget_ns = total_deadline_ns - current_ack_deadline_ns
175163 return remaining_budget_ns < next_ack_timeout_ns
176164
177- async def _reliable_publish (self , deadline : Instant , tag : int , data : bytes ) -> None :
178- writer = self ._topic .ensure_writer ()
179- tracker = self ._node .prepare_publish_tracker (self ._topic , tag , deadline .ns , data )
165+ def _serialize_message (self , tag : int , payload : bytes , * , reliable : bool ) -> bytes :
166+ lage = self ._topic .lage (Instant .now ().s )
167+ hdr = (MsgRelHeader if reliable else MsgBeHeader )(
168+ topic_log_age = lage ,
169+ topic_evictions = self ._topic .evictions ,
170+ topic_hash = self ._topic .hash ,
171+ tag = tag ,
172+ )
173+ return hdr .serialize () + payload
174+
175+ @staticmethod
176+ def _reliable_publish_window (deadline_ns : int , ack_timeout : float ) -> tuple [int , bool ] | None :
177+ now_ns = Instant .now ().ns
178+ if now_ns >= deadline_ns :
179+ return None
180+ ack_deadline_ns = min (deadline_ns , now_ns + round (ack_timeout * 1e9 ))
181+ return ack_deadline_ns , PublisherImpl ._ack_is_last_attempt (ack_deadline_ns , ack_timeout , deadline_ns )
182+
183+ def _prepare_reliable_publish_tracker (self , tag : int , deadline_ns : int , payload : bytes ) -> PublishTracker :
184+ tracker = self ._node .prepare_publish_tracker (self ._topic , tag , deadline_ns , payload )
180185 tracker .ack_timeout = self .ack_timeout
181186 self ._topic .publish_futures [tag ] = tracker
182- first_attempt = True
183- try :
184- while True :
185- if tracker .acknowledged and not tracker .remaining :
186- _logger .debug ("Reliable publish ACKed tag=%d topic='%s'" , tag , self ._topic .name )
187- return
188- now_ns = Instant .now ().ns
189- if now_ns >= deadline .ns :
190- break
191- ack_deadline_ns = min (deadline .ns , now_ns + round (tracker .ack_timeout * 1e9 ))
192- last_attempt = self ._ack_is_last_attempt (ack_deadline_ns , tracker .ack_timeout , deadline .ns )
193- tracker .ack_event .clear ()
194- try :
195- tx_deadline = Instant (ns = ack_deadline_ns )
196- if (not first_attempt ) and (len (tracker .remaining ) == 1 ):
197- remote_id = next (iter (tracker .remaining ))
198- await self ._node .transport .unicast (tx_deadline , self ._priority , remote_id , data )
199- else :
200- await writer (tx_deadline , self ._priority , data )
201- except (SendError , OSError ):
202- tracker .compromised = True
203-
204- if tracker .acknowledged and not tracker .remaining :
205- _logger .debug ("Reliable publish ACKed tag=%d topic='%s'" , tag , self ._topic .name )
206- return
207-
208- wait_until_ns = deadline .ns if last_attempt else ack_deadline_ns
209- wait_timeout = max (0.0 , (wait_until_ns - Instant .now ().ns ) * 1e-9 )
210- if wait_timeout > 0 :
211- try :
212- await asyncio .wait_for (tracker .ack_event .wait (), timeout = wait_timeout )
213- except asyncio .TimeoutError :
214- pass
215-
216- if tracker .acknowledged and not tracker .remaining :
217- _logger .debug ("Reliable publish ACKed tag=%d topic='%s'" , tag , self ._topic .name )
218- return
219- if last_attempt :
220- break
221- first_attempt = False
222- tracker .ack_timeout *= 2
187+ return tracker
188+
189+ def _release_reliable_publish_tracker (self , tag : int , tracker : PublishTracker ) -> None :
190+ self ._topic .publish_futures .pop (tag , None )
191+ self ._node .publish_tracker_release (self ._topic , tracker )
192+
193+ async def _send_reliable_publish (
194+ self ,
195+ deadline : Instant ,
196+ tag : int ,
197+ payload : bytes ,
198+ tracker : PublishTracker ,
199+ * ,
200+ first_attempt : bool ,
201+ ) -> None :
202+ data = self ._serialize_message (tag , payload , reliable = True )
203+ if (not first_attempt ) and (len (tracker .remaining ) == 1 ):
204+ remote_id = next (iter (tracker .remaining ))
205+ await self ._node .transport .unicast (deadline , self ._priority , remote_id , data )
206+ else :
207+ writer = self ._topic .ensure_writer ()
208+ await writer (deadline , self ._priority , data )
223209
210+ async def _reliable_publish_start (
211+ self ,
212+ deadline : Instant ,
213+ tag : int ,
214+ payload : bytes ,
215+ tracker : PublishTracker ,
216+ ) -> tuple [int , bool ]:
217+ initial_window = self ._reliable_publish_window (deadline .ns , tracker .ack_timeout )
218+ if initial_window is None :
224219 raise DeliveryError ("Reliable publish not acknowledged before deadline" )
220+ ack_deadline_ns , _ = initial_window
221+ tracker .ack_event .clear ()
222+ try :
223+ await self ._send_reliable_publish (Instant (ns = ack_deadline_ns ), tag , payload , tracker , first_attempt = True )
224+ except SendError :
225+ tracker .compromised = True
226+ raise
227+ except OSError as ex :
228+ tracker .compromised = True
229+ raise SendError ("Reliable publish initial send failed" ) from ex
230+ return initial_window
231+
232+ async def _reliable_publish_continue (
233+ self ,
234+ deadline : Instant ,
235+ tag : int ,
236+ payload : bytes ,
237+ tracker : PublishTracker ,
238+ initial_window : tuple [int , bool ],
239+ ) -> None :
240+ ack_deadline_ns , last_attempt = initial_window
241+ while True :
242+ if tracker .acknowledged and not tracker .remaining :
243+ _logger .debug ("Reliable publish ACKed tag=%d topic='%s'" , tag , self ._topic .name )
244+ return
245+
246+ wait_until_ns = deadline .ns if last_attempt else ack_deadline_ns
247+ wait_timeout = max (0.0 , (wait_until_ns - Instant .now ().ns ) * 1e-9 )
248+ if wait_timeout > 0 :
249+ try :
250+ await asyncio .wait_for (tracker .ack_event .wait (), timeout = wait_timeout )
251+ except asyncio .TimeoutError :
252+ pass
253+
254+ if tracker .acknowledged and not tracker .remaining :
255+ _logger .debug ("Reliable publish ACKed tag=%d topic='%s'" , tag , self ._topic .name )
256+ return
257+ if last_attempt :
258+ break
259+ tracker .ack_timeout *= 2
260+ next_window = self ._reliable_publish_window (deadline .ns , tracker .ack_timeout )
261+ if next_window is None :
262+ break
263+ ack_deadline_ns , last_attempt = next_window
264+ tracker .ack_event .clear ()
265+ try :
266+ await self ._send_reliable_publish (
267+ Instant (ns = ack_deadline_ns ), tag , payload , tracker , first_attempt = False
268+ )
269+ except (SendError , OSError ):
270+ tracker .compromised = True
271+
272+ raise DeliveryError ("Reliable publish not acknowledged before deadline" )
273+
274+ async def _reliable_publish (self , deadline : Instant , tag : int , payload : bytes ) -> None :
275+ tracker = self ._prepare_reliable_publish_tracker (tag , deadline .ns , payload )
276+ try :
277+ initial_window = await self ._reliable_publish_start (deadline , tag , payload , tracker )
278+ await self ._reliable_publish_continue (deadline , tag , payload , tracker , initial_window )
225279 finally :
226- self ._topic .publish_futures .pop (tag , None )
227- self ._node .publish_tracker_release (self ._topic , tracker )
280+ self ._release_reliable_publish_tracker (tag , tracker )
228281
229282 def close (self ) -> None :
230283 if self .closed :
0 commit comments