Skip to content

Commit 456dfdd

Browse files
Use event codec for replayed activity results
1 parent 4d586c5 commit 456dfdd

6 files changed

Lines changed: 32 additions & 12 deletions

File tree

src/durable_workflow/client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,6 @@ async def start_workflow(
351351
memo: dict[str, Any] | None = None,
352352
search_attributes: dict[str, Any] | None = None,
353353
) -> WorkflowHandle:
354-
encoded_input = serializer.encode(input if input is not None else [])
355354
body: dict[str, Any] = {
356355
"workflow_id": workflow_id,
357356
"workflow_type": workflow_type,

src/durable_workflow/workflow.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,11 @@ class ReplayOutcome:
273273
commands: list[Command]
274274

275275

276+
def _decode_history_result(payload: dict[str, Any], fallback_codec: str | None) -> Any:
277+
codec = payload.get("payload_codec") or fallback_codec
278+
return serializer.decode_envelope(payload.get("result"), codec=codec)
279+
280+
276281
def replay(
277282
workflow_cls: type,
278283
history_events: Iterable[dict[str, Any]],
@@ -301,14 +306,14 @@ def replay(
301306
etype = ev.get("event_type")
302307
payload = ev.get("payload") or {}
303308
if etype in ("ActivityCompleted", "activity_completed"):
304-
resolved_results.append(serializer.decode(payload.get("result"), codec=payload_codec))
309+
resolved_results.append(_decode_history_result(payload, payload_codec))
305310
elif etype in ("TimerFired", "timer_fired"):
306311
resolved_results.append(None)
307312
elif etype in (
308313
"SideEffectRecorded", "side_effect_recorded",
309314
"ChildRunCompleted", "child_run_completed",
310315
):
311-
resolved_results.append(serializer.decode(payload.get("result"), codec=payload_codec))
316+
resolved_results.append(_decode_history_result(payload, payload_codec))
312317
elif etype in ("ChildRunFailed", "child_run_failed"):
313318
resolved_results.append(ChildWorkflowFailed(
314319
payload.get("message", "child workflow failed")

tests/integration/test_polyglot.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,22 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
169169
server_cmd2 = cmd2.to_server_command(task_queue)
170170

171171
# Debug: inspect command before asserting
172-
print(f"\n=== Replay outcome ===")
172+
print("\n=== Replay outcome ===")
173173
print(f"Command type: {server_cmd2.get('type')}")
174174
print(f"Full command: {server_cmd2}")
175175
if server_cmd2.get("type") == "fail_workflow":
176176
print(f"Failure message: {server_cmd2.get('message', 'N/A')}")
177177
print(f"Failure details: {server_cmd2.get('details', 'N/A')}")
178178
import json
179179
print(f"History events count: {len(history2)}")
180-
print(f"Last few history events:")
180+
print("Last few history events:")
181181
for evt in history2[-3:]:
182182
print(f" - {evt.get('event_type')}: {json.dumps(evt, indent=2)[:200]}")
183183

184-
assert server_cmd2["type"] == "complete_workflow", \
185-
f"Expected complete_workflow but got {server_cmd2['type']}: {server_cmd2.get('message', 'no error message')}"
184+
assert server_cmd2["type"] == "complete_workflow", (
185+
f"Expected complete_workflow but got {server_cmd2['type']}: "
186+
f"{server_cmd2.get('message', 'no error message')}"
187+
)
186188

187189
# Verify workflow result includes PHP activity output
188190
workflow_result = server_cmd2.get("result", {})

tests/test_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import httpx
77
import pytest
88

9+
from durable_workflow import serializer
910
from durable_workflow.client import CONTROL_PLANE_VERSION, PROTOCOL_VERSION, Client, WorkflowExecution, WorkflowHandle
1011
from durable_workflow.errors import (
1112
InvalidArgument,
@@ -16,7 +17,6 @@
1617
WorkflowAlreadyStarted,
1718
WorkflowNotFound,
1819
)
19-
from durable_workflow import serializer
2020

2121

2222
def _mock_response(status: int = 200, json_data: dict | None = None, text: str = "") -> httpx.Response:
@@ -76,7 +76,6 @@ async def test_success(self, client: Client) -> None:
7676
body = call_args.kwargs.get("json") or call_args[1].get("json")
7777
assert body["workflow_type"] == "greeter"
7878
assert body["input"]["codec"] == "avro"
79-
import json as _json
8079
assert serializer.decode(body["input"]["blob"], codec="avro") == ["hello"]
8180

8281
@pytest.mark.asyncio

tests/test_replay.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from durable_workflow import workflow
3+
from durable_workflow import serializer, workflow
44
from durable_workflow.errors import ChildWorkflowFailed
55
from durable_workflow.workflow import (
66
CompleteWorkflow,
@@ -15,7 +15,6 @@
1515
WorkflowContext,
1616
replay,
1717
)
18-
from durable_workflow import serializer
1918

2019

2120
@workflow.defn(name="simple-return")
@@ -80,6 +79,22 @@ def test_completed_activity_triggers_completion(self) -> None:
8079
assert isinstance(cmd, CompleteWorkflow)
8180
assert cmd.result == {"greeting": "hello, world"}
8281

82+
def test_completed_activity_uses_event_payload_codec(self) -> None:
83+
history = [
84+
{
85+
"event_type": "ActivityCompleted",
86+
"payload": {
87+
"result": serializer.encode("hello, avro", codec="avro"),
88+
"payload_codec": "avro",
89+
},
90+
},
91+
]
92+
outcome = replay(OneActivity, history, ["world"])
93+
assert len(outcome.commands) == 1
94+
cmd = outcome.commands[0]
95+
assert isinstance(cmd, CompleteWorkflow)
96+
assert cmd.result == {"greeting": "hello, avro"}
97+
8398
def test_server_command_shape(self) -> None:
8499
outcome = replay(OneActivity, [], ["world"])
85100
cmd = outcome.commands[0]

tests/test_schedules.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import httpx
77
import pytest
88

9+
from durable_workflow import serializer
910
from durable_workflow.client import (
1011
Client,
1112
ScheduleAction,
@@ -21,7 +22,6 @@
2122
ScheduleAlreadyExists,
2223
ScheduleNotFound,
2324
)
24-
from durable_workflow import serializer
2525

2626

2727
def _mock_response(status: int = 200, json_data: dict | None = None) -> httpx.Response:

0 commit comments

Comments
 (0)