Skip to content

Commit 87dc5f7

Browse files
committed
PR Feedback
1 parent 9777a36 commit 87dc5f7

File tree

7 files changed

+71
-24
lines changed

7 files changed

+71
-24
lines changed

.flake8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[flake8]
2-
ignore = E501,C901
2+
ignore = E501,C901,W503
33
exclude =
44
.git
55
*_pb2*

docs/supported-patterns.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
6464
# Orders of $1000 or more require manager approval
6565
yield ctx.call_activity(send_approval_request, input=order)
6666

67-
# Approvals must be received within 24 hours or they will be canceled.
67+
# Approvals must be received within 24 hours or they will be cancelled.
6868
approval_event = ctx.wait_for_external_event("approval_received")
6969
timeout_event = ctx.create_timer(timedelta(hours=24))
7070
winner = yield task.when_any([approval_event, timeout_event])
7171
if winner == timeout_event:
72-
return "Canceled"
72+
return "Cancelled"
7373

7474
# The order was approved
7575
yield ctx.call_activity(place_order, input=order)

durabletask/task.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput
228228
"""
229229
pass
230230

231-
# TOOD: Add a timeout parameter, which allows the task to be canceled if the event is
231+
# TOOD: Add a timeout parameter, which allows the task to be cancelled if the event is
232232
# not received within the specified timeout. This requires support for task cancellation.
233233
@abstractmethod
234234
def wait_for_external_event(self, name: str) -> CancellableTask:
@@ -324,8 +324,8 @@ class OrchestrationStateError(Exception):
324324
pass
325325

326326

327-
class TaskCanceledError(Exception):
328-
"""Exception type for canceled orchestration tasks."""
327+
class TaskCancelledError(Exception):
328+
"""Exception type for cancelled orchestration tasks."""
329329

330330

331331
class Task(ABC, Generic[T]):
@@ -440,7 +440,7 @@ def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]):
440440

441441

442442
class CancellableTask(CompletableTask[T]):
443-
"""A completable task that can be canceled before it finishes."""
443+
"""A completable task that can be cancelled before it finishes."""
444444

445445
def __init__(self) -> None:
446446
super().__init__()
@@ -449,12 +449,12 @@ def __init__(self) -> None:
449449

450450
@property
451451
def is_cancelled(self) -> bool:
452-
"""Returns True if the task was canceled, False otherwise."""
452+
"""Returns True if the task was cancelled, False otherwise."""
453453
return self._is_cancelled
454454

455455
def get_result(self) -> T:
456456
if self._is_cancelled:
457-
raise TaskCanceledError('The task was canceled.')
457+
raise TaskCancelledError('The task was cancelled.')
458458
return super().get_result()
459459

460460
def set_cancel_handler(self, cancel_handler: Callable[[], None]) -> None:
@@ -524,27 +524,26 @@ class TimerTask(CancellableTask[None]):
524524
def set_retryable_parent(self, retryable_task: RetryableTask):
525525
self._retryable_parent = retryable_task
526526

527-
def complete(self, *args, **kwargs):
527+
def complete(self, _: datetime) -> None:
528528
super().complete(None)
529529

530-
531530
class LongTimerTask(TimerTask):
532-
def __init__(self, final_fire_at: datetime, maximum_timer_duration: timedelta):
531+
def __init__(self, final_fire_at: datetime, maximum_timer_interval: timedelta):
533532
super().__init__()
534533
self._final_fire_at = final_fire_at
535-
self._maximum_timer_duration = maximum_timer_duration
534+
self._maximum_timer_interval = maximum_timer_interval
536535

537536
def start(self, current_utc_datetime: datetime) -> datetime:
538537
return self._get_next_fire_at(current_utc_datetime)
539538

540-
def complete(self, current_utc_datetime: datetime):
539+
def complete(self, current_utc_datetime: datetime) -> Optional[datetime]:
541540
if current_utc_datetime < self._final_fire_at:
542541
return self._get_next_fire_at(current_utc_datetime)
543-
super().complete(None)
542+
return super().complete(current_utc_datetime)
544543

545544
def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime:
546-
if current_utc_datetime + self._maximum_timer_duration < self._final_fire_at:
547-
return current_utc_datetime + self._maximum_timer_duration
545+
if current_utc_datetime + self._maximum_timer_interval < self._final_fire_at:
546+
return current_utc_datetime + self._maximum_timer_interval
548547
return self._final_fire_at
549548

550549

durabletask/worker.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,9 @@ def _execute_orchestrator(
646646
completionToken,
647647
):
648648
try:
649-
executor = _OrchestrationExecutor(self._registry, self._logger)
649+
executor = _OrchestrationExecutor(self._registry,
650+
self._logger,
651+
self.maximum_timer_interval)
650652
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
651653
res = pb.OrchestratorResponse(
652654
instanceId=req.instanceId,
@@ -1029,7 +1031,7 @@ def set_custom_status(self, custom_status: Any) -> None:
10291031
shared.to_json(custom_status) if custom_status is not None else None
10301032
)
10311033

1032-
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
1034+
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.CancellableTask:
10331035
return self.create_timer_internal(fire_at)
10341036

10351037
def create_timer_internal(
@@ -1045,7 +1047,11 @@ def create_timer_internal(
10451047

10461048
next_fire_at: datetime = final_fire_at
10471049

1048-
if self._maximum_timer_interval is not None and self.current_utc_datetime + self._maximum_timer_interval < final_fire_at:
1050+
if (
1051+
self._maximum_timer_interval is not None
1052+
and self._maximum_timer_interval > timedelta(0)
1053+
and self.current_utc_datetime + self._maximum_timer_interval < final_fire_at
1054+
):
10491055
timer_task = task.LongTimerTask(final_fire_at, self._maximum_timer_interval)
10501056
next_fire_at = timer_task.start(self.current_utc_datetime)
10511057
else:

examples/human_interaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
4848
# Orders of $1000 or more require manager approval
4949
yield ctx.call_activity(send_approval_request, input=order)
5050

51-
# Approvals must be received within 24 hours or they will be canceled.
51+
# Approvals must be received within 24 hours or they will be cancelled.
5252
approval_event = ctx.wait_for_external_event("approval_received")
5353
timeout_event = ctx.create_timer(timedelta(hours=24))
5454
winner = yield task.when_any([approval_event, timeout_event])

tests/durabletask/test_orchestration_e2e.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,3 +654,45 @@ def orchestrator(ctx: task.OrchestrationContext, _):
654654
assert state.failure_details is None
655655
expected = "approved" if winning_event == "Approve" else "rejected"
656656
assert state.serialized_output == json.dumps(expected)
657+
658+
659+
def test_long_timer_chunking():
660+
"""Verify that a timer longer than maximum_timer_interval is broken into
661+
intermediate chunks and that the orchestration completes correctly.
662+
663+
The worker is configured with a 2-second maximum_timer_interval. The
664+
orchestrator requests a 5-second timer, which requires 3 chunks
665+
(0→2s, 2→4s, 4→5s). Each chunk causes a full orchestrator replay, so
666+
the orchestrator function is invoked once for the initial scheduling and
667+
once more for each timerFired event — 4 invocations in total. Asserting
668+
invocation_count >= 4 confirms that intermediate chunks actually fired
669+
rather than the timer being scheduled as a single unit.
670+
"""
671+
672+
invocation_count = 0
673+
674+
def orchestrator(ctx: task.OrchestrationContext, _):
675+
nonlocal invocation_count
676+
invocation_count += 1
677+
yield ctx.create_timer(timedelta(seconds=5))
678+
return "done"
679+
680+
with worker.TaskHubGrpcWorker(
681+
host_address=HOST,
682+
maximum_timer_interval=timedelta(seconds=2),
683+
) as w:
684+
w.add_orchestrator(orchestrator)
685+
w.start()
686+
687+
task_hub_client = client.TaskHubGrpcClient(host_address=HOST)
688+
id = task_hub_client.schedule_new_orchestration(orchestrator)
689+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
690+
691+
assert state is not None
692+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
693+
assert state.failure_details is None
694+
assert state.serialized_output == json.dumps("done")
695+
# 3 chunks (0→2s, 2→4s, 4→5s) produce 4 total orchestrator invocations
696+
# (initial scheduling + one replay per timerFired). >= 4 proves that at
697+
# least two intermediate chunk timers fired rather than one direct timer.
698+
assert invocation_count >= 4

tests/durabletask/test_orchestration_executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,15 +380,15 @@ def dummy_activity(ctx, _):
380380
assert not hasattr(activity_task, "is_cancelled")
381381

382382

383-
def test_cancelled_task_get_result_raises_task_canceled_error():
384-
"""Tests that canceled cancellable tasks raise TaskCanceledError from get_result."""
383+
def test_cancelled_task_get_result_raises_task_cancelled_error():
384+
"""Tests that cancelled cancellable tasks raise TaskCancelledError from get_result."""
385385

386386
cancellable_task = task.CancellableTask()
387387

388388
assert cancellable_task.cancel() is True
389389
assert cancellable_task.is_cancelled is True
390390

391-
with pytest.raises(task.TaskCanceledError):
391+
with pytest.raises(task.TaskCancelledError):
392392
cancellable_task.get_result()
393393

394394

0 commit comments

Comments
 (0)