|
6 | 6 | from dataclasses import dataclass |
7 | 7 | from datetime import datetime |
8 | 8 | from enum import Enum |
9 | | -from typing import Any, List, Optional, Sequence, TypeVar, Union |
| 9 | +from typing import Any, Generic, List, Optional, Sequence, TypeVar, Union |
10 | 10 |
|
11 | 11 | import grpc |
12 | 12 | import grpc.aio |
13 | 13 |
|
| 14 | +import durabletask.history as history |
14 | 15 | from durabletask.entities import EntityInstanceId |
15 | 16 | from durabletask.entities.entity_metadata import EntityMetadata |
16 | 17 | from durabletask.grpc_options import GrpcChannelOptions |
17 | 18 | import durabletask.internal.helpers as helpers |
| 19 | +import durabletask.internal.history_helpers as history_helpers |
18 | 20 | import durabletask.internal.orchestrator_service_pb2 as pb |
19 | 21 | import durabletask.internal.orchestrator_service_pb2_grpc as stubs |
20 | 22 | import durabletask.internal.shared as shared |
|
38 | 40 |
|
39 | 41 | TInput = TypeVar('TInput') |
40 | 42 | TOutput = TypeVar('TOutput') |
| 43 | +TItem = TypeVar('TItem') |
41 | 44 |
|
42 | 45 |
|
43 | 46 | class OrchestrationStatus(Enum): |
@@ -100,6 +103,12 @@ class PurgeInstancesResult: |
100 | 103 | is_complete: bool |
101 | 104 |
|
102 | 105 |
|
| 106 | +@dataclass |
| 107 | +class Page(Generic[TItem]): |
| 108 | + items: List[TItem] |
| 109 | + continuation_token: Optional[str] |
| 110 | + |
| 111 | + |
103 | 112 | @dataclass |
104 | 113 | class CleanEntityStorageResult: |
105 | 114 | empty_entities_removed: int |
@@ -230,6 +239,44 @@ def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = Tr |
230 | 239 | payload_helpers.deexternalize_payloads(res, self._payload_store) |
231 | 240 | return new_orchestration_state(req.instanceId, res) |
232 | 241 |
|
| 242 | + def get_orchestration_history(self, |
| 243 | + instance_id: str, *, |
| 244 | + execution_id: Optional[str] = None, |
| 245 | + for_work_item_processing: bool = False) -> List[history.HistoryEvent]: |
| 246 | + req = pb.StreamInstanceHistoryRequest( |
| 247 | + instanceId=instance_id, |
| 248 | + executionId=helpers.get_string_value(execution_id), |
| 249 | + forWorkItemProcessing=for_work_item_processing, |
| 250 | + ) |
| 251 | + self._logger.info(f"Retrieving history for instance '{instance_id}'.") |
| 252 | + stream = self._stub.StreamInstanceHistory(req) |
| 253 | + return history_helpers.collect_history_events(stream, self._payload_store) |
| 254 | + |
| 255 | + def list_instance_ids(self, |
| 256 | + runtime_status: Optional[List[OrchestrationStatus]] = None, |
| 257 | + completed_time_from: Optional[datetime] = None, |
| 258 | + completed_time_to: Optional[datetime] = None, |
| 259 | + page_size: Optional[int] = None, |
| 260 | + continuation_token: Optional[str] = None) -> Page[str]: |
| 261 | + req = pb.ListInstanceIdsRequest( |
| 262 | + runtimeStatus=[status.value for status in runtime_status] if runtime_status else [], |
| 263 | + completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None, |
| 264 | + completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None, |
| 265 | + pageSize=page_size or 0, |
| 266 | + lastInstanceKey=helpers.get_string_value(continuation_token), |
| 267 | + ) |
| 268 | + self._logger.info( |
| 269 | + "Listing terminal instance IDs with filters: " |
| 270 | + f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, " |
| 271 | + f"completed_time_from={completed_time_from}, " |
| 272 | + f"completed_time_to={completed_time_to}, " |
| 273 | + f"page_size={page_size}, " |
| 274 | + f"continuation_token={continuation_token}" |
| 275 | + ) |
| 276 | + resp: pb.ListInstanceIdsResponse = self._stub.ListInstanceIds(req) |
| 277 | + next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None |
| 278 | + return Page(items=list(resp.instanceIds), continuation_token=next_token) |
| 279 | + |
233 | 280 | def get_all_orchestration_states(self, |
234 | 281 | orchestration_query: Optional[OrchestrationQuery] = None |
235 | 282 | ) -> List[OrchestrationState]: |
@@ -525,6 +572,44 @@ async def get_orchestration_state(self, instance_id: str, *, |
525 | 572 | await payload_helpers.deexternalize_payloads_async(res, self._payload_store) |
526 | 573 | return new_orchestration_state(req.instanceId, res) |
527 | 574 |
|
| 575 | + async def get_orchestration_history(self, |
| 576 | + instance_id: str, *, |
| 577 | + execution_id: Optional[str] = None, |
| 578 | + for_work_item_processing: bool = False) -> List[history.HistoryEvent]: |
| 579 | + req = pb.StreamInstanceHistoryRequest( |
| 580 | + instanceId=instance_id, |
| 581 | + executionId=helpers.get_string_value(execution_id), |
| 582 | + forWorkItemProcessing=for_work_item_processing, |
| 583 | + ) |
| 584 | + self._logger.info(f"Retrieving history for instance '{instance_id}'.") |
| 585 | + stream = self._stub.StreamInstanceHistory(req) |
| 586 | + return await history_helpers.collect_history_events_async(stream, self._payload_store) |
| 587 | + |
| 588 | + async def list_instance_ids(self, |
| 589 | + runtime_status: Optional[List[OrchestrationStatus]] = None, |
| 590 | + completed_time_from: Optional[datetime] = None, |
| 591 | + completed_time_to: Optional[datetime] = None, |
| 592 | + page_size: Optional[int] = None, |
| 593 | + continuation_token: Optional[str] = None) -> Page[str]: |
| 594 | + req = pb.ListInstanceIdsRequest( |
| 595 | + runtimeStatus=[status.value for status in runtime_status] if runtime_status else [], |
| 596 | + completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None, |
| 597 | + completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None, |
| 598 | + pageSize=page_size or 0, |
| 599 | + lastInstanceKey=helpers.get_string_value(continuation_token), |
| 600 | + ) |
| 601 | + self._logger.info( |
| 602 | + "Listing terminal instance IDs with filters: " |
| 603 | + f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, " |
| 604 | + f"completed_time_from={completed_time_from}, " |
| 605 | + f"completed_time_to={completed_time_to}, " |
| 606 | + f"page_size={page_size}, " |
| 607 | + f"continuation_token={continuation_token}" |
| 608 | + ) |
| 609 | + resp: pb.ListInstanceIdsResponse = await self._stub.ListInstanceIds(req) |
| 610 | + next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None |
| 611 | + return Page(items=list(resp.instanceIds), continuation_token=next_token) |
| 612 | + |
528 | 613 | async def get_all_orchestration_states(self, |
529 | 614 | orchestration_query: Optional[OrchestrationQuery] = None |
530 | 615 | ) -> List[OrchestrationState]: |
|
0 commit comments