2020 DefaultRequestHandler ,
2121)
2222from a2a .types import (
23+ Artifact ,
24+ InternalError ,
25+ Message ,
26+ Part ,
27+ Role ,
2328 Task ,
29+ TaskArtifactUpdateEvent ,
2430 TaskIdParams ,
2531 TaskNotCancelableError ,
2632 TaskNotFoundError ,
33+ TaskQueryParams ,
2734 TaskState ,
35+ TaskStatus ,
36+ TaskStatusUpdateEvent ,
37+ TextPart ,
2838)
2939from a2a .utils .errors import ServerError
3040from fastapi import FastAPI , Request
8494 build_session_state_repository ,
8595 initialize_state_repository ,
8696)
87- from .task_store import build_database_engine , build_task_store , initialize_task_store
97+ from .task_store import (
98+ TaskStoreOperationError ,
99+ build_database_engine ,
100+ build_task_store ,
101+ initialize_task_store ,
102+ )
88103
89104logger = logging .getLogger (__name__ )
105+ TASK_STORE_ERROR_TYPE = "TASK_STORE_UNAVAILABLE"
90106
91107__all__ = [
92108 "_RequestBodyTooLargeError" ,
134150class OpencodeRequestHandler (DefaultRequestHandler ):
135151 """Custom request handler to gracefully handle client disconnects and prevent dead loops."""
136152
153+ @staticmethod
154+ def _task_store_failure_message (operation : str ) -> str :
155+ if operation == "get" :
156+ return "Task store unavailable while loading task state."
157+ if operation == "save" :
158+ return "Task store unavailable while persisting task state."
159+ if operation == "delete" :
160+ return "Task store unavailable while deleting task state."
161+ return "Task store unavailable."
162+
163+ @classmethod
164+ def _task_store_failure_metadata (cls , operation : str ) -> dict [str , dict [str , dict [str , str ]]]:
165+ return {
166+ "opencode" : {
167+ "error" : {
168+ "type" : TASK_STORE_ERROR_TYPE ,
169+ "operation" : operation ,
170+ }
171+ }
172+ }
173+
174+ @classmethod
175+ def _task_store_server_error (cls , exc : TaskStoreOperationError ) -> ServerError :
176+ return ServerError (
177+ error = InternalError (message = cls ._task_store_failure_message (exc .operation ))
178+ )
179+
180+ @classmethod
181+ def _task_store_failure_task (
182+ cls ,
183+ * ,
184+ task_id : str ,
185+ context_id : str ,
186+ operation : str ,
187+ ) -> Task :
188+ message_text = cls ._task_store_failure_message (operation )
189+ error_message = Message (
190+ message_id = f"{ task_id } :task-store-error" ,
191+ role = Role .agent ,
192+ parts = [Part (root = TextPart (text = message_text ))],
193+ task_id = task_id ,
194+ context_id = context_id ,
195+ )
196+ return Task (
197+ id = task_id ,
198+ context_id = context_id ,
199+ status = TaskStatus (state = TaskState .failed , message = error_message ),
200+ history = [error_message ],
201+ metadata = cls ._task_store_failure_metadata (operation ),
202+ )
203+
204+ @classmethod
205+ def _task_store_failure_events (
206+ cls ,
207+ * ,
208+ task_id : str ,
209+ context_id : str ,
210+ operation : str ,
211+ ) -> tuple [TaskArtifactUpdateEvent , TaskStatusUpdateEvent ]:
212+ message_text = cls ._task_store_failure_message (operation )
213+ return (
214+ TaskArtifactUpdateEvent (
215+ task_id = task_id ,
216+ context_id = context_id ,
217+ artifact = Artifact (
218+ artifact_id = f"{ task_id } :error" ,
219+ parts = [Part (root = TextPart (text = message_text ))],
220+ ),
221+ append = False ,
222+ last_chunk = True ,
223+ ),
224+ TaskStatusUpdateEvent (
225+ task_id = task_id ,
226+ context_id = context_id ,
227+ status = TaskStatus (state = TaskState .failed ),
228+ metadata = cls ._task_store_failure_metadata (operation ),
229+ final = True ,
230+ ),
231+ )
232+
233+ @staticmethod
234+ def _resolve_context_id_from_params (params , task_id : str ) -> str : # noqa: ANN001
235+ message = getattr (params , "message" , None )
236+ return (
237+ getattr (message , "contextId" , None ) or getattr (message , "context_id" , None ) or task_id
238+ )
239+
240+ async def on_get_task (
241+ self ,
242+ params : TaskQueryParams ,
243+ context = None ,
244+ ) -> Task | None :
245+ try :
246+ return await super ().on_get_task (params , context )
247+ except TaskStoreOperationError as exc :
248+ raise self ._task_store_server_error (exc ) from exc
249+
137250 async def on_cancel_task (
138251 self ,
139252 params : TaskIdParams ,
140253 context = None ,
141254 ) -> Task | None :
142- task = await self .task_store .get (params .id , context )
143- if not task :
144- raise ServerError (error = TaskNotFoundError ())
145-
146- # Idempotent contract:
147- # repeated cancel on already-canceled task returns current terminal state.
148- if task .status .state == TaskState .canceled :
149- return task
150-
151- if task .status .state in TERMINAL_TASK_STATES :
152- raise ServerError (
153- error = TaskNotCancelableError (
154- message = f"Task cannot be canceled - current state: { task .status .state } "
155- )
156- )
157255 try :
158- return await super ().on_cancel_task (params , context )
159- except ServerError as exc :
160- # Race-safe idempotency: task may become canceled between pre-check and super call.
161- if isinstance (exc .error , TaskNotCancelableError ):
162- refreshed = await self .task_store .get (params .id , context )
163- if refreshed and refreshed .status .state == TaskState .canceled :
164- return refreshed
165- raise
256+ task = await self .task_store .get (params .id , context )
257+ if not task :
258+ raise ServerError (error = TaskNotFoundError ())
259+
260+ # Idempotent contract:
261+ # repeated cancel on already-canceled task returns current terminal state.
262+ if task .status .state == TaskState .canceled :
263+ return task
264+
265+ if task .status .state in TERMINAL_TASK_STATES :
266+ raise ServerError (
267+ error = TaskNotCancelableError (
268+ message = f"Task cannot be canceled - current state: { task .status .state } "
269+ )
270+ )
271+ try :
272+ return await super ().on_cancel_task (params , context )
273+ except ServerError as exc :
274+ # Race-safe idempotency: task may become canceled between pre-check and super call.
275+ if isinstance (exc .error , TaskNotCancelableError ):
276+ refreshed = await self .task_store .get (params .id , context )
277+ if refreshed and refreshed .status .state == TaskState .canceled :
278+ return refreshed
279+ raise
280+ except TaskStoreOperationError as exc :
281+ raise self ._task_store_server_error (exc ) from exc
166282
167283 async def on_resubscribe_to_task (
168284 self ,
169285 params : TaskIdParams ,
170286 context = None ,
171287 ):
172- task = await self .task_store .get (params .id , context )
173- if not task :
174- raise ServerError (error = TaskNotFoundError ())
288+ try :
289+ task = await self .task_store .get (params .id , context )
290+ if not task :
291+ raise ServerError (error = TaskNotFoundError ())
175292
176- # Subscribe contract: terminal tasks replay once and then close stream.
177- if task .status .state in TERMINAL_TASK_STATES :
178- yield task
179- return
293+ # Subscribe contract: terminal tasks replay once and then close stream.
294+ if task .status .state in TERMINAL_TASK_STATES :
295+ yield task
296+ return
180297
181- async for event in super ().on_resubscribe_to_task (params , context ):
182- yield event
298+ async for event in super ().on_resubscribe_to_task (params , context ):
299+ yield event
300+ except TaskStoreOperationError as exc :
301+ raise self ._task_store_server_error (exc ) from exc
183302
184303 async def on_message_send_stream (self , params , context = None ):
185304 (
@@ -202,6 +321,18 @@ async def on_message_send_stream(self, params, context=None):
202321 await self ._send_push_notification_if_needed (task_id , result_aggregator )
203322 yield event
204323 stream_completed = True
324+ except TaskStoreOperationError as exc :
325+ logger .exception (
326+ "Task store operation failed during streaming task_id=%s operation=%s" ,
327+ task_id ,
328+ exc .operation ,
329+ )
330+ for event in self ._task_store_failure_events (
331+ task_id = task_id ,
332+ context_id = self ._resolve_context_id_from_params (params , task_id ),
333+ operation = exc .operation ,
334+ ):
335+ yield event
205336 except (asyncio .CancelledError , GeneratorExit ):
206337 logger .warning ("Client disconnected. Cancelling producer task %s" , task_id )
207338 producer_task .cancel ()
@@ -253,6 +384,17 @@ async def push_notification_callback() -> None:
253384 if bg_consume_task is not None :
254385 bg_consume_task .set_name (f"continue_consuming:{ task_id } " )
255386 self ._track_background_task (bg_consume_task )
387+ except TaskStoreOperationError as exc :
388+ logger .exception (
389+ "Task store operation failed during message/send task_id=%s operation=%s" ,
390+ task_id ,
391+ exc .operation ,
392+ )
393+ return self ._task_store_failure_task (
394+ task_id = task_id ,
395+ context_id = self ._resolve_context_id_from_params (params , task_id ),
396+ operation = exc .operation ,
397+ )
256398 except Exception :
257399 logger .exception ("Agent execution failed" )
258400 raise
@@ -276,9 +418,6 @@ async def push_notification_callback() -> None:
276418 pass
277419
278420 if not result :
279- from a2a .types import InternalError
280- from a2a .utils .errors import ServerError
281-
282421 raise ServerError (error = InternalError ())
283422
284423 if hasattr (result , "id" ) and result .id :
0 commit comments