Skip to content

Commit 91057ab

Browse files
committed
PR Feedback
1 parent 7eb6a8e commit 91057ab

File tree

2 files changed

+17
-40
lines changed

2 files changed

+17
-40
lines changed

durabletask/worker.py

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,26 +1328,11 @@ def execute(
13281328
has_rewind_in_new = any(
13291329
e.HasField("executionRewound") for e in new_events
13301330
)
1331-
if has_rewind_in_new:
1332-
from itertools import chain
1333-
has_execution_completed = any(
1334-
e.HasField("executionCompleted")
1335-
for e in chain(old_events, new_events)
1336-
)
1337-
if has_execution_completed:
1338-
# The orchestration completed (with failure) and needs
1339-
# rewinding — short-circuit to build clean history.
1340-
return self._build_rewind_result(
1341-
instance_id, orchestration_name, old_events, new_events)
1342-
1343-
# During replay, remove executionCompleted events from the
1344-
# committed history so the orchestrator function replays
1345-
# cleanly. orchestratorCompleted events are kept as they
1346-
# bookend each replay batch.
1347-
old_events = [
1348-
e for e in old_events
1349-
if not e.HasField("executionCompleted")
1350-
]
1331+
if has_rewind_in_new and any(e.HasField("executionCompleted") for e in old_events):
1332+
# The orchestration completed (with failure) and needs
1333+
# rewinding — short-circuit to build clean history.
1334+
return self._build_rewind_result(
1335+
instance_id, orchestration_name, old_events, new_events)
13511336

13521337
ctx = _RuntimeOrchestrationContext(instance_id, self._registry)
13531338
try:
@@ -1439,19 +1424,14 @@ def _build_rewind_result(
14391424
f"{instance_id}: Orchestration {orchestration_name} is being rewound"
14401425
)
14411426

1442-
# Combine old + new events into a single timeline, then remove
1443-
# failed results so the orchestration can replay successfully.
1444-
all_events = list(old_events) + list(new_events)
1427+
if len(new_events) != 2 or not new_events[1].HasField("executionRewound"):
1428+
raise ValueError(
1429+
"When rewinding an orchestration, the new events list must contain exactly two events: orchestratorStarted and the executionRewound event."
1430+
)
14451431

1446-
# Extract the executionRewound event from new_events so we can
1447-
# read its parentExecutionId (set when this is a sub-orchestration
1448-
# being rewound by its parent).
1449-
rewind_event: pb.ExecutionRewoundEvent | None = None
1450-
for e in new_events:
1451-
if e.HasField("executionRewound"):
1452-
rewind_event = e.executionRewound
1453-
break
1432+
rewind_event: pb.ExecutionRewoundEvent = new_events[1].executionRewound
14541433

1434+
all_events = list(old_events) + list(new_events)
14551435
# Generate a new execution ID for the rewound execution.
14561436
new_execution_id = uuid.uuid4().hex
14571437

@@ -1485,11 +1465,10 @@ def _build_rewind_result(
14851465
event_copy.CopyFrom(event)
14861466
event_copy.executionStarted.orchestrationInstance.executionId.CopyFrom(
14871467
ph.get_string_value_or_empty(new_execution_id))
1488-
if rewind_event is not None:
1489-
if rewind_event.HasField("parentExecutionId"):
1490-
if rewind_event.parentExecutionId.value:
1491-
event_copy.executionStarted.parentInstance.orchestrationInstance.executionId.CopyFrom(
1492-
rewind_event.parentExecutionId)
1468+
if rewind_event.HasField("parentExecutionId"):
1469+
if rewind_event.parentExecutionId.value:
1470+
event_copy.executionStarted.parentInstance.orchestrationInstance.executionId.CopyFrom(
1471+
rewind_event.parentExecutionId)
14931472
clean_history.append(event_copy)
14941473
continue
14951474

@@ -1925,7 +1904,7 @@ def process_event(
19251904
# Bookend event for each replay batch — no action needed.
19261905
pass
19271906
elif event.HasField("executionCompleted"):
1928-
# Terminal marker event — no action needed during replay.
1907+
# Terminal marker event — in practice, this never appears during replay.
19291908
pass
19301909
elif event.HasField("executionRewound"):
19311910
# Informational event added when an orchestration is rewound. No action needed.

tests/durabletask/test_rewind_e2e.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
# Licensed under the MIT License.
33

44
import json
5-
import threading
6-
import time
75

86
import pytest
97

@@ -51,7 +49,7 @@ def test_rewind_failed_activity():
5149
_reset_counters()
5250

5351
def failing_activity(_: task.ActivityContext, input: str) -> str:
54-
global _activity_call_count, _should_fail
52+
global _activity_call_count
5553
_activity_call_count += 1
5654
if _should_fail:
5755
raise RuntimeError("Simulated failure")

0 commit comments

Comments
 (0)