Skip to content

Commit fde2752

Browse files
#334: Python SDK defaults to Avro codec for all new payloads
serializer.encode() and envelope() default to avro instead of json. client.start_workflow() sends avro-encoded input envelopes. client.complete_activity_task() defaults to avro codec. worker unsupported-codec fallback changed from json to avro. Tests updated to expect avro-encoded blobs in outgoing payloads.
1 parent f604f9c commit fde2752

7 files changed

Lines changed: 29 additions & 25 deletions

File tree

src/durable_workflow/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ async def start_workflow(
356356
"workflow_id": workflow_id,
357357
"workflow_type": workflow_type,
358358
"task_queue": task_queue,
359-
"input": {"codec": "json", "blob": encoded_input},
359+
"input": serializer.envelope(input if input is not None else []),
360360
"execution_timeout_seconds": execution_timeout_seconds,
361361
"run_timeout_seconds": run_timeout_seconds,
362362
}
@@ -827,7 +827,7 @@ async def complete_activity_task(
827827
activity_attempt_id: str,
828828
lease_owner: str,
829829
result: Any,
830-
codec: str = serializer.JSON_CODEC,
830+
codec: str = serializer.AVRO_CODEC,
831831
) -> Any:
832832
body: dict[str, Any] = {
833833
"activity_attempt_id": activity_attempt_id,

src/durable_workflow/serializer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
SUPPORTED_CODECS = (JSON_CODEC, AVRO_CODEC)
2525

2626

27-
def encode(value: Any, codec: str = JSON_CODEC) -> str:
27+
def encode(value: Any, codec: str = AVRO_CODEC) -> str:
2828
"""Encode a Python value as a payload blob for *codec*.
2929
3030
Raises ``ValueError`` for unknown codecs and
@@ -40,7 +40,7 @@ def encode(value: Any, codec: str = JSON_CODEC) -> str:
4040
)
4141

4242

43-
def envelope(value: Any, codec: str = JSON_CODEC) -> dict[str, str]:
43+
def envelope(value: Any, codec: str = AVRO_CODEC) -> dict[str, str]:
4444
"""Wrap a value in a ``{codec, blob}`` payload envelope."""
4545
return {"codec": codec, "blob": encode(value, codec=codec)}
4646

src/durable_workflow/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ async def _run_activity_task(self, task: dict[str, Any]) -> None:
230230
attempt_number: int = task.get("attempt_number", 1)
231231
raw_args = task.get("arguments")
232232
inbound_codec = task.get("payload_codec") or serializer.JSON_CODEC
233-
result_codec = inbound_codec if inbound_codec in serializer.SUPPORTED_CODECS else serializer.JSON_CODEC
233+
result_codec = inbound_codec if inbound_codec in serializer.SUPPORTED_CODECS else serializer.AVRO_CODEC
234234
try:
235235
args = serializer.decode_envelope(raw_args, codec=inbound_codec) or []
236236
except AvroNotInstalledError as e:

tests/test_client.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
WorkflowAlreadyStarted,
1717
WorkflowNotFound,
1818
)
19+
from durable_workflow import serializer
1920

2021

2122
def _mock_response(status: int = 200, json_data: dict | None = None, text: str = "") -> httpx.Response:
@@ -74,9 +75,9 @@ async def test_success(self, client: Client) -> None:
7475
call_args = mock.call_args
7576
body = call_args.kwargs.get("json") or call_args[1].get("json")
7677
assert body["workflow_type"] == "greeter"
77-
assert body["input"]["codec"] == "json"
78+
assert body["input"]["codec"] == "avro"
7879
import json as _json
79-
assert _json.loads(body["input"]["blob"]) == ["hello"]
80+
assert serializer.decode(body["input"]["blob"], codec="avro") == ["hello"]
8081

8182
@pytest.mark.asyncio
8283
async def test_duplicate_raises(self, client: Client) -> None:
@@ -142,8 +143,8 @@ async def test_signal(self, client: Client) -> None:
142143
call_args = mock.call_args
143144
assert "/signal/my-signal" in call_args[0][1]
144145
body = call_args.kwargs.get("json") or call_args[1].get("json")
145-
assert body["input"]["codec"] == "json"
146-
assert json.loads(body["input"]["blob"]) == ["data"]
146+
assert body["input"]["codec"] == "avro"
147+
assert serializer.decode(body["input"]["blob"], codec="avro") == ["data"]
147148

148149

149150
class TestCancelWorkflow:
@@ -303,8 +304,8 @@ async def test_update(self, client: Client) -> None:
303304
call_args = mock.call_args
304305
assert "/update/my-update" in call_args[0][1]
305306
body = call_args.kwargs.get("json") or call_args[1].get("json")
306-
assert body["input"]["codec"] == "json"
307-
assert json.loads(body["input"]["blob"]) == ["data"]
307+
assert body["input"]["codec"] == "avro"
308+
assert serializer.decode(body["input"]["blob"], codec="avro") == ["data"]
308309
assert body["wait_for"] == "completed"
309310
assert body["wait_timeout_seconds"] == 10
310311

tests/test_replay.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
WorkflowContext,
1616
replay,
1717
)
18+
from durable_workflow import serializer
1819

1920

2021
@workflow.defn(name="simple-return")
@@ -169,8 +170,8 @@ def test_server_command(self) -> None:
169170
cmd = CompleteWorkflow(result={"key": "val"})
170171
server_cmd = cmd.to_server_command("q")
171172
assert server_cmd["type"] == "complete_workflow"
172-
assert server_cmd["result"]["codec"] == "json"
173-
assert '"key"' in server_cmd["result"]["blob"]
173+
assert server_cmd["result"]["codec"] == "avro"
174+
assert serializer.decode(server_cmd["result"]["blob"], codec="avro") == {"key": "val"}
174175

175176

176177
@workflow.defn(name="continue-as-new-wf")
@@ -273,8 +274,8 @@ def test_server_command_shape(self) -> None:
273274
cmd = RecordSideEffect(result={"key": "val"})
274275
sc = cmd.to_server_command("q")
275276
assert sc["type"] == "record_side_effect"
276-
assert sc["result"]["codec"] == "json"
277-
assert '"key"' in sc["result"]["blob"]
277+
assert sc["result"]["codec"] == "avro"
278+
assert serializer.decode(sc["result"]["blob"], codec="avro") == {"key": "val"}
278279

279280

280281
class TestWorkflowContext:

tests/test_schedules.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
ScheduleAlreadyExists,
2222
ScheduleNotFound,
2323
)
24+
from durable_workflow import serializer
2425

2526

2627
def _mock_response(status: int = 200, json_data: dict | None = None) -> httpx.Response:
@@ -71,7 +72,7 @@ def test_full(self) -> None:
7172
d = action.to_dict()
7273
assert d["workflow_type"] == "greeter"
7374
assert d["task_queue"] == "q1"
74-
assert d["input"] == {"codec": "json", "blob": '["hello"]'}
75+
assert d["input"]["codec"] == "avro"
7576
assert d["execution_timeout_seconds"] == 3600
7677
assert d["run_timeout_seconds"] == 600
7778

@@ -108,8 +109,8 @@ async def test_action_input_uses_codec_envelope(self, client: Client) -> None:
108109
)
109110
body = mock.call_args.kwargs.get("json") or mock.call_args[1].get("json")
110111
action_input = body["action"]["input"]
111-
assert action_input["codec"] == "json"
112-
assert action_input["blob"] == '["Alice",42]'
112+
assert action_input["codec"] == "avro"
113+
assert serializer.decode(action_input["blob"], codec="avro") == ["Alice", 42]
113114

114115
@pytest.mark.asyncio
115116
async def test_minimal(self, client: Client) -> None:

tests/test_serializer.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
class TestEncode:
1919
def test_list(self) -> None:
20-
assert serializer.encode(["a", 1, True]) == '["a",1,true]'
20+
assert serializer.encode(["a", 1, True], codec="json") == '["a",1,true]'
2121

2222
def test_dict(self) -> None:
23-
assert serializer.encode({"k": "v"}) == '{"k":"v"}'
23+
assert serializer.encode({"k": "v"}, codec="json") == '{"k":"v"}'
2424

2525
def test_none(self) -> None:
26-
assert serializer.encode(None) == "null"
26+
assert serializer.encode(None, codec="json") == "null"
2727

2828

2929
class TestDecode:
3030
def test_roundtrip_list(self) -> None:
31-
assert serializer.decode(serializer.encode(["a", 1, True])) == ["a", 1, True]
31+
assert serializer.decode(serializer.encode(["a", 1, True], codec="json")) == ["a", 1, True]
3232

3333
def test_none_blob(self) -> None:
3434
assert serializer.decode(None) is None
@@ -79,12 +79,13 @@ def test_empty_string_passthrough(self) -> None:
7979
class TestEnvelope:
8080
def test_structure(self) -> None:
8181
env = serializer.envelope(["a", 1])
82-
assert env["codec"] == "json"
83-
assert env["blob"] == '["a",1]'
82+
assert env["codec"] == "avro"
83+
assert env["blob"] == serializer.encode(["a", 1], codec="avro")
8484

8585
def test_none_value(self) -> None:
8686
env = serializer.envelope(None)
87-
assert env == {"codec": "json", "blob": "null"}
87+
assert env["codec"] == "avro"
88+
assert serializer.decode(env["blob"], codec="avro") is None
8889

8990

9091
# Fixture blobs were generated by the PHP workflow package's

0 commit comments

Comments
 (0)