Skip to content

Commit 342eb24

Browse files
committed
Address PR review comments
1 parent 765cf5e commit 342eb24

5 files changed

Lines changed: 32 additions & 18 deletions

File tree

durabletask/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ def list_instance_ids(self,
247247
page_size: Optional[int] = None,
248248
continuation_token: Optional[str] = None) -> Page[str]:
249249
req = pb.ListInstanceIdsRequest(
250-
runtimeStatus=[status.value for status in runtime_status] if runtime_status else None,
250+
runtimeStatus=[status.value for status in runtime_status] if runtime_status else [],
251251
completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None,
252252
completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None,
253253
pageSize=page_size or 0,
@@ -569,7 +569,7 @@ async def list_instance_ids(self,
569569
page_size: Optional[int] = None,
570570
continuation_token: Optional[str] = None) -> Page[str]:
571571
req = pb.ListInstanceIdsRequest(
572-
runtimeStatus=[status.value for status in runtime_status] if runtime_status else None,
572+
runtimeStatus=[status.value for status in runtime_status] if runtime_status else [],
573573
completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None,
574574
completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None,
575575
pageSize=page_size or 0,

durabletask/history.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from __future__ import annotations
55

66
from dataclasses import asdict, dataclass
7-
from datetime import datetime
7+
from datetime import datetime, timezone
88
from typing import Any, Optional
99

1010
from google.protobuf import json_format
@@ -254,7 +254,7 @@ def to_dict(event: HistoryEvent) -> dict[str, Any]:
254254
def _base_kwargs(event: pb.HistoryEvent) -> dict[str, Any]:
255255
return {
256256
'event_id': event.eventId,
257-
'timestamp': event.timestamp.ToDatetime(),
257+
'timestamp': event.timestamp.ToDatetime(timezone.utc),
258258
}
259259

260260

@@ -266,7 +266,7 @@ def _string_value(msg: Message, field_name: str) -> Optional[str]:
266266

267267
def _timestamp_value(msg: Message, field_name: str) -> Optional[datetime]:
268268
if msg.HasField(field_name):
269-
return getattr(msg, field_name).ToDatetime()
269+
return getattr(msg, field_name).ToDatetime(timezone.utc)
270270
return None
271271

272272

@@ -398,11 +398,11 @@ def _to_serializable(value: Any) -> Any:
398398
),
399399
'timerCreated': lambda event: TimerCreatedEvent(
400400
**_base_kwargs(event),
401-
fire_at=event.timerCreated.fireAt.ToDatetime(),
401+
fire_at=event.timerCreated.fireAt.ToDatetime(timezone.utc),
402402
),
403403
'timerFired': lambda event: TimerFiredEvent(
404404
**_base_kwargs(event),
405-
fire_at=event.timerFired.fireAt.ToDatetime(),
405+
fire_at=event.timerFired.fireAt.ToDatetime(timezone.utc),
406406
timer_id=event.timerFired.timerId,
407407
),
408408
'orchestratorStarted': lambda event: OrchestratorStartedEvent(**_base_kwargs(event)),

durabletask/testing/in_memory_backend.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
or external storage is not desired.
1111
"""
1212

13+
import bisect
1314
import logging
1415
import threading
1516
import time
@@ -98,6 +99,10 @@ class StateWaiter:
9899
result: Optional[OrchestrationInstance] = None
99100

100101

102+
_DEFAULT_PAGE_SIZE = 100
103+
_TOKEN_SEP = '|'
104+
105+
101106
class InMemoryOrchestrationBackend(stubs.TaskHubSidecarServiceServicer):
102107
"""
103108
In-memory backend for durable orchestrations suitable for testing.
@@ -473,19 +478,24 @@ def ListInstanceIds(self, request: pb.ListInstanceIdsRequest, context):
473478
matching.append(instance)
474479

475480
matching.sort(key=lambda i: (i.completed_at, i.instance_id))
481+
sort_keys = [(i.completed_at, i.instance_id) for i in matching]
476482

477483
start_index = 0
478484
if request.HasField("lastInstanceKey") and request.lastInstanceKey.value:
479-
for idx, instance in enumerate(matching):
480-
if instance.instance_id == request.lastInstanceKey.value:
481-
start_index = idx + 1
482-
break
483-
484-
page_size = request.pageSize if request.pageSize > 0 else len(matching)
485+
token = request.lastInstanceKey.value
486+
sep_idx = token.index(_TOKEN_SEP)
487+
token_ts = datetime.fromisoformat(token[:sep_idx]).replace(tzinfo=timezone.utc)
488+
token_id = token[sep_idx + 1:]
489+
# bisect_right positions us just after the cursor entry
490+
start_index = bisect.bisect_right(sort_keys, (token_ts, token_id))
491+
492+
page_size = request.pageSize if request.pageSize > 0 else _DEFAULT_PAGE_SIZE
485493
page = matching[start_index:start_index + page_size]
486494
next_token = None
487495
if start_index + page_size < len(matching) and page:
488-
next_token = wrappers_pb2.StringValue(value=page[-1].instance_id)
496+
last = page[-1]
497+
encoded = f"{last.completed_at.isoformat()}{_TOKEN_SEP}{last.instance_id}"
498+
next_token = wrappers_pb2.StringValue(value=encoded)
489499

490500
return pb.ListInstanceIdsResponse(
491501
instanceIds=[instance.instance_id for instance in page],

tests/durabletask/test_batch_actions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ def test_list_instance_ids_paginates_terminal_instances(backend):
340340
assert set(first_page.items + second_page.items) == {completed_id, failed_id}
341341
assert failed_state is not None
342342
assert failed_state.runtime_status == client.OrchestrationStatus.FAILED
343-
assert first_page.continuation_token in {completed_id, failed_id}
343+
assert first_page.continuation_token is not None
344+
assert any(instance_id in first_page.continuation_token for instance_id in {completed_id, failed_id})
344345
assert second_page.continuation_token is None
345346

346347

tests/durabletask/test_orchestration_e2e.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,12 @@ def simple(ctx: task.OrchestrationContext, value: int):
144144
w.start()
145145

146146
task_hub_client = client.TaskHubGrpcClient(host_address=HOST)
147-
instance_id = task_hub_client.schedule_new_orchestration(simple, input=1)
148-
state = task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30)
149-
events = task_hub_client.get_orchestration_history(instance_id)
147+
try:
148+
instance_id = task_hub_client.schedule_new_orchestration(simple, input=1)
149+
state = task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30)
150+
events = task_hub_client.get_orchestration_history(instance_id)
151+
finally:
152+
task_hub_client.close()
150153

151154
assert state is not None
152155
assert len(events) > 0

0 commit comments

Comments
 (0)