Skip to content

Commit 27445df

Browse files
Parse external task input fixture artifacts
Parse external task input fixtures. Add typed parsing helpers and fixture coverage for external task input envelopes. Tests: `pytest -m "not integration"`; `ruff check src tests`; `mypy src/durable_workflow`. Fixture comparison: external task input fixtures match server contract fixtures.
1 parent ff855c0 commit 27445df

8 files changed

Lines changed: 554 additions & 0 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,11 +395,17 @@ manifests from `GET /api/cluster/info`:
395395
- `control_plane.version: "2"`
396396
- `control_plane.request_contract.schema: durable-workflow.v2.control-plane-request.contract` version `1`
397397
- `worker_protocol.version: "1.0"`
398+
- `worker_protocol.external_task_input_contract.schema: durable-workflow.v2.external-task-input.contract` version `1`
398399

399400
The top-level server `version` is build identity only. The worker checks these
400401
protocol manifests at startup and fails closed when compatibility is missing,
401402
unknown, or undiscoverable.
402403

404+
External task carriers can validate fixture artifacts from
405+
`worker_protocol.external_task_input_contract.fixtures` with
406+
`parse_external_task_input_artifact()` and parse leased task envelopes with
407+
`parse_external_task_input()`.
408+
403409
## Development
404410

405411
```bash
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# External Task Input
2+
3+
::: durable_workflow.external_task_input

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ nav:
7171
- Retry policy: reference/retry_policy.md
7272
- Serializer: reference/serializer.md
7373
- External storage: reference/external_storage.md
74+
- External task input: reference/external_task_input.md
7475
- Metrics: reference/metrics.md
7576
- Sync helpers: reference/sync.md
7677
- Testing: reference/testing.md

src/durable_workflow/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@
5353
ExternalStorageDriver,
5454
LocalFilesystemExternalStorage,
5555
)
56+
from .external_task_input import (
57+
EXTERNAL_TASK_INPUT_CONTRACT_SCHEMA,
58+
EXTERNAL_TASK_INPUT_MEDIA_TYPE,
59+
EXTERNAL_TASK_INPUT_SCHEMA,
60+
EXTERNAL_TASK_INPUT_VERSION,
61+
ExternalTaskInput,
62+
ExternalTaskInputError,
63+
ExternalTaskInputIdentity,
64+
ExternalTaskLease,
65+
ExternalTaskWorkflowContext,
66+
parse_external_task_input,
67+
parse_external_task_input_artifact,
68+
)
5669
from .external_task_result import (
5770
EXTERNAL_TASK_RESULT_SCHEMA,
5871
EXTERNAL_TASK_RESULT_VERSION,
@@ -144,6 +157,11 @@
144157
"ExternalPayloadIntegrityError",
145158
"ExternalPayloadReference",
146159
"ExternalStorageDriver",
160+
"ExternalTaskInput",
161+
"ExternalTaskInputError",
162+
"ExternalTaskInputIdentity",
163+
"ExternalTaskLease",
164+
"ExternalTaskWorkflowContext",
147165
"ExternalTaskFailure",
148166
"ExternalTaskIdentity",
149167
"ExternalTaskResult",
@@ -171,9 +189,15 @@
171189
"WorkflowFailed",
172190
"WorkflowNotFound",
173191
"WorkflowTerminated",
192+
"EXTERNAL_TASK_INPUT_CONTRACT_SCHEMA",
193+
"EXTERNAL_TASK_INPUT_MEDIA_TYPE",
194+
"EXTERNAL_TASK_INPUT_SCHEMA",
195+
"EXTERNAL_TASK_INPUT_VERSION",
174196
"EXTERNAL_TASK_RESULT_SCHEMA",
175197
"EXTERNAL_TASK_RESULT_VERSION",
176198
"external_storage_envelope",
199+
"parse_external_task_input",
200+
"parse_external_task_input_artifact",
177201
"parse_external_task_result",
178202
"to_avro_payload_value",
179203
"to_avro_payload_values",
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
from __future__ import annotations
2+
3+
import hashlib
4+
import json
5+
from collections.abc import Mapping, Sequence
6+
from dataclasses import dataclass
7+
from typing import Any, Literal, cast
8+
9+
# Schema identifier every parsed envelope (and fixture artifact) must carry.
EXTERNAL_TASK_INPUT_SCHEMA = "durable-workflow.v2.external-task-input"
# Schema identifier of the contract manifest entry; exported but not checked in this module.
EXTERNAL_TASK_INPUT_CONTRACT_SCHEMA = "durable-workflow.v2.external-task-input.contract"
# Media type a fixture artifact must declare.
EXTERNAL_TASK_INPUT_MEDIA_TYPE = "application/vnd.durable-workflow.external-task-input+json"
# The only envelope version this module accepts.
EXTERNAL_TASK_INPUT_VERSION = 1

# Closed set of task kinds understood by the parser.
ExternalTaskInputKind = Literal["activity_task", "workflow_task"]
15+
16+
17+
class ExternalTaskInputError(ValueError):
    """Raised when a leased external task input envelope violates the v1 contract.

    This is a plain exception class rather than a frozen dataclass. A frozen
    dataclass rejects *every* attribute assignment on the instance, which breaks
    code that assigns to a caught exception (``exc.__traceback__ = ...``, as
    generator-based context managers do) and breaks ``copy``/``pickle``
    round-trips, because exception state is restored through ``setattr``.

    Construction, ``str()``, ``repr()``, equality and hashing keep the behaviour
    the dataclass-generated methods provided.

    Args:
        message: Human-readable description of the contract violation. It is
            exposed as ``message``, returned by ``str()`` and stored in ``args``.
    """

    message: str

    def __init__(self, message: str) -> None:
        # Forward to ValueError so ``args`` is populated even for keyword
        # construction; pickling and copying rebuild the instance from ``args``.
        super().__init__(message)
        self.message = message

    def __str__(self) -> str:
        return self.message

    def __repr__(self) -> str:
        return f"{type(self).__name__}(message={self.message!r})"

    def __eq__(self, other: object) -> bool:
        # Same rule as a dataclass: only instances of the exact same class compare.
        if other.__class__ is not self.__class__:
            return NotImplemented
        return self.message == cast(ExternalTaskInputError, other).message

    def __hash__(self) -> int:
        # Defining __eq__ would otherwise make the exception unhashable.
        return hash((self.message,))
25+
26+
27+
@dataclass(frozen=True)
class ExternalTaskInputIdentity:
    """Immutable view of the envelope's ``task`` object.

    The first seven fields have no default and must always be supplied.
    ``activity_attempt_id`` and ``compatibility`` default to ``None``; the
    parser in this module fills the former only for activity tasks and the
    latter only for workflow tasks.
    """

    id: str
    kind: ExternalTaskInputKind
    attempt: int  # the parser rejects values below 1
    task_queue: str
    handler: str | None  # nullable in the envelope for workflow tasks
    connection: str | None
    idempotency_key: str
    activity_attempt_id: str | None = None
    compatibility: str | None = None
38+
39+
40+
@dataclass(frozen=True)
class ExternalTaskWorkflowContext:
    """Immutable view of the envelope's ``workflow`` object.

    ``id`` and ``run_id`` are always required. ``status`` and ``resume`` default
    to ``None``; the parser in this module only populates them for workflow
    tasks.
    """

    id: str
    run_id: str
    status: str | None = None
    resume: Mapping[str, Any] | None = None
46+
47+
48+
@dataclass(frozen=True)
class ExternalTaskLease:
    """Immutable view of the envelope's ``lease`` object; all fields are required strings."""

    owner: str
    expires_at: str  # kept as the raw string from the envelope; not parsed here
    heartbeat_endpoint: str
53+
54+
55+
@dataclass(frozen=True)
class ExternalTaskInput:
    """Immutable, parsed form of one external task input envelope.

    ``deadlines`` and ``history`` both default to ``None``; the parser in this
    module sets ``deadlines`` for activity tasks and ``history`` for workflow
    tasks.
    """

    kind: ExternalTaskInputKind
    task: ExternalTaskInputIdentity
    workflow: ExternalTaskWorkflowContext
    lease: ExternalTaskLease
    payloads: Mapping[str, Any]
    headers: Mapping[str, Any]
    deadlines: Mapping[str, Any] | None = None
    history: Mapping[str, Any] | None = None

    @property
    def is_activity_task(self) -> bool:
        """Whether ``kind`` is ``"activity_task"``."""
        return "activity_task" == self.kind

    @property
    def is_workflow_task(self) -> bool:
        """Whether ``kind`` is ``"workflow_task"``."""
        return "workflow_task" == self.kind
73+
74+
75+
def parse_external_task_input(envelope: Mapping[str, Any]) -> ExternalTaskInput:
    """Validate a v1 carrier-neutral external task input envelope and return its typed form.

    Fields the parser does not know about are left alone, in line with the
    server contract's additive field policy. Every required top-level section
    and every known nested field is checked, so a carrier can fail closed
    before any user handler runs.

    Args:
        envelope: The decoded envelope object.

    Returns:
        The parsed input. ``deadlines`` is set for activity tasks and
        ``history`` for every other accepted kind (i.e. workflow tasks).

    Raises:
        ExternalTaskInputError: If a required field is missing, has the wrong
            type, or carries an unsupported schema, version or kind.
    """

    _require_value(envelope, "schema", EXTERNAL_TASK_INPUT_SCHEMA)
    _require_value(envelope, "version", EXTERNAL_TASK_INPUT_VERSION)

    raw_task = _require_mapping(envelope, "task")
    task_kind = _require_task_kind(raw_task, "kind")

    # Sections common to both kinds, validated in this fixed order.
    parsed_task = _parse_task(raw_task, task_kind)
    parsed_workflow = _parse_workflow(_require_mapping(envelope, "workflow"), task_kind)
    parsed_lease = _parse_lease(_require_mapping(envelope, "lease"))
    parsed_payloads = _parse_payloads(_require_mapping(envelope, "payloads"))
    parsed_headers = _require_mapping(envelope, "headers")

    # Exactly one kind-specific section is required and validated last.
    deadlines: Mapping[str, Any] | None = None
    history: Mapping[str, Any] | None = None
    if task_kind != "activity_task":
        history = _parse_history(_require_mapping(envelope, "history"))
    else:
        deadlines = _parse_deadlines(_require_mapping(envelope, "deadlines"))

    return ExternalTaskInput(
        kind=task_kind,
        task=parsed_task,
        workflow=parsed_workflow,
        lease=parsed_lease,
        payloads=parsed_payloads,
        headers=parsed_headers,
        deadlines=deadlines,
        history=history,
    )
114+
115+
116+
def parse_external_task_input_artifact(artifact: Mapping[str, Any]) -> ExternalTaskInput:
    """Check a cluster-info fixture artifact, then parse the example it embeds.

    The artifact must name itself with the external-task-input prefix, declare
    the expected media type, schema and version, and carry a ``sha256`` that
    equals the digest computed over its ``example`` object.

    Args:
        artifact: The decoded fixture artifact object.

    Returns:
        The parsed form of the artifact's ``example`` envelope.

    Raises:
        ExternalTaskInputError: If the artifact metadata is missing or
            unsupported, the digest does not match, or the example itself is
            not a valid envelope.
    """

    name = _require_str(artifact, "artifact")
    supported = name.startswith("durable-workflow.v2.external-task-input.")
    if not supported:
        raise ExternalTaskInputError(f"Unsupported external task input artifact [{name}].")

    _require_value(artifact, "media_type", EXTERNAL_TASK_INPUT_MEDIA_TYPE)
    _require_value(artifact, "schema", EXTERNAL_TASK_INPUT_SCHEMA)
    _require_value(artifact, "version", EXTERNAL_TASK_INPUT_VERSION)

    embedded = _require_mapping(artifact, "example")
    declared_digest = _require_str(artifact, "sha256")
    computed_digest = _sha256_json(embedded)
    if computed_digest == declared_digest:
        return parse_external_task_input(embedded)

    raise ExternalTaskInputError(
        f"External task input artifact [{name}] sha256 mismatch: "
        f"expected {declared_digest}, got {computed_digest}."
    )
137+
138+
139+
def _parse_task(task: Mapping[str, Any], kind: ExternalTaskInputKind) -> ExternalTaskInputIdentity:
    """Validate the envelope's ``task`` object and build its identity record.

    Activity tasks must carry string ``activity_attempt_id`` and ``handler``
    values. Every other kind must carry ``handler`` and ``compatibility`` keys,
    each a string or null.

    Raises:
        ExternalTaskInputError: If a field is missing, mistyped, or ``attempt``
            is below 1.
    """
    attempt_number = _require_int(task, "attempt")
    if attempt_number < 1:
        raise ExternalTaskInputError("External task input task.attempt must be >= 1.")

    # Kind-specific fields; whichever does not apply to this kind stays None.
    attempt_id: str | None = None
    compat: str | None = None
    handler_name: str | None
    if kind != "activity_task":
        handler_name = _require_optional_str(task, "handler")
        compat = _require_optional_str(task, "compatibility")
    else:
        attempt_id = _require_str(task, "activity_attempt_id")
        handler_name = _require_str(task, "handler")

    # Fields shared by every kind, read in a fixed order.
    task_id = _require_str(task, "id")
    queue = _require_str(task, "task_queue")
    connection_name = _require_optional_str(task, "connection")
    idem_key = _require_str(task, "idempotency_key")

    return ExternalTaskInputIdentity(
        id=task_id,
        kind=kind,
        attempt=attempt_number,
        task_queue=queue,
        handler=handler_name,
        connection=connection_name,
        idempotency_key=idem_key,
        activity_attempt_id=attempt_id,
        compatibility=compat,
    )
168+
169+
170+
def _parse_workflow(workflow: Mapping[str, Any], kind: ExternalTaskInputKind) -> ExternalTaskWorkflowContext:
    """Validate the envelope's ``workflow`` object and build its context record.

    ``id`` and ``run_id`` are always required strings. For workflow tasks a
    ``status`` key (string or null) and a ``resume`` object are required as
    well; for other kinds both are left as ``None`` without being read.

    Raises:
        ExternalTaskInputError: If a required field is missing or mistyped.
    """
    workflow_status: str | None = None
    resume_state: Mapping[str, Any] | None = None
    if kind == "workflow_task":
        workflow_status = _require_optional_str(workflow, "status")
        resume_state = _require_mapping(workflow, "resume")

    workflow_id = _require_str(workflow, "id")
    run_id = _require_str(workflow, "run_id")
    return ExternalTaskWorkflowContext(
        id=workflow_id,
        run_id=run_id,
        status=workflow_status,
        resume=resume_state,
    )
184+
185+
186+
def _parse_lease(lease: Mapping[str, Any]) -> ExternalTaskLease:
    """Validate the envelope's ``lease`` object; all three fields must be strings.

    Raises:
        ExternalTaskInputError: If ``owner``, ``expires_at`` or
            ``heartbeat_endpoint`` is missing or not a string.
    """
    lease_owner = _require_str(lease, "owner")
    lease_expiry = _require_str(lease, "expires_at")
    heartbeat_url = _require_str(lease, "heartbeat_endpoint")
    return ExternalTaskLease(
        owner=lease_owner,
        expires_at=lease_expiry,
        heartbeat_endpoint=heartbeat_url,
    )
192+
193+
194+
def _parse_payloads(payloads: Mapping[str, Any]) -> Mapping[str, Any]:
    """Check that ``payloads`` has an ``arguments`` key holding an object or null.

    The mapping itself is returned unchanged; only its shape is validated.

    Raises:
        ExternalTaskInputError: If ``arguments`` is missing or mistyped.
    """
    _require_nullable_mapping(payloads, "arguments")  # validation only; result unused
    return payloads
197+
198+
199+
def _parse_deadlines(deadlines: Mapping[str, Any]) -> Mapping[str, Any]:
    """Check that all four deadline keys are present, each a string or null.

    The mapping itself is returned unchanged; only its shape is validated.

    Raises:
        ExternalTaskInputError: If a deadline key is missing or mistyped.
    """
    _require_optional_str(deadlines, "schedule_to_start")
    _require_optional_str(deadlines, "start_to_close")
    _require_optional_str(deadlines, "schedule_to_close")
    _require_optional_str(deadlines, "heartbeat")
    return deadlines
203+
204+
205+
def _parse_history(history: Mapping[str, Any]) -> Mapping[str, Any]:
    """Check the shape of the ``history`` object and return it unchanged.

    ``events`` must be an array, ``last_sequence`` an integer, and
    ``next_page_token`` and ``encoding`` must each be present as a string or
    null.

    Raises:
        ExternalTaskInputError: If any of those keys is missing or mistyped.
    """
    _require_sequence(history, "events")
    _require_int(history, "last_sequence")
    for nullable_text_field in ("next_page_token", "encoding"):
        _require_optional_str(history, nullable_text_field)
    return history
211+
212+
213+
def _sha256_json(value: Mapping[str, Any]) -> str:
    """Return the hex SHA-256 digest of ``value`` serialized as compact JSON.

    The JSON text uses ``,`` and ``:`` separators with no whitespace, keeps
    non-ASCII characters unescaped, and is encoded as UTF-8. Keys are emitted in
    the mapping's own iteration order (they are not sorted), so the digest
    depends on key order.
    """
    compact_json = json.dumps(value, ensure_ascii=False, separators=(",", ":"))
    hasher = hashlib.sha256()
    hasher.update(compact_json.encode("utf-8"))
    return hasher.hexdigest()
216+
217+
218+
def _require_mapping(value: Mapping[str, Any], key: str) -> Mapping[str, Any]:
    """Return ``value[key]``, which must be present and be an object.

    Raises:
        ExternalTaskInputError: If the key is absent or its value is not a mapping.
    """
    if key not in value:
        raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
    candidate = value[key]
    if isinstance(candidate, Mapping):
        return candidate
    raise ExternalTaskInputError(f"External task input field [{key}] must be an object.")
225+
226+
227+
def _require_nullable_mapping(value: Mapping[str, Any], key: str) -> Mapping[str, Any] | None:
    """Return ``value[key]``, which must be present and be an object or null.

    Raises:
        ExternalTaskInputError: If the key is absent or its value is neither a
            mapping nor ``None``.
    """
    if key not in value:
        raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
    candidate = value[key]
    if candidate is None or isinstance(candidate, Mapping):
        return candidate
    raise ExternalTaskInputError(f"External task input field [{key}] must be an object or null.")
236+
237+
238+
def _require_sequence(value: Mapping[str, Any], key: str) -> Sequence[Any]:
    """Return ``value[key]``, which must be present and be an array.

    Strings, ``bytes`` and ``bytearray`` are sequences in Python but are not
    accepted as arrays here.

    Raises:
        ExternalTaskInputError: If the key is absent or its value is not an array.
    """
    if key not in value:
        raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
    candidate = value[key]
    is_text_like = isinstance(candidate, (str, bytes, bytearray))
    if isinstance(candidate, Sequence) and not is_text_like:
        return candidate
    raise ExternalTaskInputError(f"External task input field [{key}] must be an array.")
245+
246+
247+
def _require_task_kind(value: Mapping[str, Any], key: str) -> ExternalTaskInputKind:
    """Return ``value[key]`` as a task kind; only the two known kinds are accepted.

    Raises:
        ExternalTaskInputError: If the key is absent, not a string, or names a
            kind other than ``activity_task`` or ``workflow_task``.
    """
    raw_kind = _require_str(value, key)
    if raw_kind not in ("activity_task", "workflow_task"):
        raise ExternalTaskInputError(f"Unsupported external task input kind [{raw_kind}].")
    return cast(ExternalTaskInputKind, raw_kind)
252+
253+
254+
def _require_str(value: Mapping[str, Any], key: str) -> str:
    """Return ``value[key]``, which must be present and be a string.

    Raises:
        ExternalTaskInputError: If the key is absent or its value is not a string.
    """
    if key not in value:
        raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
    candidate = value[key]
    if isinstance(candidate, str):
        return candidate
    raise ExternalTaskInputError(f"External task input field [{key}] must be a string.")
261+
262+
263+
def _require_optional_str(value: Mapping[str, Any], key: str) -> str | None:
    """Return ``value[key]``, which must be present and be a string or null.

    "Optional" refers to the value only: the key itself is still required.

    Raises:
        ExternalTaskInputError: If the key is absent or its value is neither a
            string nor ``None``.
    """
    if key not in value:
        raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
    candidate = value[key]
    if candidate is None or isinstance(candidate, str):
        return candidate
    raise ExternalTaskInputError(f"External task input field [{key}] must be a string or null.")
272+
273+
274+
def _require_int(value: Mapping[str, Any], key: str) -> int:
    """Return ``value[key]``, which must be present and be an integer.

    Booleans are rejected even though ``bool`` is a subclass of ``int``.

    Raises:
        ExternalTaskInputError: If the key is absent or its value is not an integer.
    """
    if key not in value:
        raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
    candidate = value[key]
    if isinstance(candidate, int) and not isinstance(candidate, bool):
        return candidate
    raise ExternalTaskInputError(f"External task input field [{key}] must be an integer.")
281+
282+
283+
def _require_value(value: Mapping[str, Any], key: str, expected: object) -> None:
284+
if key not in value:
285+
raise ExternalTaskInputError(f"External task input is missing required field [{key}].")
286+
if value[key] != expected:
287+
raise ExternalTaskInputError(
288+
f"External task input field [{key}] must be {expected!r}; got {value[key]!r}."
289+
)

0 commit comments

Comments
 (0)