Skip to content

Commit 61fb424

Browse files
Fix #216 (result key) and #207 (history pagination)
- Client.get_result() now reads payload.output instead of payload.result for WorkflowCompleted events, with fallback to result for backwards compatibility - Worker now paginates history when next_history_page_token is present, preventing silent replay failures for large workflows - Added Client.workflow_task_history() method to fetch additional history pages Resolves: #216, #207 Tests: 204/204 unit tests pass, ruff/mypy clean Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 1997d7d commit 61fb424

2 files changed

Lines changed: 35 additions & 1 deletion

File tree

src/durable_workflow/client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ async def get_result(
477477
etype = ev.get("event_type")
478478
payload = ev.get("payload") or {}
479479
if etype in ("WorkflowCompleted", "workflow_completed"):
480-
return serializer.decode(payload.get("result"))
480+
return serializer.decode(payload.get("output") or payload.get("result"))
481481
if etype in ("WorkflowFailed", "workflow_failed"):
482482
raise WorkflowFailed(
483483
payload.get("message", "workflow failed"),
@@ -750,6 +750,23 @@ async def fail_workflow_task(
750750
"POST", f"/worker/workflow-tasks/{task_id}/fail", worker=True, json=body
751751
)
752752

753+
async def workflow_task_history(
754+
self,
755+
*,
756+
task_id: str,
757+
page_token: str,
758+
lease_owner: str,
759+
workflow_task_attempt: int,
760+
) -> Any:
761+
body: dict[str, Any] = {
762+
"page_token": page_token,
763+
"lease_owner": lease_owner,
764+
"workflow_task_attempt": workflow_task_attempt,
765+
}
766+
return await self._request(
767+
"POST", f"/worker/workflow-tasks/{task_id}/history", worker=True, json=body
768+
)
769+
753770
async def poll_activity_task(
754771
self, *, worker_id: str, task_queue: str, timeout: float = 35.0
755772
) -> Any:

src/durable_workflow/worker.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,23 @@ async def _run_workflow_task(self, task: dict[str, Any]) -> list[dict[str, Any]]
6969
wf_type: str = task.get("workflow_type", "")
7070
history = task.get("history_events", [])
7171

72+
# Paginate history if needed
73+
next_page_token = task.get("next_history_page_token")
74+
while next_page_token:
75+
try:
76+
page_data = await self.client.workflow_task_history(
77+
task_id=task_id,
78+
page_token=next_page_token,
79+
lease_owner=self.worker_id,
80+
workflow_task_attempt=attempt,
81+
)
82+
if page_data and page_data.get("history_events"):
83+
history.extend(page_data["history_events"])
84+
next_page_token = page_data.get("next_history_page_token") if page_data else None
85+
except Exception as e:
86+
log.warning("failed to fetch history page for task %s: %s", task_id, e)
87+
break
88+
7289
start_input: list[Any] = []
7390
codec = task.get("payload_codec")
7491
raw_args = task.get("arguments")

0 commit comments

Comments
 (0)