Skip to content

Commit b10ba6d

Browse files
Add in-process WorkflowEnvironment testing harness
Closes zorporation/durable-workflow#393. New durable_workflow.testing module with: - WorkflowEnvironment: drives a workflow to completion in a single process, no server required. Resolves ScheduleActivity/StartTimer/ StartChildWorkflow/RecordSideEffect/UpsertSearchAttributes/ RecordVersionMarker commands against user-registered mocks. - register_activity_result() and register_activity() for canned or callable activity mocks. - register_child_workflow_result() for child workflow returns. - signal() to pre-queue signals that dispatch to @workflow.signal handlers before the first iteration. - replay_history() and replay_history_file() to replay a production history export (from Client.get_history) against current workflow code — standard pattern for regression-testing non-determinism. 12 unit tests cover single- and multi-activity pipelines, callable mocks, timer auto-fire, workflow failure surfacing, missing-mock errors, iteration limits, signal dispatch, and history-file replay in both list and dict-with-events shapes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 92a47fa commit b10ba6d

9 files changed

Lines changed: 584 additions & 4 deletions

File tree

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,26 @@ result = yield ctx.schedule_activity(
7777
)
7878
```
7979

80+
Child workflow starts use the same retry policy shape and workflow-level
81+
execution/run timeout names:
82+
83+
```python
84+
from durable_workflow import ChildWorkflowRetryPolicy
85+
86+
receipt = yield ctx.start_child_workflow(
87+
"payment.child",
88+
[order],
89+
retry_policy=ChildWorkflowRetryPolicy(
90+
max_attempts=3,
91+
initial_interval_seconds=2,
92+
backoff_coefficient=2,
93+
non_retryable_error_types=["ValidationError"],
94+
),
95+
execution_timeout_seconds=600,
96+
run_timeout_seconds=120,
97+
)
98+
```
99+
80100
## Features
81101

82102
- **Async-first**: Built on `httpx` and `asyncio`

docs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pip install 'durable-workflow[prometheus]'
1818

1919
- **[Client](reference/client.md)** — start workflows, signal, query, update, wait for results, manage schedules.
2020
- **[Worker](reference/worker.md)** — poll the server for workflow and activity tasks, dispatch to registered handlers.
21-
- **[Workflow](reference/workflow.md)** — workflow-side primitives: `ActivityRetryPolicy`, `ContinueAsNew`, `StartChildWorkflow`, and the workflow decorator.
21+
- **[Workflow](reference/workflow.md)** — workflow-side primitives: `ActivityRetryPolicy`, `ChildWorkflowRetryPolicy`, `ContinueAsNew`, `StartChildWorkflow`, and the workflow decorator.
2222
- **[Activity](reference/activity.md)** — activity decorator and execution context.
2323
- **[Errors](reference/errors.md)** — typed exceptions raised by the client and worker.
2424
- **[Retry policy](reference/retry_policy.md)** — HTTP transport retry configuration for the client.

docs/reference/testing.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Testing
2+
3+
::: durable_workflow.testing

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ nav:
7272
- Serializer: reference/serializer.md
7373
- Metrics: reference/metrics.md
7474
- Sync helpers: reference/sync.md
75+
- Testing: reference/testing.md
7576

7677
extra:
7778
social:

src/durable_workflow/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
except PackageNotFoundError: # source checkout without installed metadata
77
__version__ = "0.0.0+unknown"
88

9-
from . import activity, sync, workflow
9+
from . import activity, sync, testing, workflow
1010
from .activity import ActivityContext, ActivityInfo
1111
from .client import (
1212
Client,
@@ -48,14 +48,15 @@
4848
)
4949
from .retry_policy import RetryPolicy, TransportRetryPolicy
5050
from .worker import Worker
51-
from .workflow import ActivityRetryPolicy, ContinueAsNew, StartChildWorkflow
51+
from .workflow import ActivityRetryPolicy, ChildWorkflowRetryPolicy, ContinueAsNew, StartChildWorkflow
5252

5353
__all__ = [
5454
"__version__",
5555
"ActivityCancelled",
5656
"ActivityContext",
5757
"ActivityInfo",
5858
"ActivityRetryPolicy",
59+
"ChildWorkflowRetryPolicy",
5960
"ChildWorkflowFailed",
6061
"Client",
6162
"ContinueAsNew",
@@ -76,6 +77,7 @@
7677
"WorkflowList",
7778
"activity",
7879
"sync",
80+
"testing",
7981
"workflow",
8082
"DurableWorkflowError",
8183
"InvalidArgument",

src/durable_workflow/testing.py

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
"""In-process test harness for workflow authoring.
2+
3+
:class:`WorkflowEnvironment` drives a workflow to completion in a single
4+
Python process, without a running server or worker. It reuses the same
5+
:func:`durable_workflow.workflow.replay` machinery the worker uses, but
6+
resolves yielded commands against user-registered activity mocks and
7+
auto-fires timers / side-effects / search-attribute upserts so tests do
8+
not need a real clock or Redis.
9+
10+
Typical use::
11+
12+
def test_my_workflow():
13+
env = WorkflowEnvironment()
14+
env.register_activity_result("charge_card", {"id": "ch_1"})
15+
env.register_activity_result("send_receipt", None)
16+
result = env.execute_workflow(OrderWorkflow, "order-1", {"amount": 42})
17+
assert result == {"status": "complete", "charge_id": "ch_1"}
18+
19+
For regression-testing workflow code against production histories, use
20+
:func:`replay_history` — it hands the real durable history straight to
21+
the worker's replayer and surfaces any non-determinism as a raised
22+
exception.
23+
"""
24+
25+
from __future__ import annotations
26+
27+
import json
28+
from collections.abc import Callable, Iterable
29+
from pathlib import Path
30+
from typing import Any
31+
32+
from . import serializer
33+
from .errors import WorkflowCancelled, WorkflowFailed, WorkflowTerminated
34+
from .workflow import (
35+
CompleteWorkflow,
36+
ContinueAsNew,
37+
FailWorkflow,
38+
RecordSideEffect,
39+
RecordVersionMarker,
40+
ReplayOutcome,
41+
ScheduleActivity,
42+
StartChildWorkflow,
43+
StartTimer,
44+
UpsertSearchAttributes,
45+
replay,
46+
)
47+
48+
49+
class WorkflowEnvironment:
50+
"""Drives a workflow to completion against user-registered activity mocks."""
51+
52+
def __init__(self, *, iteration_limit: int = 1000) -> None:
53+
self._activity_results: dict[str, Any] = {}
54+
self._activity_fns: dict[str, Callable[..., Any]] = {}
55+
self._child_workflow_results: dict[str, Any] = {}
56+
self._pending_signals: list[tuple[str, list[Any]]] = []
57+
self._iteration_limit = iteration_limit
58+
59+
def register_activity_result(self, name: str, result: Any) -> None:
60+
"""Canned response: every call to ``name`` returns ``result``."""
61+
self._activity_results[name] = result
62+
63+
def register_activity(self, name: str, fn: Callable[..., Any]) -> None:
64+
"""Callable mock: ``fn(*arguments)`` is invoked for each scheduled call.
65+
66+
Use this when the test needs the mock to vary with arguments (e.g.
67+
look up by order id) or to capture invocations.
68+
"""
69+
self._activity_fns[name] = fn
70+
71+
def register_child_workflow_result(self, workflow_type: str, result: Any) -> None:
72+
"""Canned response for child workflow completions."""
73+
self._child_workflow_results[workflow_type] = result
74+
75+
def signal(self, name: str, args: list[Any] | None = None) -> None:
76+
"""Queue a signal to be delivered before the next iteration.
77+
78+
Signals are drained in the order they were queued and injected into
79+
the workflow history as ``SignalReceived`` events; the replayer then
80+
dispatches each to its registered ``@workflow.signal`` handler.
81+
"""
82+
self._pending_signals.append((name, list(args) if args is not None else []))
83+
84+
def execute_workflow(
85+
self,
86+
workflow_cls: type,
87+
*args: Any,
88+
run_id: str = "test-run",
89+
) -> Any:
90+
"""Drive ``workflow_cls`` to a terminal state and return its result.
91+
92+
Raises :class:`~durable_workflow.errors.WorkflowFailed` if the workflow
93+
ended in the ``failed`` state. Activities that do not have a
94+
registered mock raise :class:`KeyError` so tests fail loudly on
95+
missing fixtures.
96+
"""
97+
history: list[dict[str, Any]] = []
98+
99+
for _ in range(self._iteration_limit):
100+
self._drain_pending_signals_into(history)
101+
outcome = replay(workflow_cls, history, list(args), run_id=run_id)
102+
terminal = self._apply_commands(outcome, history)
103+
if terminal is not _SENTINEL:
104+
return terminal
105+
106+
raise RuntimeError(
107+
f"workflow did not terminate within {self._iteration_limit} iterations; "
108+
"check for missing activity mocks or signals that never satisfy a wait."
109+
)
110+
111+
def _drain_pending_signals_into(self, history: list[dict[str, Any]]) -> None:
112+
while self._pending_signals:
113+
name, sig_args = self._pending_signals.pop(0)
114+
history.append(
115+
{
116+
"event_type": "SignalReceived",
117+
"payload": {
118+
"signal_name": name,
119+
"value": serializer.envelope(sig_args),
120+
"payload_codec": serializer.AVRO_CODEC,
121+
},
122+
}
123+
)
124+
125+
def _apply_commands(
126+
self, outcome: ReplayOutcome, history: list[dict[str, Any]]
127+
) -> Any:
128+
for cmd in outcome.commands:
129+
if isinstance(cmd, CompleteWorkflow):
130+
return cmd.result
131+
if isinstance(cmd, FailWorkflow):
132+
raise WorkflowFailed(cmd.message, cmd.exception_type)
133+
if isinstance(cmd, ContinueAsNew):
134+
raise NotImplementedError(
135+
"continue_as_new is not yet supported by the test harness; "
136+
"drive each run explicitly with a separate execute_workflow call."
137+
)
138+
if isinstance(cmd, ScheduleActivity):
139+
history.append(self._resolve_activity(cmd))
140+
elif isinstance(cmd, StartTimer):
141+
history.append({"event_type": "TimerFired", "payload": {}})
142+
elif isinstance(cmd, StartChildWorkflow):
143+
history.append(self._resolve_child_workflow(cmd))
144+
elif isinstance(cmd, RecordSideEffect):
145+
history.append(
146+
{
147+
"event_type": "SideEffectRecorded",
148+
"payload": {
149+
"result": serializer.envelope(cmd.result),
150+
"payload_codec": serializer.AVRO_CODEC,
151+
},
152+
}
153+
)
154+
elif isinstance(cmd, UpsertSearchAttributes):
155+
history.append(
156+
{"event_type": "SearchAttributesUpserted", "payload": {}}
157+
)
158+
elif isinstance(cmd, RecordVersionMarker):
159+
history.append(
160+
{
161+
"event_type": "VersionMarkerRecorded",
162+
"payload": {"version": cmd.version},
163+
}
164+
)
165+
else:
166+
raise TypeError(f"unsupported command in test harness: {cmd!r}")
167+
return _SENTINEL
168+
169+
def _resolve_activity(self, cmd: ScheduleActivity) -> dict[str, Any]:
170+
if cmd.activity_type in self._activity_fns:
171+
result = self._activity_fns[cmd.activity_type](*cmd.arguments)
172+
elif cmd.activity_type in self._activity_results:
173+
result = self._activity_results[cmd.activity_type]
174+
else:
175+
raise KeyError(
176+
f"no mock registered for activity {cmd.activity_type!r}; "
177+
"call env.register_activity_result() or env.register_activity()."
178+
)
179+
return {
180+
"event_type": "ActivityCompleted",
181+
"payload": {
182+
"result": serializer.envelope(result),
183+
"payload_codec": serializer.AVRO_CODEC,
184+
},
185+
}
186+
187+
def _resolve_child_workflow(self, cmd: StartChildWorkflow) -> dict[str, Any]:
188+
if cmd.workflow_type not in self._child_workflow_results:
189+
raise KeyError(
190+
f"no mock registered for child workflow {cmd.workflow_type!r}; "
191+
"call env.register_child_workflow_result()."
192+
)
193+
return {
194+
"event_type": "ChildRunCompleted",
195+
"payload": {
196+
"result": serializer.envelope(self._child_workflow_results[cmd.workflow_type]),
197+
"payload_codec": serializer.AVRO_CODEC,
198+
},
199+
}
200+
201+
202+
# Sentinel marking "no terminal command seen this iteration".
203+
_SENTINEL = object()
204+
205+
206+
def replay_history(
207+
workflow_cls: type,
208+
history_events: Iterable[dict[str, Any]],
209+
start_input: list[Any] | None = None,
210+
*,
211+
run_id: str = "",
212+
payload_codec: str | None = None,
213+
) -> ReplayOutcome:
214+
"""Replay a production history against current workflow code.
215+
216+
Hands the durable history directly to the worker's replayer. Raises any
217+
exception the workflow would raise during replay — for example a
218+
non-determinism failure when ``run`` yields a different command sequence
219+
from the one recorded in history.
220+
221+
This is the supported way to regression-test a workflow change against
222+
real production traffic: dump the history from ``Client.get_history``,
223+
save the JSON, and replay it on every PR.
224+
"""
225+
return replay(
226+
workflow_cls,
227+
history_events,
228+
list(start_input or []),
229+
run_id=run_id,
230+
payload_codec=payload_codec,
231+
)
232+
233+
234+
def replay_history_file(
235+
workflow_cls: type,
236+
path: str | Path,
237+
start_input: list[Any] | None = None,
238+
*,
239+
run_id: str = "",
240+
payload_codec: str | None = None,
241+
) -> ReplayOutcome:
242+
"""Convenience wrapper: load a JSON history file and replay it.
243+
244+
Accepts either a list of events at the top level or a dict with an
245+
``events`` key (matching the shape of ``Client.get_history``).
246+
"""
247+
data = json.loads(Path(path).read_text())
248+
events = data["events"] if isinstance(data, dict) else data
249+
return replay_history(
250+
workflow_cls,
251+
events,
252+
start_input,
253+
run_id=run_id,
254+
payload_codec=payload_codec,
255+
)
256+
257+
258+
__all__ = [
259+
"WorkflowEnvironment",
260+
"replay_history",
261+
"replay_history_file",
262+
]
263+
264+
265+
# Re-export terminal exceptions the harness may raise so tests can catch
266+
# them without hunting for the right import path.
267+
_TERMINAL_EXCEPTIONS = (WorkflowFailed, WorkflowCancelled, WorkflowTerminated)

0 commit comments

Comments
 (0)