Skip to content

Commit 8bd600c

Browse files
sdk-python: update v2 sdk, client, errors, and more
1 parent f761def commit 8bd600c

5 files changed

Lines changed: 954 additions & 3 deletions

File tree

src/durable_workflow/__init__.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
from . import activity, sync, workflow
22
from .activity import ActivityContext, ActivityInfo
3-
from .client import Client, WorkflowExecution, WorkflowHandle, WorkflowList
3+
from .client import (
4+
Client,
5+
ScheduleAction,
6+
ScheduleBackfillResult,
7+
ScheduleDescription,
8+
ScheduleHandle,
9+
ScheduleList,
10+
ScheduleSpec,
11+
ScheduleTriggerResult,
12+
WorkflowExecution,
13+
WorkflowHandle,
14+
WorkflowList,
15+
)
416
from .errors import (
517
ActivityCancelled,
618
ChildWorkflowFailed,
@@ -9,6 +21,8 @@
921
NamespaceNotFound,
1022
NonRetryableError,
1123
QueryFailed,
24+
ScheduleAlreadyExists,
25+
ScheduleNotFound,
1226
ServerError,
1327
Unauthorized,
1428
UpdateRejected,
@@ -29,14 +43,23 @@
2943
"Client",
3044
"ContinueAsNew",
3145
"NonRetryableError",
46+
"ScheduleAction",
47+
"ScheduleAlreadyExists",
48+
"ScheduleBackfillResult",
49+
"ScheduleDescription",
50+
"ScheduleHandle",
51+
"ScheduleList",
52+
"ScheduleNotFound",
53+
"ScheduleSpec",
54+
"ScheduleTriggerResult",
3255
"StartChildWorkflow",
3356
"Worker",
3457
"WorkflowExecution",
3558
"WorkflowHandle",
3659
"WorkflowList",
37-
"workflow",
3860
"activity",
3961
"sync",
62+
"workflow",
4063
"DurableWorkflowError",
4164
"InvalidArgument",
4265
"NamespaceNotFound",

src/durable_workflow/client.py

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,92 @@ class WorkflowList:
3737
next_page_token: str | None = None
3838

3939

40+
@dataclass
41+
class ScheduleSpec:
42+
cron_expressions: list[str] | None = None
43+
intervals: list[dict[str, str]] | None = None
44+
timezone: str | None = None
45+
46+
def to_dict(self) -> dict[str, Any]:
47+
d: dict[str, Any] = {}
48+
if self.cron_expressions is not None:
49+
d["cron_expressions"] = self.cron_expressions
50+
if self.intervals is not None:
51+
d["intervals"] = self.intervals
52+
if self.timezone is not None:
53+
d["timezone"] = self.timezone
54+
return d
55+
56+
57+
@dataclass
58+
class ScheduleAction:
59+
workflow_type: str
60+
task_queue: str | None = None
61+
input: list[Any] | None = None
62+
execution_timeout_seconds: int | None = None
63+
run_timeout_seconds: int | None = None
64+
65+
def to_dict(self) -> dict[str, Any]:
66+
d: dict[str, Any] = {"workflow_type": self.workflow_type}
67+
if self.task_queue is not None:
68+
d["task_queue"] = self.task_queue
69+
if self.input is not None:
70+
d["input"] = self.input
71+
if self.execution_timeout_seconds is not None:
72+
d["execution_timeout_seconds"] = self.execution_timeout_seconds
73+
if self.run_timeout_seconds is not None:
74+
d["run_timeout_seconds"] = self.run_timeout_seconds
75+
return d
76+
77+
78+
@dataclass
79+
class ScheduleDescription:
80+
schedule_id: str
81+
status: str | None = None
82+
spec: dict[str, Any] | None = None
83+
action: dict[str, Any] | None = None
84+
overlap_policy: str | None = None
85+
note: str | None = None
86+
memo: dict[str, Any] | None = None
87+
search_attributes: dict[str, Any] | None = None
88+
jitter_seconds: int | None = None
89+
max_runs: int | None = None
90+
remaining_actions: int | None = None
91+
fires_count: int = 0
92+
failures_count: int = 0
93+
next_fire_at: str | None = None
94+
last_fired_at: str | None = None
95+
latest_workflow_instance_id: str | None = None
96+
paused_at: str | None = None
97+
created_at: str | None = None
98+
updated_at: str | None = None
99+
info: dict[str, Any] | None = None
100+
101+
102+
@dataclass
103+
class ScheduleList:
104+
schedules: list[ScheduleDescription]
105+
next_page_token: str | None = None
106+
107+
108+
@dataclass
109+
class ScheduleTriggerResult:
110+
schedule_id: str
111+
outcome: str
112+
workflow_id: str | None = None
113+
run_id: str | None = None
114+
reason: str | None = None
115+
buffer_depth: int | None = None
116+
117+
118+
@dataclass
119+
class ScheduleBackfillResult:
120+
schedule_id: str
121+
outcome: str
122+
fires_attempted: int = 0
123+
results: list[dict[str, Any]] | None = None
124+
125+
40126
class WorkflowHandle:
41127
def __init__(self, client: Client, workflow_id: str, run_id: str | None = None, workflow_type: str = "") -> None:
42128
self._client = client
@@ -81,6 +167,62 @@ async def update(
81167
)
82168

83169

170+
class ScheduleHandle:
171+
def __init__(self, client: Client, schedule_id: str) -> None:
172+
self._client = client
173+
self.schedule_id = schedule_id
174+
175+
async def describe(self) -> ScheduleDescription:
176+
return await self._client.describe_schedule(self.schedule_id)
177+
178+
async def update(
179+
self,
180+
*,
181+
spec: ScheduleSpec | None = None,
182+
action: ScheduleAction | None = None,
183+
overlap_policy: str | None = None,
184+
jitter_seconds: int | None = None,
185+
max_runs: int | None = None,
186+
memo: dict[str, Any] | None = None,
187+
search_attributes: dict[str, Any] | None = None,
188+
note: str | None = None,
189+
) -> None:
190+
await self._client.update_schedule(
191+
self.schedule_id,
192+
spec=spec,
193+
action=action,
194+
overlap_policy=overlap_policy,
195+
jitter_seconds=jitter_seconds,
196+
max_runs=max_runs,
197+
memo=memo,
198+
search_attributes=search_attributes,
199+
note=note,
200+
)
201+
202+
async def pause(self, *, note: str | None = None) -> None:
203+
await self._client.pause_schedule(self.schedule_id, note=note)
204+
205+
async def resume(self, *, note: str | None = None) -> None:
206+
await self._client.resume_schedule(self.schedule_id, note=note)
207+
208+
async def trigger(self, *, overlap_policy: str | None = None) -> ScheduleTriggerResult:
209+
return await self._client.trigger_schedule(self.schedule_id, overlap_policy=overlap_policy)
210+
211+
async def delete(self) -> None:
212+
await self._client.delete_schedule(self.schedule_id)
213+
214+
async def backfill(
215+
self,
216+
*,
217+
start_time: str,
218+
end_time: str,
219+
overlap_policy: str | None = None,
220+
) -> ScheduleBackfillResult:
221+
return await self._client.backfill_schedule(
222+
self.schedule_id, start_time=start_time, end_time=end_time, overlap_policy=overlap_policy,
223+
)
224+
225+
84226
class Client:
85227
"""HTTP client for the Durable Workflow server."""
86228

@@ -356,6 +498,184 @@ async def get_result(
356498
)
357499
await asyncio.sleep(poll_interval)
358500

501+
# ── Schedules ─────────────────────────────────────────────────────
502+
def get_schedule_handle(self, schedule_id: str) -> ScheduleHandle:
503+
return ScheduleHandle(self, schedule_id=schedule_id)
504+
505+
async def create_schedule(
506+
self,
507+
*,
508+
schedule_id: str | None = None,
509+
spec: ScheduleSpec,
510+
action: ScheduleAction,
511+
overlap_policy: str | None = None,
512+
jitter_seconds: int | None = None,
513+
max_runs: int | None = None,
514+
memo: dict[str, Any] | None = None,
515+
search_attributes: dict[str, Any] | None = None,
516+
paused: bool = False,
517+
note: str | None = None,
518+
) -> ScheduleHandle:
519+
body: dict[str, Any] = {
520+
"spec": spec.to_dict(),
521+
"action": action.to_dict(),
522+
}
523+
if schedule_id is not None:
524+
body["schedule_id"] = schedule_id
525+
if overlap_policy is not None:
526+
body["overlap_policy"] = overlap_policy
527+
if jitter_seconds is not None:
528+
body["jitter_seconds"] = jitter_seconds
529+
if max_runs is not None:
530+
body["max_runs"] = max_runs
531+
if memo is not None:
532+
body["memo"] = memo
533+
if search_attributes is not None:
534+
body["search_attributes"] = search_attributes
535+
if paused:
536+
body["paused"] = True
537+
if note is not None:
538+
body["note"] = note
539+
data = await self._request("POST", "/schedules", json=body)
540+
sid = data.get("schedule_id", schedule_id or "")
541+
return ScheduleHandle(self, schedule_id=sid)
542+
543+
async def list_schedules(self) -> ScheduleList:
544+
data = await self._request("GET", "/schedules")
545+
items = data.get("schedules", [])
546+
schedules = [
547+
ScheduleDescription(
548+
schedule_id=item.get("schedule_id", ""),
549+
status=item.get("status"),
550+
spec=item.get("spec"),
551+
action=item.get("action"),
552+
overlap_policy=item.get("overlap_policy"),
553+
note=item.get("note"),
554+
fires_count=item.get("fires_count", 0),
555+
next_fire_at=item.get("next_fire_at"),
556+
last_fired_at=item.get("last_fired_at"),
557+
)
558+
for item in items
559+
]
560+
return ScheduleList(
561+
schedules=schedules,
562+
next_page_token=data.get("next_page_token"),
563+
)
564+
565+
async def describe_schedule(self, schedule_id: str) -> ScheduleDescription:
566+
data = await self._request("GET", f"/schedules/{schedule_id}", context=schedule_id)
567+
return ScheduleDescription(
568+
schedule_id=data.get("schedule_id", schedule_id),
569+
status=data.get("status"),
570+
spec=data.get("spec"),
571+
action=data.get("action"),
572+
overlap_policy=data.get("overlap_policy"),
573+
note=data.get("note"),
574+
memo=data.get("memo"),
575+
search_attributes=data.get("search_attributes"),
576+
jitter_seconds=data.get("jitter_seconds"),
577+
max_runs=data.get("max_runs"),
578+
remaining_actions=data.get("remaining_actions"),
579+
fires_count=data.get("fires_count", 0),
580+
failures_count=data.get("failures_count", 0),
581+
next_fire_at=data.get("next_fire_at"),
582+
last_fired_at=data.get("last_fired_at"),
583+
latest_workflow_instance_id=data.get("latest_workflow_instance_id"),
584+
paused_at=data.get("paused_at"),
585+
created_at=data.get("created_at"),
586+
updated_at=data.get("updated_at"),
587+
info=data.get("info"),
588+
)
589+
590+
async def update_schedule(
591+
self,
592+
schedule_id: str,
593+
*,
594+
spec: ScheduleSpec | None = None,
595+
action: ScheduleAction | None = None,
596+
overlap_policy: str | None = None,
597+
jitter_seconds: int | None = None,
598+
max_runs: int | None = None,
599+
memo: dict[str, Any] | None = None,
600+
search_attributes: dict[str, Any] | None = None,
601+
note: str | None = None,
602+
) -> None:
603+
body: dict[str, Any] = {}
604+
if spec is not None:
605+
body["spec"] = spec.to_dict()
606+
if action is not None:
607+
body["action"] = action.to_dict()
608+
if overlap_policy is not None:
609+
body["overlap_policy"] = overlap_policy
610+
if jitter_seconds is not None:
611+
body["jitter_seconds"] = jitter_seconds
612+
if max_runs is not None:
613+
body["max_runs"] = max_runs
614+
if memo is not None:
615+
body["memo"] = memo
616+
if search_attributes is not None:
617+
body["search_attributes"] = search_attributes
618+
if note is not None:
619+
body["note"] = note
620+
await self._request("PUT", f"/schedules/{schedule_id}", json=body, context=schedule_id)
621+
622+
async def pause_schedule(self, schedule_id: str, *, note: str | None = None) -> None:
623+
body: dict[str, Any] = {}
624+
if note is not None:
625+
body["note"] = note
626+
await self._request("POST", f"/schedules/{schedule_id}/pause", json=body, context=schedule_id)
627+
628+
async def resume_schedule(self, schedule_id: str, *, note: str | None = None) -> None:
629+
body: dict[str, Any] = {}
630+
if note is not None:
631+
body["note"] = note
632+
await self._request("POST", f"/schedules/{schedule_id}/resume", json=body, context=schedule_id)
633+
634+
async def trigger_schedule(
635+
self, schedule_id: str, *, overlap_policy: str | None = None
636+
) -> ScheduleTriggerResult:
637+
body: dict[str, Any] = {}
638+
if overlap_policy is not None:
639+
body["overlap_policy"] = overlap_policy
640+
data = await self._request(
641+
"POST", f"/schedules/{schedule_id}/trigger", json=body, context=schedule_id,
642+
)
643+
return ScheduleTriggerResult(
644+
schedule_id=data.get("schedule_id", schedule_id),
645+
outcome=data.get("outcome", ""),
646+
workflow_id=data.get("workflow_id"),
647+
run_id=data.get("run_id"),
648+
reason=data.get("reason"),
649+
buffer_depth=data.get("buffer_depth"),
650+
)
651+
652+
async def delete_schedule(self, schedule_id: str) -> None:
653+
await self._request("DELETE", f"/schedules/{schedule_id}", context=schedule_id)
654+
655+
async def backfill_schedule(
656+
self,
657+
schedule_id: str,
658+
*,
659+
start_time: str,
660+
end_time: str,
661+
overlap_policy: str | None = None,
662+
) -> ScheduleBackfillResult:
663+
body: dict[str, Any] = {
664+
"start_time": start_time,
665+
"end_time": end_time,
666+
}
667+
if overlap_policy is not None:
668+
body["overlap_policy"] = overlap_policy
669+
data = await self._request(
670+
"POST", f"/schedules/{schedule_id}/backfill", json=body, context=schedule_id,
671+
)
672+
return ScheduleBackfillResult(
673+
schedule_id=data.get("schedule_id", schedule_id),
674+
outcome=data.get("outcome", ""),
675+
fires_attempted=data.get("fires_attempted", 0),
676+
results=data.get("results"),
677+
)
678+
359679
# ── Worker protocol ────────────────────────────────────────────────
360680
async def register_worker(
361681
self,

0 commit comments

Comments
 (0)