Skip to content

Commit 380b2fb

Browse files
committed
Add more tests, fix in-memory backend bug
1 parent 12dafb8 commit 380b2fb

File tree

3 files changed

+248
-14
lines changed

3 files changed

+248
-14
lines changed

durabletask/testing/in_memory_backend.py

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1667,11 +1667,10 @@ def _process_rewind_orchestration_action(
16671667
# Identify sub-orchestrations that were created but did not
16681668
# complete successfully — they need to be recursively rewound.
16691669
completed_sub_orch_task_ids: set[int] = set()
1670-
created_sub_orchs: dict[int, str] = {}
1670+
created_sub_orch_events: dict[int, pb.HistoryEvent] = {}
16711671
for event in new_history:
16721672
if event.HasField("subOrchestrationInstanceCreated"):
1673-
created_sub_orchs[event.eventId] = (
1674-
event.subOrchestrationInstanceCreated.instanceId)
1673+
created_sub_orch_events[event.eventId] = event
16751674
elif event.HasField("subOrchestrationInstanceCompleted"):
16761675
completed_sub_orch_task_ids.add(
16771676
event.subOrchestrationInstanceCompleted.taskScheduledId)
@@ -1684,11 +1683,22 @@ def _process_rewind_orchestration_action(
16841683
reason = event.executionRewound.reason.value
16851684
break
16861685

1687-
# Recursively rewind failed sub-orchestrations.
1688-
for task_id, sub_instance_id in created_sub_orchs.items():
1686+
# Recursively rewind failed sub-orchestrations. If the sub was
1687+
# purged (no longer in _instances), re-create it from the
1688+
# subOrchestrationInstanceCreated event so it runs fresh.
1689+
for task_id, event in created_sub_orch_events.items():
16891690
if task_id not in completed_sub_orch_task_ids:
1691+
sub_info = event.subOrchestrationInstanceCreated
1692+
sub_instance_id = sub_info.instanceId
16901693
sub_instance = self._instances.get(sub_instance_id)
1691-
if (sub_instance and sub_instance.status == pb.ORCHESTRATION_STATUS_FAILED):
1694+
if sub_instance is None:
1695+
# Sub-orchestration was purged — re-create it.
1696+
sub_name = sub_info.name
1697+
sub_input = sub_info.input.value if sub_info.HasField("input") else None
1698+
sub_version = sub_info.version.value if sub_info.HasField("version") else None
1699+
self._create_instance_internal(
1700+
sub_instance_id, sub_name, sub_input, version=sub_version)
1701+
elif sub_instance.status == pb.ORCHESTRATION_STATUS_FAILED:
16921702
self._prepare_rewind(sub_instance, reason)
16931703
self._watch_sub_orchestration(
16941704
instance.instance_id, sub_instance_id, task_id)

tests/durabletask-azuremanaged/test_dts_rewind_e2e.py

Lines changed: 118 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -88,14 +88,15 @@ def test_rewind_preserves_successful_results():
8888
rewind should re-execute only the failed activity while the successful
8989
result is replayed from history."""
9090
call_tracker: dict[str, int] = {"first": 0, "second": 0}
91+
should_fail_second = True
9192

9293
def first_activity(_: task.ActivityContext, input: str) -> str:
9394
call_tracker["first"] += 1
9495
return f"first:{input}"
9596

9697
def second_activity(_: task.ActivityContext, input: str) -> str:
9798
call_tracker["second"] += 1
98-
if call_tracker["second"] == 1:
99+
if should_fail_second:
99100
raise RuntimeError("Temporary failure")
100101
return f"second:{input}"
101102

@@ -120,7 +121,8 @@ def orchestrator(ctx: task.OrchestrationContext, input: str):
120121
assert state is not None
121122
assert state.runtime_status == client.OrchestrationStatus.FAILED
122123

123-
# Rewind – second_activity will now succeed on retry.
124+
# Fix second_activity so it now succeeds, then rewind.
125+
should_fail_second = False
124126
c.rewind_orchestration(instance_id, reason="retry")
125127
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
126128

@@ -130,8 +132,8 @@ def orchestrator(ctx: task.OrchestrationContext, input: str):
130132
assert state.failure_details is None
131133
# first_activity should NOT be re-executed – its result is replayed.
132134
assert call_tracker["first"] == 1
133-
# second_activity was called twice (once failed, once succeeded).
134-
assert call_tracker["second"] == 2
135+
# second_activity was called at least twice (once failed, once succeeded).
136+
assert call_tracker["second"] >= 2
135137

136138

137139
def test_rewind_not_found():
@@ -210,6 +212,118 @@ def parent_orchestrator(ctx: task.OrchestrationContext, input: str):
210212
assert sub_call_count == 2
211213

212214

215+
def test_rewind_purged_sub_orchestration():
216+
"""A purged sub-orchestration is re-run when the parent is rewound.
217+
218+
Flow: parent orchestrator -> calls sub-orchestrator -> sub-orchestrator
219+
fails -> parent fails -> client purges the sub-orchestration -> client
220+
rewinds the parent -> parent re-schedules the sub-orchestration which
221+
now succeeds -> parent completes.
222+
"""
223+
child_call_count = 0
224+
225+
def child_activity(_: task.ActivityContext, input: str) -> str:
226+
nonlocal child_call_count
227+
child_call_count += 1
228+
if child_call_count == 1:
229+
raise RuntimeError("Child failure")
230+
return f"child:{input}"
231+
232+
def child_orchestrator(ctx: task.OrchestrationContext, input: str):
233+
result = yield ctx.call_activity(child_activity, input=input)
234+
return result
235+
236+
def parent_orchestrator(ctx: task.OrchestrationContext, input: str):
237+
result = yield ctx.call_sub_orchestrator(
238+
child_orchestrator, input=input, instance_id="sub-orch-to-purge")
239+
return f"parent:{result}"
240+
241+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
242+
taskhub=taskhub_name, token_credential=None) as w:
243+
w.add_orchestrator(parent_orchestrator)
244+
w.add_orchestrator(child_orchestrator)
245+
w.add_activity(child_activity)
246+
w.start()
247+
248+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
249+
taskhub=taskhub_name, token_credential=None)
250+
instance_id = c.schedule_new_orchestration(
251+
parent_orchestrator, input="data")
252+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
253+
254+
# Parent should fail because child failed.
255+
assert state is not None
256+
assert state.runtime_status == client.OrchestrationStatus.FAILED
257+
258+
# Purge the sub-orchestration so it must be completely re-run.
259+
c.purge_orchestration("sub-orch-to-purge")
260+
261+
# Rewind the parent – child will be re-scheduled and succeed.
262+
c.rewind_orchestration(instance_id, reason="purge and retry")
263+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
264+
265+
assert state is not None
266+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
267+
assert state.serialized_output == json.dumps("parent:child:data")
268+
assert child_call_count == 2
269+
270+
271+
def test_rewind_does_not_rerun_successful_activities():
272+
"""Successful activities must not be re-executed during rewind.
273+
274+
The orchestration calls two activities in sequence. The first
275+
succeeds and the second fails. After rewind, only the failed
276+
activity is retried; the successful activity's result is replayed
277+
from history and its body is never called again.
278+
"""
279+
success_call_count = 0
280+
fail_call_count = 0
281+
282+
def success_activity(_: task.ActivityContext, input: str) -> str:
283+
nonlocal success_call_count
284+
success_call_count += 1
285+
return f"ok:{input}"
286+
287+
def fail_activity(_: task.ActivityContext, input: str) -> str:
288+
nonlocal fail_call_count
289+
fail_call_count += 1
290+
if fail_call_count == 1:
291+
raise RuntimeError("Temporary failure")
292+
return f"recovered:{input}"
293+
294+
def orchestrator(ctx: task.OrchestrationContext, input: str):
295+
r1 = yield ctx.call_activity(success_activity, input=input)
296+
r2 = yield ctx.call_activity(fail_activity, input=input)
297+
return [r1, r2]
298+
299+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
300+
taskhub=taskhub_name, token_credential=None) as w:
301+
w.add_orchestrator(orchestrator)
302+
w.add_activity(success_activity)
303+
w.add_activity(fail_activity)
304+
w.start()
305+
306+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
307+
taskhub=taskhub_name, token_credential=None)
308+
instance_id = c.schedule_new_orchestration(orchestrator, input="v")
309+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
310+
311+
assert state is not None
312+
assert state.runtime_status == client.OrchestrationStatus.FAILED
313+
314+
# Rewind – only the failed activity should be retried.
315+
c.rewind_orchestration(instance_id, reason="retry")
316+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
317+
318+
assert state is not None
319+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
320+
assert state.serialized_output == json.dumps(["ok:v", "recovered:v"])
321+
# The successful activity must have been called exactly once.
322+
assert success_call_count == 1
323+
# The failing activity was called twice (once failed, once succeeded).
324+
assert fail_call_count == 2
325+
326+
213327
def test_rewind_without_reason():
214328
"""Rewind should work when no reason is provided."""
215329
call_count = 0

tests/durabletask/test_rewind_e2e.py

Lines changed: 114 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -94,14 +94,15 @@ def test_rewind_preserves_successful_results():
9494
_reset_counters()
9595

9696
call_tracker: dict[str, int] = {"first": 0, "second": 0}
97+
should_fail_second = True
9798

9899
def first_activity(_: task.ActivityContext, input: str) -> str:
99100
call_tracker["first"] += 1
100101
return f"first:{input}"
101102

102103
def second_activity(_: task.ActivityContext, input: str) -> str:
103104
call_tracker["second"] += 1
104-
if call_tracker["second"] == 1:
105+
if should_fail_second:
105106
raise RuntimeError("Temporary failure")
106107
return f"second:{input}"
107108

@@ -124,7 +125,8 @@ def orchestrator(ctx: task.OrchestrationContext, input: str):
124125
assert state is not None
125126
assert state.runtime_status == client.OrchestrationStatus.FAILED
126127

127-
# Rewind – second_activity will now succeed on retry.
128+
# Fix second_activity so it now succeeds, then rewind.
129+
should_fail_second = False
128130
c.rewind_orchestration(instance_id, reason="retry")
129131
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
130132

@@ -134,8 +136,8 @@ def orchestrator(ctx: task.OrchestrationContext, input: str):
134136
assert state.failure_details is None
135137
# first_activity should NOT be re-executed – its result is replayed.
136138
assert call_tracker["first"] == 1
137-
# second_activity was called twice (once failed, once succeeded).
138-
assert call_tracker["second"] == 2
139+
# second_activity was called at least twice (once failed, once succeeded).
140+
assert call_tracker["second"] >= 2
139141

140142

141143
def test_rewind_not_found():
@@ -247,6 +249,114 @@ def orchestrator(ctx: task.OrchestrationContext, _):
247249
assert state.serialized_output == json.dumps("ok")
248250

249251

252+
def test_rewind_purged_sub_orchestration():
253+
"""A purged sub-orchestration is re-run when the parent is rewound.
254+
255+
Flow: parent orchestrator -> calls sub-orchestrator -> sub-orchestrator
256+
fails -> parent fails -> client purges the sub-orchestration -> client
257+
rewinds the parent -> parent re-schedules the sub-orchestration which
258+
now succeeds -> parent completes.
259+
"""
260+
child_call_count = 0
261+
262+
def child_activity(_: task.ActivityContext, input: str) -> str:
263+
nonlocal child_call_count
264+
child_call_count += 1
265+
if child_call_count == 1:
266+
raise RuntimeError("Child failure")
267+
return f"child:{input}"
268+
269+
def child_orchestrator(ctx: task.OrchestrationContext, input: str):
270+
result = yield ctx.call_activity(child_activity, input=input)
271+
return result
272+
273+
def parent_orchestrator(ctx: task.OrchestrationContext, input: str):
274+
result = yield ctx.call_sub_orchestrator(
275+
child_orchestrator, input=input, instance_id="sub-orch-to-purge")
276+
return f"parent:{result}"
277+
278+
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
279+
w.add_orchestrator(parent_orchestrator)
280+
w.add_orchestrator(child_orchestrator)
281+
w.add_activity(child_activity)
282+
w.start()
283+
284+
c = client.TaskHubGrpcClient(host_address=HOST)
285+
instance_id = c.schedule_new_orchestration(
286+
parent_orchestrator, input="data")
287+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
288+
289+
# Parent should fail because child failed.
290+
assert state is not None
291+
assert state.runtime_status == client.OrchestrationStatus.FAILED
292+
293+
# Purge the sub-orchestration so it must be completely re-run.
294+
c.purge_orchestration("sub-orch-to-purge")
295+
296+
# Rewind the parent – child will be re-scheduled and succeed.
297+
c.rewind_orchestration(instance_id, reason="purge and retry")
298+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
299+
300+
assert state is not None
301+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
302+
assert state.serialized_output == json.dumps("parent:child:data")
303+
assert child_call_count == 2
304+
305+
306+
def test_rewind_does_not_rerun_successful_activities():
307+
"""Successful activities must not be re-executed during rewind.
308+
309+
The orchestration calls two activities in sequence. The first
310+
succeeds and the second fails. After rewind, only the failed
311+
activity is retried; the successful activity's result is replayed
312+
from history and its body is never called again.
313+
"""
314+
success_call_count = 0
315+
fail_call_count = 0
316+
317+
def success_activity(_: task.ActivityContext, input: str) -> str:
318+
nonlocal success_call_count
319+
success_call_count += 1
320+
return f"ok:{input}"
321+
322+
def fail_activity(_: task.ActivityContext, input: str) -> str:
323+
nonlocal fail_call_count
324+
fail_call_count += 1
325+
if fail_call_count == 1:
326+
raise RuntimeError("Temporary failure")
327+
return f"recovered:{input}"
328+
329+
def orchestrator(ctx: task.OrchestrationContext, input: str):
330+
r1 = yield ctx.call_activity(success_activity, input=input)
331+
r2 = yield ctx.call_activity(fail_activity, input=input)
332+
return [r1, r2]
333+
334+
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
335+
w.add_orchestrator(orchestrator)
336+
w.add_activity(success_activity)
337+
w.add_activity(fail_activity)
338+
w.start()
339+
340+
c = client.TaskHubGrpcClient(host_address=HOST)
341+
instance_id = c.schedule_new_orchestration(orchestrator, input="v")
342+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
343+
344+
assert state is not None
345+
assert state.runtime_status == client.OrchestrationStatus.FAILED
346+
347+
# Rewind – only the failed activity should be retried.
348+
c.rewind_orchestration(instance_id, reason="retry")
349+
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
350+
351+
assert state is not None
352+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
353+
assert state.serialized_output == json.dumps(["ok:v", "recovered:v"])
354+
# The successful activity must have been called exactly once.
355+
assert success_call_count == 1
356+
# The failing activity was called twice (once failed, once succeeded).
357+
assert fail_call_count == 2
358+
359+
250360
def test_rewind_twice():
251361
"""Rewind the same orchestration twice after it fails a second time.
252362

0 commit comments

Comments
 (0)