Skip to content

Commit 5d99305

Browse files
Add @workflow.signal decorator and in-workflow signal dispatch
Addresses zorporation/durable-workflow#390 (signal portion). Workflows can now declare @workflow.signal("name") handler methods; the replayer scans for SignalReceived history events, decodes the envelope, and dispatches to the handler in history order — interleaved with activity completions so handler-mutated state is visible to subsequent yields. Queries and updates are scoped out of this change and tracked as follow-up issues. Queries need a worker-side query-execution endpoint since the server cannot replay Python workflows; updates need UpdateAccepted/UpdateApplied history event processing plus a validator contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d6c6330 commit 5d99305

2 files changed

Lines changed: 242 additions & 1 deletion

File tree

src/durable_workflow/workflow.py

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,59 @@
3434

3535

3636
def defn(*, name: str): # type: ignore[no-untyped-def]
37-
"""Register a class as a workflow type under a language-neutral name."""
37+
"""Register a class as a workflow type under a language-neutral name.
38+
39+
Scans the class for ``@signal``-decorated methods and builds a signal
40+
registry at decoration time so the replayer can dispatch incoming
41+
signals without re-inspecting the class on every history event.
42+
"""
3843

3944
def wrap(cls: type) -> type:
4045
cls.__workflow_name__ = name # type: ignore[attr-defined]
46+
signals: dict[str, str] = {}
47+
for attr in dir(cls):
48+
if attr.startswith("_"):
49+
continue
50+
member = getattr(cls, attr, None)
51+
signal_name = getattr(member, "__signal_name__", None)
52+
if isinstance(signal_name, str) and signal_name:
53+
signals[signal_name] = attr
54+
cls.__workflow_signals__ = signals # type: ignore[attr-defined]
4155
_REGISTRY[name] = cls
4256
return cls
4357

4458
return wrap
4559

4660

61+
def signal(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
62+
"""Mark a workflow method as the handler for an external signal.
63+
64+
Example::
65+
66+
@workflow.defn(name="approval")
67+
class ApprovalWorkflow:
68+
def __init__(self) -> None:
69+
self.approved: bool = False
70+
71+
@workflow.signal("approve")
72+
def on_approve(self, by: str) -> None:
73+
self.approved = True
74+
75+
The decorated method is called by the replayer when a matching
76+
``SignalReceived`` history event is observed, with the signal's
77+
decoded arguments unpacked into positional parameters. Handler return
78+
values are ignored; to expose state back to the workflow's main run
79+
loop, mutate ``self.*`` attributes (as ``on_approve`` does above) and
80+
yield the usual commands from ``run()``.
81+
"""
82+
83+
def wrap(method: Callable[..., Any]) -> Callable[..., Any]:
84+
method.__signal_name__ = name # type: ignore[attr-defined]
85+
return method
86+
87+
return wrap
88+
89+
4790
def registry() -> dict[str, type]:
4891
"""Return a copy of workflow types registered in this process."""
4992
return dict(_REGISTRY)
@@ -428,6 +471,21 @@ def _decode_history_result(payload: dict[str, Any], fallback_codec: str | None)
428471
return serializer.decode_envelope(payload.get("result"), codec=codec)
429472

430473

474+
def _decode_signal_args(payload: dict[str, Any], fallback_codec: str | None) -> list[Any]:
475+
codec = payload.get("payload_codec") or fallback_codec
476+
raw = payload.get("value")
477+
if raw is None:
478+
raw = payload.get("input")
479+
if raw is None:
480+
raw = payload.get("arguments")
481+
if raw is None:
482+
return []
483+
decoded = serializer.decode_envelope(raw, codec=codec)
484+
if isinstance(decoded, list):
485+
return decoded
486+
return [decoded]
487+
488+
431489
def replay(
432490
workflow_cls: type,
433491
history_events: Iterable[dict[str, Any]],
@@ -452,6 +510,10 @@ def replay(
452510
ctx = WorkflowContext(run_id=run_id, current_time=workflow_start_time)
453511

454512
resolved_results: list[Any] = []
513+
# (resolved_result_index_before_apply, signal_name, decoded_args) —
514+
# signals apply before the generator consumes the resolved_result at the
515+
# stored index, which preserves history interleaving with activities.
516+
pending_signals: list[tuple[int, str, list[Any]]] = []
455517
for ev in events:
456518
etype = ev.get("event_type")
457519
payload = ev.get("payload") or {}
@@ -472,6 +534,26 @@ def replay(
472534
resolved_results.append(payload.get("version", 0))
473535
elif etype in ("SearchAttributesUpserted", "search_attributes_upserted"):
474536
resolved_results.append(None)
537+
elif etype in ("SignalReceived", "signal_received"):
538+
signal_name = payload.get("signal_name")
539+
if isinstance(signal_name, str) and signal_name:
540+
pending_signals.append(
541+
(len(resolved_results), signal_name, _decode_signal_args(payload, payload_codec))
542+
)
543+
544+
signal_registry: dict[str, str] = getattr(workflow_cls, "__workflow_signals__", {}) or {}
545+
546+
def _apply_due_signals() -> None:
547+
while pending_signals and pending_signals[0][0] <= result_cursor:
548+
_, name, args = pending_signals.pop(0)
549+
method_name = signal_registry.get(name)
550+
if method_name is None:
551+
continue
552+
handler = getattr(instance, method_name, None)
553+
if handler is None:
554+
continue
555+
ctx.logger._set_replaying(True)
556+
handler(*args)
475557

476558
gen = instance.run(ctx, *start_input)
477559
if not hasattr(gen, "__next__"):
@@ -488,6 +570,7 @@ def replay(
488570
advanced_cmd: Any = None
489571
try:
490572
while True:
573+
_apply_due_signals()
491574
if advanced_cmd is not None:
492575
cmd = advanced_cmd
493576
advanced_cmd = None

tests/test_signals.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
from __future__ import annotations
2+
3+
from typing import Any
4+
5+
from durable_workflow import serializer, workflow
6+
from durable_workflow.workflow import (
7+
CompleteWorkflow,
8+
ScheduleActivity,
9+
WorkflowContext,
10+
replay,
11+
)
12+
13+
14+
@workflow.defn(name="approval-workflow")
15+
class ApprovalWorkflow:
16+
def __init__(self) -> None:
17+
self.approved: bool = False
18+
self.approvals: list[str] = []
19+
20+
@workflow.signal("approve")
21+
def on_approve(self, by: str) -> None:
22+
self.approved = True
23+
self.approvals.append(by)
24+
25+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
26+
yield ctx.schedule_activity("wait_for_approval", [])
27+
return {"approved": self.approved, "approvals": list(self.approvals)}
28+
29+
30+
@workflow.defn(name="typed-signal-workflow")
31+
class TypedSignalWorkflow:
32+
def __init__(self) -> None:
33+
self.count: int = 0
34+
35+
@workflow.signal("increment")
36+
def on_increment(self, amount: int) -> None:
37+
self.count += amount
38+
39+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
40+
yield ctx.schedule_activity("tick", [])
41+
yield ctx.schedule_activity("tick", [])
42+
return self.count
43+
44+
45+
class TestSignalDecoratorRegistry:
46+
def test_defn_collects_signal_methods_into_registry(self) -> None:
47+
signals = ApprovalWorkflow.__workflow_signals__ # type: ignore[attr-defined]
48+
49+
assert signals == {"approve": "on_approve"}
50+
51+
def test_signal_decorator_sets_signal_name_marker(self) -> None:
52+
assert ApprovalWorkflow.on_approve.__signal_name__ == "approve" # type: ignore[attr-defined]
53+
54+
def test_workflow_without_signal_methods_has_empty_registry(self) -> None:
55+
@workflow.defn(name="no-signals")
56+
class PlainWorkflow:
57+
def run(self, ctx: WorkflowContext) -> str:
58+
return "done"
59+
60+
assert PlainWorkflow.__workflow_signals__ == {} # type: ignore[attr-defined]
61+
62+
63+
def _activity_completed_event(result: Any) -> dict[str, Any]:
64+
return {
65+
"event_type": "ActivityCompleted",
66+
"payload": {"result": serializer.envelope(result), "payload_codec": serializer.AVRO_CODEC},
67+
}
68+
69+
70+
def _signal_received_event(name: str, args: list[Any]) -> dict[str, Any]:
71+
return {
72+
"event_type": "SignalReceived",
73+
"payload": {
74+
"signal_name": name,
75+
"value": serializer.envelope(args),
76+
"payload_codec": serializer.AVRO_CODEC,
77+
},
78+
}
79+
80+
81+
class TestSignalDispatchDuringReplay:
82+
def test_signal_mutates_workflow_state_before_completion(self) -> None:
83+
events = [
84+
_signal_received_event("approve", ["alice"]),
85+
_activity_completed_event("done"),
86+
]
87+
88+
outcome = replay(ApprovalWorkflow, events, [])
89+
90+
# Workflow returns — CompleteWorkflow emitted with mutated state.
91+
assert len(outcome.commands) == 1
92+
assert isinstance(outcome.commands[0], CompleteWorkflow)
93+
assert outcome.commands[0].result == {"approved": True, "approvals": ["alice"]}
94+
95+
def test_multiple_signals_apply_in_history_order(self) -> None:
96+
events = [
97+
_signal_received_event("increment", [3]),
98+
_activity_completed_event(None),
99+
_signal_received_event("increment", [5]),
100+
_activity_completed_event(None),
101+
]
102+
103+
outcome = replay(TypedSignalWorkflow, events, [])
104+
105+
assert len(outcome.commands) == 1
106+
assert isinstance(outcome.commands[0], CompleteWorkflow)
107+
assert outcome.commands[0].result == 8
108+
109+
def test_signal_before_any_activity_still_applies(self) -> None:
110+
events = [
111+
_signal_received_event("approve", ["bob"]),
112+
]
113+
114+
outcome = replay(ApprovalWorkflow, events, [])
115+
116+
# Activity has not completed yet — workflow yields ScheduleActivity and
117+
# the signal has already been applied to instance state.
118+
assert len(outcome.commands) == 1
119+
assert isinstance(outcome.commands[0], ScheduleActivity)
120+
121+
def test_unknown_signal_is_silently_dropped(self) -> None:
122+
events = [
123+
_signal_received_event("not_a_real_signal", ["payload"]),
124+
_activity_completed_event("done"),
125+
]
126+
127+
outcome = replay(ApprovalWorkflow, events, [])
128+
129+
assert len(outcome.commands) == 1
130+
assert isinstance(outcome.commands[0], CompleteWorkflow)
131+
# Handler for "approve" was never invoked because no matching signal
132+
# arrived — state remains the default.
133+
assert outcome.commands[0].result == {"approved": False, "approvals": []}
134+
135+
def test_signal_with_no_args_decodes_to_empty_handler_call(self) -> None:
136+
@workflow.defn(name="ping-workflow")
137+
class PingWorkflow:
138+
def __init__(self) -> None:
139+
self.pings: int = 0
140+
141+
@workflow.signal("ping")
142+
def on_ping(self) -> None:
143+
self.pings += 1
144+
145+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
146+
yield ctx.schedule_activity("step", [])
147+
return self.pings
148+
149+
events = [
150+
_signal_received_event("ping", []),
151+
_signal_received_event("ping", []),
152+
_activity_completed_event(None),
153+
]
154+
155+
outcome = replay(PingWorkflow, events, [])
156+
157+
assert isinstance(outcome.commands[0], CompleteWorkflow)
158+
assert outcome.commands[0].result == 2

0 commit comments

Comments
 (0)