Skip to content

Commit e033a98

Browse files
Add Python query and update receiver metadata
1 parent b10ba6d commit e033a98

5 files changed

Lines changed: 395 additions & 18 deletions

File tree

README.md

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
A Python SDK for the [Durable Workflow server](https://github.com/durable-workflow/server). Speaks the server's language-neutral HTTP/JSON worker protocol — no PHP runtime required.
44

5-
Status: **Alpha**. Core features implemented: workflows, activities, schedules, signals, queries, updates, timers, child workflows, continue-as-new, side effects, and version markers. Full language-neutral protocol support for cross-PHP/Python orchestration.
5+
Status: **Alpha**. Core features implemented: workflows, activities, schedules, signals, timers, child workflows, continue-as-new, side effects, and version markers. Client calls for queries and updates exist; Python workflow-side query/update receiver metadata is available, while server-routed Python query/update execution is still in progress. Full language-neutral protocol support for cross-PHP/Python orchestration is the release goal.
66

77
## Install
88

@@ -97,6 +97,41 @@ receipt = yield ctx.start_child_workflow(
9797
)
9898
```
9999

100+
## Workflow signals, queries, and updates
101+
102+
Signals mutate workflow state during replay:
103+
104+
```python
105+
@workflow.defn(name="approval")
106+
class ApprovalWorkflow:
107+
def __init__(self) -> None:
108+
self.approved = False
109+
110+
@workflow.signal("approve")
111+
def approve(self, by: str) -> None:
112+
self.approved = True
113+
114+
@workflow.query("status")
115+
def status(self) -> dict:
116+
return {"approved": self.approved}
117+
118+
@workflow.update("set_approval")
119+
def set_approval(self, approved: bool) -> dict:
120+
self.approved = approved
121+
return {"approved": self.approved}
122+
123+
@set_approval.validator
124+
def validate_set_approval(self, approved: bool) -> None:
125+
if not isinstance(approved, bool):
126+
raise ValueError("approved must be boolean")
127+
```
128+
129+
The Python SDK now records query and update receiver metadata on workflow
130+
classes, and exposes a query-state replay helper for future worker-side query
131+
execution. Production server routing for Python query and update handlers is
132+
tracked separately; use the client query/update methods only with server and
133+
worker runtimes that advertise support for the target workflow type.
134+
100135
## Features
101136

102137
- **Async-first**: Built on `httpx` and `asyncio`

docs/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pip install 'durable-workflow[prometheus]'
1818

1919
- **[Client](reference/client.md)** — start workflows, signal, query, update, wait for results, manage schedules.
2020
- **[Worker](reference/worker.md)** — poll the server for workflow and activity tasks, dispatch to registered handlers.
21-
- **[Workflow](reference/workflow.md)** — workflow-side primitives: `ActivityRetryPolicy`, `ChildWorkflowRetryPolicy`, `ContinueAsNew`, `StartChildWorkflow`, and the workflow decorator.
21+
- **[Workflow](reference/workflow.md)** — workflow-side primitives: `ActivityRetryPolicy`, `ChildWorkflowRetryPolicy`, `ContinueAsNew`, `StartChildWorkflow`, signal/query/update decorators, and query-state replay helpers.
2222
- **[Activity](reference/activity.md)** — activity decorator and execution context.
2323
- **[Errors](reference/errors.md)** — typed exceptions raised by the client and worker.
2424
- **[Retry policy](reference/retry_policy.md)** — HTTP transport retry configuration for the client.

src/durable_workflow/workflow.py

Lines changed: 159 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,46 @@
2828
from typing import Any
2929

3030
from . import serializer
31-
from .errors import ChildWorkflowFailed
31+
from .errors import ChildWorkflowFailed, QueryFailed
3232

3333
_REGISTRY: dict[str, type] = {}
3434

3535

3636
def defn(*, name: str): # type: ignore[no-untyped-def]
3737
"""Register a class as a workflow type under a language-neutral name.
3838
39-
Scans the class for ``@signal``-decorated methods and builds a signal
40-
registry at decoration time so the replayer can dispatch incoming
41-
signals without re-inspecting the class on every history event.
39+
Scans the class for ``@signal``, ``@query``, and ``@update`` decorated
40+
methods and builds registries at decoration time so worker-side dispatch
41+
can use stable receiver names without re-inspecting the class on every
42+
history event or control-plane request.
4243
"""
4344

4445
def wrap(cls: type) -> type:
4546
cls.__workflow_name__ = name # type: ignore[attr-defined]
4647
signals: dict[str, str] = {}
48+
queries: dict[str, str] = {}
49+
updates: dict[str, str] = {}
50+
update_validators: dict[str, str] = {}
4751
for attr in dir(cls):
4852
if attr.startswith("_"):
4953
continue
5054
member = getattr(cls, attr, None)
5155
signal_name = getattr(member, "__signal_name__", None)
5256
if isinstance(signal_name, str) and signal_name:
5357
signals[signal_name] = attr
58+
query_name = getattr(member, "__query_name__", None)
59+
if isinstance(query_name, str) and query_name:
60+
queries[query_name] = attr
61+
update_name = getattr(member, "__update_name__", None)
62+
if isinstance(update_name, str) and update_name:
63+
updates[update_name] = attr
64+
update_validator_name = getattr(member, "__update_validator_name__", None)
65+
if isinstance(update_validator_name, str) and update_validator_name:
66+
update_validators[update_validator_name] = attr
5467
cls.__workflow_signals__ = signals # type: ignore[attr-defined]
68+
cls.__workflow_queries__ = queries # type: ignore[attr-defined]
69+
cls.__workflow_updates__ = updates # type: ignore[attr-defined]
70+
cls.__workflow_update_validators__ = update_validators # type: ignore[attr-defined]
5571
_REGISTRY[name] = cls
5672
return cls
5773

@@ -87,6 +103,60 @@ def wrap(method: Callable[..., Any]) -> Callable[..., Any]:
87103
return wrap
88104

89105

106+
def query(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
107+
"""Mark a workflow method as a read-only query handler.
108+
109+
Query methods are invoked against replayed workflow state. They must not
110+
mutate ``self`` or perform I/O. The server-side worker query transport is
111+
still implemented separately; this decorator records the Python receiver
112+
metadata and is used by :func:`query_state`.
113+
"""
114+
115+
def wrap(method: Callable[..., Any]) -> Callable[..., Any]:
116+
method.__query_name__ = name # type: ignore[attr-defined]
117+
return method
118+
119+
return wrap
120+
121+
122+
def update(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
123+
"""Mark a workflow method as an update handler.
124+
125+
The returned function also exposes ``.validator`` for the common pattern::
126+
127+
@workflow.update("approve")
128+
def approve(self, approved: bool) -> dict: ...
129+
130+
@approve.validator
131+
def validate_approve(self, approved: bool) -> None: ...
132+
133+
This release records receiver metadata only. The server-side Python update
134+
execution transport is tracked separately.
135+
"""
136+
137+
def wrap(method: Callable[..., Any]) -> Callable[..., Any]:
138+
method.__update_name__ = name # type: ignore[attr-defined]
139+
140+
def validator(validator_method: Callable[..., Any]) -> Callable[..., Any]:
141+
validator_method.__update_validator_name__ = name # type: ignore[attr-defined]
142+
return validator_method
143+
144+
method.validator = validator # type: ignore[attr-defined]
145+
return method
146+
147+
return wrap
148+
149+
150+
def update_validator(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
151+
"""Mark a workflow method as the validator for an update name."""
152+
153+
def wrap(method: Callable[..., Any]) -> Callable[..., Any]:
154+
method.__update_validator_name__ = name # type: ignore[attr-defined]
155+
return method
156+
157+
return wrap
158+
159+
90160
def registry() -> dict[str, type]:
91161
"""Return a copy of workflow types registered in this process."""
92162
return dict(_REGISTRY)
@@ -505,6 +575,12 @@ class ReplayOutcome:
505575
commands: list[Command]
506576

507577

578+
@dataclass
579+
class _ReplayState:
580+
outcome: ReplayOutcome
581+
instance: Any
582+
583+
508584
def _decode_history_result(payload: dict[str, Any], fallback_codec: str | None) -> Any:
509585
codec = payload.get("payload_codec") or fallback_codec
510586
return serializer.decode_envelope(payload.get("result"), codec=codec)
@@ -533,6 +609,70 @@ def replay(
533609
run_id: str = "",
534610
payload_codec: str | None = None,
535611
) -> ReplayOutcome:
612+
return _replay_state(
613+
workflow_cls,
614+
history_events,
615+
start_input,
616+
run_id=run_id,
617+
payload_codec=payload_codec,
618+
).outcome
619+
620+
621+
def query_state(
622+
workflow_cls: type,
623+
history_events: Iterable[dict[str, Any]],
624+
start_input: list[Any],
625+
query_name: str,
626+
args: list[Any] | None = None,
627+
*,
628+
run_id: str = "",
629+
payload_codec: str | None = None,
630+
) -> Any:
631+
"""Replay a workflow to current state and invoke a registered query.
632+
633+
This is the Python-side core that a future server-routed query task can
634+
call after fetching durable history. Unknown query names and handler
635+
exceptions are normalized to :class:`~durable_workflow.errors.QueryFailed`.
636+
"""
637+
try:
638+
state = _replay_state(
639+
workflow_cls,
640+
history_events,
641+
start_input,
642+
run_id=run_id,
643+
payload_codec=payload_codec,
644+
)
645+
except Exception as exc:
646+
raise QueryFailed(f"workflow replay failed before query: {exc}") from exc
647+
if state.outcome.commands and isinstance(state.outcome.commands[0], FailWorkflow):
648+
failure = state.outcome.commands[0]
649+
raise QueryFailed(f"workflow replay failed before query: {failure.message}") from None
650+
651+
query_registry: dict[str, str] = getattr(workflow_cls, "__workflow_queries__", {}) or {}
652+
method_name = query_registry.get(query_name)
653+
if method_name is None:
654+
raise QueryFailed(f"unknown query {query_name!r}")
655+
656+
handler = getattr(state.instance, method_name, None)
657+
if handler is None:
658+
raise QueryFailed(f"query handler {query_name!r} is not available")
659+
660+
try:
661+
return handler(*(list(args) if args is not None else []))
662+
except QueryFailed:
663+
raise
664+
except Exception as exc:
665+
raise QueryFailed(str(exc) or f"query {query_name!r} failed") from exc
666+
667+
668+
def _replay_state(
669+
workflow_cls: type,
670+
history_events: Iterable[dict[str, Any]],
671+
start_input: list[Any],
672+
*,
673+
run_id: str = "",
674+
payload_codec: str | None = None,
675+
) -> _ReplayState:
536676
events = list(history_events)
537677

538678
workflow_start_time: datetime | None = None
@@ -548,6 +688,9 @@ def replay(
548688
instance = workflow_cls()
549689
ctx = WorkflowContext(run_id=run_id, current_time=workflow_start_time)
550690

691+
def _state(commands: list[Command]) -> _ReplayState:
692+
return _ReplayState(outcome=ReplayOutcome(commands=commands), instance=instance)
693+
551694
resolved_results: list[Any] = []
552695
# (resolved_result_index_before_apply, signal_name, decoded_args) —
553696
# signals apply before the generator consumes the resolved_result at the
@@ -597,8 +740,8 @@ def _apply_due_signals() -> None:
597740
gen = instance.run(ctx, *start_input)
598741
if not hasattr(gen, "__next__"):
599742
if isinstance(gen, ContinueAsNew):
600-
return ReplayOutcome(commands=[gen])
601-
return ReplayOutcome(commands=[CompleteWorkflow(result=gen)])
743+
return _state([gen])
744+
return _state([CompleteWorkflow(result=gen)])
602745

603746
ctx.logger._set_replaying(True)
604747

@@ -631,15 +774,15 @@ def _apply_due_signals() -> None:
631774
continue
632775
except StopIteration as stop:
633776
if isinstance(stop.value, ContinueAsNew):
634-
return ReplayOutcome(commands=[stop.value])
635-
return ReplayOutcome(commands=[CompleteWorkflow(result=stop.value)])
777+
return _state([stop.value])
778+
return _state([CompleteWorkflow(result=stop.value)])
636779
next_value = vals
637780
continue
638781
ctx.logger._set_replaying(False)
639782
pending.extend(cmd)
640-
return ReplayOutcome(commands=pending)
783+
return _state(pending)
641784
if isinstance(cmd, ContinueAsNew):
642-
return ReplayOutcome(commands=[cmd])
785+
return _state([cmd])
643786
if isinstance(cmd, RecordSideEffect):
644787
if result_cursor < len(resolved_results):
645788
next_value = resolved_results[result_cursor]
@@ -678,20 +821,20 @@ def _apply_due_signals() -> None:
678821
continue
679822
except StopIteration as stop:
680823
if isinstance(stop.value, ContinueAsNew):
681-
return ReplayOutcome(commands=[stop.value])
682-
return ReplayOutcome(commands=[CompleteWorkflow(result=stop.value)])
824+
return _state([stop.value])
825+
return _state([CompleteWorkflow(result=stop.value)])
683826
next_value = val
684827
continue
685828
ctx.logger._set_replaying(False)
686829
pending.append(cmd)
687-
return ReplayOutcome(commands=pending)
830+
return _state(pending)
688831
raise TypeError(f"workflow yielded unsupported command: {cmd!r}")
689832
except StopIteration as stop:
690833
if isinstance(stop.value, ContinueAsNew):
691-
return ReplayOutcome(commands=pending + [stop.value])
692-
return ReplayOutcome(commands=pending + [CompleteWorkflow(result=stop.value)])
834+
return _state(pending + [stop.value])
835+
return _state(pending + [CompleteWorkflow(result=stop.value)])
693836
except Exception as exc:
694-
return ReplayOutcome(commands=[FailWorkflow(
837+
return _state([FailWorkflow(
695838
message=str(exc),
696839
exception_type=type(exc).__name__,
697840
)])

0 commit comments

Comments
 (0)