Skip to content

Commit 95e7809

Browse files
Support worker-applied Python updates
1 parent e033a98 commit 95e7809

5 files changed

Lines changed: 482 additions & 26 deletions

File tree

README.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
A Python SDK for the [Durable Workflow server](https://github.com/durable-workflow/server). Speaks the server's language-neutral HTTP/JSON worker protocol — no PHP runtime required.
44

5-
Status: **Alpha**. Core features implemented: workflows, activities, schedules, signals, timers, child workflows, continue-as-new, side effects, and version markers. Client calls for queries and updates exist; Python workflow-side query/update receiver metadata is available, while server-routed Python query/update execution is still in progress. Full language-neutral protocol support for cross-PHP/Python orchestration is the release goal.
5+
Status: **Alpha**. Core features implemented: workflows, activities, schedules, signals, timers, child workflows, continue-as-new, side effects, version markers, and worker-applied accepted updates. Client calls for queries and updates exist; Python workflow-side query receiver metadata is available, while server-routed Python query execution and pre-accept update validator routing are still in progress. Full language-neutral protocol support for cross-PHP/Python orchestration is the release goal.
66

77
## Install
88

@@ -126,11 +126,12 @@ class ApprovalWorkflow:
126126
raise ValueError("approved must be boolean")
127127
```
128128

129-
The Python SDK now records query and update receiver metadata on workflow
130-
classes, and exposes a query-state replay helper for future worker-side query
131-
execution. Production server routing for Python query and update handlers is
132-
tracked separately; use the client query/update methods only with server and
133-
worker runtimes that advertise support for the target workflow type.
129+
The Python SDK records query and update receiver metadata on workflow classes,
130+
exposes a query-state replay helper, and applies accepted updates on Python
131+
workflow tasks by emitting `complete_update` or `fail_update` back to the
132+
server. Query routing and synchronous pre-accept update validator execution are
133+
still server-side follow-ups; use those paths only with deployments that
134+
advertise support for the target workflow type.
134135

135136
## Features
136137

src/durable_workflow/worker.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
WORKER_TASKS,
4444
MetricsRecorder,
4545
)
46-
from .workflow import replay
46+
from .workflow import apply_update, replay
4747

4848
log = logging.getLogger("durable_workflow.worker")
4949

@@ -275,6 +275,37 @@ async def _run_workflow_task(self, task: dict[str, Any]) -> list[dict[str, Any]]
275275
log.warning("failed to report unknown workflow type: %s", e)
276276
return None
277277

278+
update_id = task.get("workflow_update_id")
279+
if isinstance(update_id, str) and update_id:
280+
update_command = apply_update(
281+
cls,
282+
history,
283+
start_input,
284+
update_id,
285+
run_id=run_id,
286+
payload_codec=codec,
287+
)
288+
command = update_command.to_server_command(
289+
self.task_queue,
290+
payload_codec=command_codec,
291+
)
292+
log.info(
293+
"completing workflow update task %s for update %s with %s",
294+
task_id,
295+
update_id,
296+
command["type"],
297+
)
298+
try:
299+
await self.client.complete_workflow_task(
300+
task_id=task_id,
301+
lease_owner=self.worker_id,
302+
workflow_task_attempt=attempt,
303+
commands=[command],
304+
)
305+
except Exception as e:
306+
log.warning("failed to complete workflow update task %s: %s", task_id, e)
307+
return [command]
308+
278309
try:
279310
outcome = replay(cls, history, start_input, run_id=run_id, payload_codec=codec)
280311
except AvroNotInstalledError as e:

src/durable_workflow/workflow.py

Lines changed: 227 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,50 @@ def to_server_command(
316316
return cmd
317317

318318

319+
@dataclass
class CompleteUpdate:
    """Command reporting that an accepted workflow update finished successfully.

    ``to_server_command`` turns the instance into the ``complete_update``
    dict that is sent back to the server.
    """

    # Identifier of the accepted update being completed.
    update_id: str
    # Value reported as the update's result; wrapped in a payload envelope
    # when the command is serialised.
    result: Any

    def to_server_command(
        self, task_queue: str, *, payload_codec: str = serializer.AVRO_CODEC
    ) -> dict[str, Any]:
        """Return the ``complete_update`` command dict for this update.

        ``task_queue`` is accepted but not read by this command.
        ``payload_codec`` selects the codec passed to ``serializer.envelope``
        for the result.
        """
        command: dict[str, Any] = {
            "type": "complete_update",
            "update_id": self.update_id,
        }
        command["result"] = serializer.envelope(self.result, codec=payload_codec)
        return command
334+
335+
336+
@dataclass
class FailUpdate:
    """Command reporting that an accepted workflow update failed.

    ``to_server_command`` turns the instance into the ``fail_update`` dict
    that is sent back to the server.
    """

    # Identifier of the accepted update being failed.
    update_id: str
    # Human-readable failure description.
    message: str
    # Short exception type name; omitted from the command when None.
    exception_type: str | None = None
    # Fully qualified exception class; omitted from the command when None.
    exception_class: str | None = None
    # When truthy, the command carries ``"non_retryable": True``; when falsy
    # the key is left out entirely.
    non_retryable: bool = True

    def to_server_command(
        self, task_queue: str, *, payload_codec: str = serializer.AVRO_CODEC
    ) -> dict[str, Any]:
        """Return the ``fail_update`` command dict for this update.

        Neither ``task_queue`` nor ``payload_codec`` is read here; the
        failure command carries only plain fields.
        """
        command: dict[str, Any] = {
            "type": "fail_update",
            "update_id": self.update_id,
            "message": self.message,
        }
        # Optional descriptors are only emitted when they were supplied.
        optional_fields = (
            ("exception_type", self.exception_type),
            ("exception_class", self.exception_class),
        )
        for key, value in optional_fields:
            if value is not None:
                command[key] = value
        if self.non_retryable:
            command["non_retryable"] = True
        return command
361+
362+
319363
@dataclass
320364
class ContinueAsNew:
321365
"""Workflow return value that starts a new run with fresh history."""
@@ -427,7 +471,7 @@ def to_server_command(
427471

428472
Command = (
429473
ScheduleActivity | StartTimer | CompleteWorkflow | FailWorkflow
430-
| ContinueAsNew | RecordSideEffect | StartChildWorkflow
474+
| CompleteUpdate | FailUpdate | ContinueAsNew | RecordSideEffect | StartChildWorkflow
431475
| RecordVersionMarker | UpsertSearchAttributes
432476
)
433477

@@ -601,6 +645,17 @@ def _decode_signal_args(payload: dict[str, Any], fallback_codec: str | None) ->
601645
return [decoded]
602646

603647

648+
def _decode_update_args(payload: dict[str, Any], fallback_codec: str | None) -> list[Any]:
649+
codec = payload.get("payload_codec") or fallback_codec
650+
raw = payload.get("arguments")
651+
if raw is None:
652+
return []
653+
decoded = serializer.decode_envelope(raw, codec=codec)
654+
if isinstance(decoded, list):
655+
return decoded
656+
return [decoded]
657+
658+
604659
def replay(
605660
workflow_cls: type,
606661
history_events: Iterable[dict[str, Any]],
@@ -665,6 +720,135 @@ def query_state(
665720
raise QueryFailed(str(exc) or f"query {query_name!r} failed") from exc
666721

667722

723+
def apply_update(
    workflow_cls: type,
    history_events: Iterable[dict[str, Any]],
    start_input: list[Any],
    update_id: str,
    *,
    run_id: str = "",
    payload_codec: str | None = None,
) -> CompleteUpdate | FailUpdate:
    """Rebuild workflow state from history and execute one accepted update.

    Durability stays with the server: it accepts the update, dispatches a
    workflow task carrying ``workflow_update_id``, and records
    ``UpdateApplied`` / ``UpdateCompleted`` once the command returned here is
    submitted. This function only reconstructs the in-memory workflow
    instance and invokes the receiver method registered for the update.

    Every failure path (replay error, missing or malformed accepted event,
    argument decode error, unknown update, validator rejection, handler
    exception) is reported as a ``FailUpdate`` rather than raised.
    """

    def _reject(message: str, exception_type: str | None) -> FailUpdate:
        # Shared constructor for failures that do not originate from an exception.
        return FailUpdate(
            update_id=update_id,
            message=message,
            exception_type=exception_type,
        )

    # Materialise once: history is consumed by replay and scanned again below.
    events = list(history_events)

    try:
        state = _replay_state(
            workflow_cls,
            events,
            start_input,
            run_id=run_id,
            payload_codec=payload_codec,
        )
    except Exception as exc:
        return _fail_update_from_exception(
            update_id,
            "workflow replay failed before update",
            exc,
        )

    # A replay whose first command is a workflow failure cannot host an update.
    replay_commands = state.outcome.commands
    if replay_commands and isinstance(replay_commands[0], FailWorkflow):
        failure = replay_commands[0]
        return _reject(
            f"workflow replay failed before update: {failure.message}",
            failure.exception_type,
        )

    accepted = _accepted_update_payload(events, update_id)
    if accepted is None:
        return _reject(
            f"accepted update {update_id!r} was not present in workflow history",
            "UpdateNotFound",
        )

    update_name = accepted.get("update_name")
    if not (isinstance(update_name, str) and update_name != ""):
        return _reject(
            f"accepted update {update_id!r} is missing an update name",
            "InvalidUpdate",
        )

    try:
        args = _decode_update_args(accepted, payload_codec)
    except Exception as exc:
        return _fail_update_from_exception(update_id, "update argument decode failed", exc)

    update_registry: dict[str, str] = getattr(workflow_cls, "__workflow_updates__", {}) or {}
    method_name = update_registry.get(update_name)
    if method_name is None:
        return _reject(f"unknown update {update_name!r}", "UnknownUpdate")

    validator_registry: dict[str, str] = (
        getattr(workflow_cls, "__workflow_update_validators__", {}) or {}
    )
    validator_name = validator_registry.get(update_name)
    if validator_name is not None:
        validator = getattr(state.instance, validator_name, None)
        if validator is None:
            return _reject(
                f"update validator {update_name!r} is not available",
                "UnknownUpdateValidator",
            )
        try:
            verdict = validator(*args)
        except Exception as exc:
            return _fail_update_from_exception(update_id, "update validator failed", exc)
        # Only an explicit ``False`` rejects; ``None`` and other values pass.
        if verdict is False:
            return _fail_update_from_exception(
                update_id,
                "update validator failed",
                ValueError(f"update validator {update_name!r} returned false"),
            )

    handler = getattr(state.instance, method_name, None)
    if handler is None:
        return _reject(
            f"update handler {update_name!r} is not available",
            "UnknownUpdate",
        )

    try:
        result = handler(*args)
    except Exception as exc:
        return _fail_update_from_exception(update_id, "update handler failed", exc)
    return CompleteUpdate(update_id=update_id, result=result)
827+
828+
829+
def _accepted_update_payload(
830+
events: list[dict[str, Any]],
831+
update_id: str,
832+
) -> dict[str, Any] | None:
833+
for event in reversed(events):
834+
if event.get("event_type") not in ("UpdateAccepted", "update_accepted"):
835+
continue
836+
payload = event.get("payload") or {}
837+
if payload.get("update_id") == update_id:
838+
return payload
839+
return None
840+
841+
842+
def _fail_update_from_exception(update_id: str, prefix: str, exc: Exception) -> FailUpdate:
    """Convert a caught exception into a ``FailUpdate`` for ``update_id``.

    The message is ``"<prefix>: <detail>"`` where the detail is ``str(exc)``,
    or the exception's type name when that string is empty. The type name and
    the ``module.qualname`` of the exception class are attached as
    ``exception_type`` and ``exception_class``.
    """
    detail = str(exc) or type(exc).__name__
    exc_cls = type(exc)
    qualified_name = f"{exc_cls.__module__}.{exc_cls.__qualname__}"
    return FailUpdate(
        update_id=update_id,
        message=f"{prefix}: {detail}",
        exception_type=exc_cls.__name__,
        exception_class=qualified_name,
    )
850+
851+
668852
def _replay_state(
669853
workflow_cls: type,
670854
history_events: Iterable[dict[str, Any]],
@@ -692,10 +876,10 @@ def _state(commands: list[Command]) -> _ReplayState:
692876
return _ReplayState(outcome=ReplayOutcome(commands=commands), instance=instance)
693877

694878
resolved_results: list[Any] = []
695-
# (resolved_result_index_before_apply, signal_name, decoded_args) —
696-
# signals apply before the generator consumes the resolved_result at the
697-
# stored index, which preserves history interleaving with activities.
698-
pending_signals: list[tuple[int, str, list[Any]]] = []
879+
# (resolved_result_index_before_apply, receiver_kind, name, decoded_args) —
880+
# external receivers apply before the generator consumes the resolved_result
881+
# at the stored index, preserving history interleaving with activities.
882+
pending_receivers: list[tuple[int, str, str, list[Any]]] = []
699883
for ev in events:
700884
etype = ev.get("event_type")
701885
payload = ev.get("payload") or {}
@@ -719,24 +903,51 @@ def _state(commands: list[Command]) -> _ReplayState:
719903
elif etype in ("SignalReceived", "signal_received"):
720904
signal_name = payload.get("signal_name")
721905
if isinstance(signal_name, str) and signal_name:
722-
pending_signals.append(
723-
(len(resolved_results), signal_name, _decode_signal_args(payload, payload_codec))
906+
pending_receivers.append(
907+
(
908+
len(resolved_results),
909+
"signal",
910+
signal_name,
911+
_decode_signal_args(payload, payload_codec),
912+
)
913+
)
914+
elif etype in ("UpdateApplied", "update_applied"):
915+
update_name = payload.get("update_name")
916+
if isinstance(update_name, str) and update_name:
917+
pending_receivers.append(
918+
(
919+
len(resolved_results),
920+
"update",
921+
update_name,
922+
_decode_update_args(payload, payload_codec),
923+
)
724924
)
725925

726926
signal_registry: dict[str, str] = getattr(workflow_cls, "__workflow_signals__", {}) or {}
727-
728-
def _apply_due_signals() -> None:
729-
while pending_signals and pending_signals[0][0] <= result_cursor:
730-
_, name, args = pending_signals.pop(0)
731-
method_name = signal_registry.get(name)
732-
if method_name is None:
733-
continue
927+
update_registry: dict[str, str] = getattr(workflow_cls, "__workflow_updates__", {}) or {}
928+
929+
def _apply_due_receivers() -> None:
930+
while pending_receivers and pending_receivers[0][0] <= result_cursor:
931+
_, kind, name, args = pending_receivers.pop(0)
932+
if kind == "signal":
933+
method_name = signal_registry.get(name)
934+
if method_name is None:
935+
continue
936+
else:
937+
method_name = update_registry.get(name)
938+
if method_name is None:
939+
raise TypeError(f"unknown update {name!r} in workflow history")
734940
handler = getattr(instance, method_name, None)
735941
if handler is None:
736-
continue
942+
if kind == "signal":
943+
continue
944+
raise TypeError(f"update handler {name!r} is not available")
737945
ctx.logger._set_replaying(True)
738946
handler(*args)
739947

948+
result_cursor = 0
949+
_apply_due_receivers()
950+
740951
gen = instance.run(ctx, *start_input)
741952
if not hasattr(gen, "__next__"):
742953
if isinstance(gen, ContinueAsNew):
@@ -745,14 +956,13 @@ def _apply_due_signals() -> None:
745956

746957
ctx.logger._set_replaying(True)
747958

748-
result_cursor = 0
749959
next_value: Any = None
750960
first = True
751961
pending: list[Command] = []
752962
advanced_cmd: Any = None
753963
try:
754964
while True:
755-
_apply_due_signals()
965+
_apply_due_receivers()
756966
if advanced_cmd is not None:
757967
cmd = advanced_cmd
758968
advanced_cmd = None

0 commit comments

Comments
 (0)