Skip to content

Commit ff855c0

Browse files
Parse external task result envelopes
1 parent 76df9be commit ff855c0

6 files changed

Lines changed: 488 additions & 0 deletions

File tree

src/durable_workflow/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@
5353
ExternalStorageDriver,
5454
LocalFilesystemExternalStorage,
5555
)
56+
from .external_task_result import (
57+
EXTERNAL_TASK_RESULT_SCHEMA,
58+
EXTERNAL_TASK_RESULT_VERSION,
59+
ExternalTaskFailure,
60+
ExternalTaskIdentity,
61+
ExternalTaskResult,
62+
ExternalTaskResultError,
63+
parse_external_task_result,
64+
)
5665
from .interceptors import (
5766
ActivityHandler,
5867
ActivityInterceptorContext,
@@ -135,6 +144,10 @@
135144
"ExternalPayloadIntegrityError",
136145
"ExternalPayloadReference",
137146
"ExternalStorageDriver",
147+
"ExternalTaskFailure",
148+
"ExternalTaskIdentity",
149+
"ExternalTaskResult",
150+
"ExternalTaskResultError",
138151
"InvalidArgument",
139152
"InMemoryMetrics",
140153
"LocalFilesystemExternalStorage",
@@ -158,7 +171,10 @@
158171
"WorkflowFailed",
159172
"WorkflowNotFound",
160173
"WorkflowTerminated",
174+
"EXTERNAL_TASK_RESULT_SCHEMA",
175+
"EXTERNAL_TASK_RESULT_VERSION",
161176
"external_storage_envelope",
177+
"parse_external_task_result",
162178
"to_avro_payload_value",
163179
"to_avro_payload_values",
164180
"replay",
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
from __future__ import annotations
2+
3+
from collections.abc import Mapping
4+
from dataclasses import dataclass
5+
from typing import Any, Literal
6+
7+
EXTERNAL_TASK_RESULT_SCHEMA = "durable-workflow.v2.external-task-result"
8+
EXTERNAL_TASK_RESULT_VERSION = 1
9+
10+
ExternalTaskResultKind = Literal["success", "failure", "malformed_output"]
11+
12+
_FAILURE_KINDS = {
13+
"application",
14+
"timeout",
15+
"cancellation",
16+
"malformed_output",
17+
"handler_crash",
18+
"decode_failure",
19+
"unsupported_payload",
20+
}
21+
22+
_FAILURE_CLASSIFICATIONS = {
23+
"application_error",
24+
"timeout",
25+
"cancelled",
26+
"deadline_exceeded",
27+
"handler_crash",
28+
"decode_failure",
29+
"malformed_output",
30+
"unsupported_payload_codec",
31+
"unsupported_payload_reference",
32+
}
33+
34+
35+
@dataclass(frozen=True)
36+
class ExternalTaskResultError(ValueError):
37+
"""Raised when a handler result envelope violates the v1 result contract."""
38+
39+
message: str
40+
41+
def __str__(self) -> str:
42+
return self.message
43+
44+
45+
@dataclass(frozen=True)
46+
class ExternalTaskIdentity:
47+
id: str
48+
kind: str
49+
attempt: int
50+
idempotency_key: str
51+
52+
53+
@dataclass(frozen=True)
54+
class ExternalTaskFailure:
55+
kind: str
56+
classification: str
57+
message: str
58+
type: str | None
59+
stack_trace: str | None
60+
timeout_type: str | None
61+
cancelled: bool
62+
details: Mapping[str, Any] | None
63+
64+
@property
65+
def deadline_exceeded(self) -> bool:
66+
return self.classification == "deadline_exceeded" or self.timeout_type == "deadline_exceeded"
67+
68+
@property
69+
def handler_crash(self) -> bool:
70+
return self.classification == "handler_crash" or self.kind == "handler_crash"
71+
72+
@property
73+
def decode_failure(self) -> bool:
74+
return self.classification == "decode_failure" or self.kind == "decode_failure"
75+
76+
@property
77+
def unsupported_payload(self) -> bool:
78+
return self.kind == "unsupported_payload" or self.classification in {
79+
"unsupported_payload_codec",
80+
"unsupported_payload_reference",
81+
}
82+
83+
84+
@dataclass(frozen=True)
85+
class ExternalTaskResult:
86+
kind: ExternalTaskResultKind
87+
task: ExternalTaskIdentity
88+
recorded: bool
89+
metadata: Mapping[str, Any]
90+
result: Mapping[str, Any] | None = None
91+
failure: ExternalTaskFailure | None = None
92+
raw_output: Mapping[str, Any] | None = None
93+
retryable: bool = False
94+
95+
@property
96+
def succeeded(self) -> bool:
97+
return self.kind == "success"
98+
99+
@property
100+
def failed(self) -> bool:
101+
return self.kind in {"failure", "malformed_output"}
102+
103+
@property
104+
def malformed_output(self) -> bool:
105+
return self.kind == "malformed_output"
106+
107+
@property
108+
def cancelled(self) -> bool:
109+
return self.failure.cancelled if self.failure is not None else False
110+
111+
112+
def parse_external_task_result(envelope: Mapping[str, Any]) -> ExternalTaskResult:
113+
"""Parse and validate a v1 external task result envelope.
114+
115+
The parser follows the server contract's additive-field policy: unknown
116+
optional fields are ignored, while required fields and known enum values are
117+
enforced so carriers can make stable retryability and failure decisions.
118+
"""
119+
120+
_require_value(envelope, "schema", EXTERNAL_TASK_RESULT_SCHEMA)
121+
_require_value(envelope, "version", EXTERNAL_TASK_RESULT_VERSION)
122+
123+
outcome = _require_mapping(envelope, "outcome")
124+
status = _require_str(outcome, "status")
125+
recorded = _require_bool(outcome, "recorded")
126+
task = _parse_task(_require_mapping(envelope, "task"))
127+
metadata = _require_mapping(envelope, "metadata")
128+
129+
if status == "succeeded":
130+
return ExternalTaskResult(
131+
kind="success",
132+
task=task,
133+
recorded=recorded,
134+
metadata=metadata,
135+
result=_require_nullable_mapping(envelope, "result"),
136+
)
137+
138+
if status != "failed":
139+
raise ExternalTaskResultError(f"Unsupported external task result status [{status}].")
140+
141+
failure = _parse_failure(_require_mapping(envelope, "failure"))
142+
retryable = _require_bool(outcome, "retryable")
143+
if failure.kind == "malformed_output" or failure.classification == "malformed_output" or "raw_output" in envelope:
144+
return ExternalTaskResult(
145+
kind="malformed_output",
146+
task=task,
147+
recorded=recorded,
148+
metadata=metadata,
149+
failure=failure,
150+
raw_output=_require_mapping(envelope, "raw_output"),
151+
retryable=retryable,
152+
)
153+
154+
return ExternalTaskResult(
155+
kind="failure",
156+
task=task,
157+
recorded=recorded,
158+
metadata=metadata,
159+
failure=failure,
160+
retryable=retryable,
161+
)
162+
163+
164+
def _parse_task(task: Mapping[str, Any]) -> ExternalTaskIdentity:
165+
attempt = _require_int(task, "attempt")
166+
if attempt < 1:
167+
raise ExternalTaskResultError("External task result task.attempt must be >= 1.")
168+
169+
return ExternalTaskIdentity(
170+
id=_require_str(task, "id"),
171+
kind=_require_str(task, "kind"),
172+
attempt=attempt,
173+
idempotency_key=_require_str(task, "idempotency_key"),
174+
)
175+
176+
177+
def _parse_failure(failure: Mapping[str, Any]) -> ExternalTaskFailure:
178+
kind = _require_str(failure, "kind")
179+
if kind not in _FAILURE_KINDS:
180+
raise ExternalTaskResultError(f"Unsupported external task failure kind [{kind}].")
181+
182+
classification = _require_str(failure, "classification")
183+
if classification not in _FAILURE_CLASSIFICATIONS:
184+
raise ExternalTaskResultError(
185+
f"Unsupported external task failure classification [{classification}]."
186+
)
187+
188+
cancelled = _require_bool(failure, "cancelled")
189+
if classification == "cancelled" and not cancelled:
190+
raise ExternalTaskResultError("Cancelled external task failures must set failure.cancelled=true.")
191+
192+
return ExternalTaskFailure(
193+
kind=kind,
194+
classification=classification,
195+
message=_require_str(failure, "message"),
196+
type=_require_optional_str(failure, "type"),
197+
stack_trace=_require_optional_str(failure, "stack_trace"),
198+
timeout_type=_require_optional_str(failure, "timeout_type"),
199+
cancelled=cancelled,
200+
details=_require_nullable_mapping(failure, "details"),
201+
)
202+
203+
204+
def _require_mapping(value: Mapping[str, Any], key: str) -> Mapping[str, Any]:
205+
if key not in value:
206+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
207+
item = value[key]
208+
if not isinstance(item, Mapping):
209+
raise ExternalTaskResultError(f"External task result field [{key}] must be an object.")
210+
return item
211+
212+
213+
def _require_nullable_mapping(value: Mapping[str, Any], key: str) -> Mapping[str, Any] | None:
214+
if key not in value:
215+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
216+
item = value[key]
217+
if item is None:
218+
return None
219+
if not isinstance(item, Mapping):
220+
raise ExternalTaskResultError(f"External task result field [{key}] must be an object or null.")
221+
return item
222+
223+
224+
def _require_str(value: Mapping[str, Any], key: str) -> str:
225+
if key not in value:
226+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
227+
item = value[key]
228+
if not isinstance(item, str):
229+
raise ExternalTaskResultError(f"External task result field [{key}] must be a string.")
230+
return item
231+
232+
233+
def _require_optional_str(value: Mapping[str, Any], key: str) -> str | None:
234+
if key not in value:
235+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
236+
item = value[key]
237+
if item is None:
238+
return None
239+
if not isinstance(item, str):
240+
raise ExternalTaskResultError(f"External task result field [{key}] must be a string or null.")
241+
return item
242+
243+
244+
def _require_bool(value: Mapping[str, Any], key: str) -> bool:
245+
if key not in value:
246+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
247+
item = value[key]
248+
if not isinstance(item, bool):
249+
raise ExternalTaskResultError(f"External task result field [{key}] must be a boolean.")
250+
return item
251+
252+
253+
def _require_int(value: Mapping[str, Any], key: str) -> int:
254+
if key not in value:
255+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
256+
item = value[key]
257+
if not isinstance(item, int) or isinstance(item, bool):
258+
raise ExternalTaskResultError(f"External task result field [{key}] must be an integer.")
259+
return item
260+
261+
262+
def _require_value(value: Mapping[str, Any], key: str, expected: object) -> None:
263+
if key not in value:
264+
raise ExternalTaskResultError(f"External task result is missing required field [{key}].")
265+
if value[key] != expected:
266+
raise ExternalTaskResultError(
267+
f"External task result field [{key}] must be [{expected}], got [{value[key]}]."
268+
)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"schema": "durable-workflow.v2.external-task-result",
3+
"version": 1,
4+
"outcome": {
5+
"status": "failed",
6+
"retryable": true,
7+
"recorded": true
8+
},
9+
"task": {
10+
"id": "acttask_01HV7D3G3G61TAH2YB5RK45XJS",
11+
"kind": "activity_task",
12+
"attempt": 1,
13+
"idempotency_key": "attempt_01HV7D3KJ1C8WQNNY8MVM8J40X"
14+
},
15+
"failure": {
16+
"kind": "timeout",
17+
"classification": "deadline_exceeded",
18+
"message": "Deadline exceeded while waiting for billing provider.",
19+
"type": "ProviderTimeout",
20+
"stack_trace": null,
21+
"timeout_type": "deadline_exceeded",
22+
"cancelled": false,
23+
"details": {
24+
"codec": "avro",
25+
"blob": "BASE64_AVRO_FAILURE_DETAILS"
26+
}
27+
},
28+
"metadata": {
29+
"handler": "billing.charge-card",
30+
"carrier": "process-carrier",
31+
"duration_ms": 5000
32+
}
33+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"schema": "durable-workflow.v2.external-task-result",
3+
"version": 1,
4+
"outcome": {
5+
"status": "failed",
6+
"retryable": false,
7+
"recorded": false
8+
},
9+
"task": {
10+
"id": "acttask_01HV7D3G3G61TAH2YB5RK45XJS",
11+
"kind": "activity_task",
12+
"attempt": 1,
13+
"idempotency_key": "attempt_01HV7D3KJ1C8WQNNY8MVM8J40X"
14+
},
15+
"failure": {
16+
"kind": "malformed_output",
17+
"classification": "malformed_output",
18+
"message": "Handler exited without producing a valid result envelope.",
19+
"type": "MalformedExternalTaskOutput",
20+
"stack_trace": null,
21+
"timeout_type": null,
22+
"cancelled": false,
23+
"details": null
24+
},
25+
"raw_output": {
26+
"stdout_preview": "{not json",
27+
"stderr_preview": "deprecated flag ignored",
28+
"exit_code": 64
29+
},
30+
"metadata": {
31+
"handler": "billing.charge-card",
32+
"carrier": "process-carrier",
33+
"duration_ms": 22
34+
}
35+
}

0 commit comments

Comments
 (0)