Skip to content

Commit fae4fa9

Browse files
Add SDK external payload reference support
Add SDK external payload reference support
1 parent 8a932ba commit fae4fa9

8 files changed

Lines changed: 397 additions & 3 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ failures surfaced as workflow failure commands.
205205
- **Polyglot**: Works alongside PHP workers on the same task queue
206206
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
207207
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
208+
- **External payload references**: opt-in reference envelopes and a local filesystem driver for large-payload offload experiments
208209
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, schedule, signal, update, query, or search-attribute payloads reach the server
209210
- **Workflow definition guard**: Worker registration refuses same-id hot reloads when a workflow class definition changed
210211
- **Deterministic workflow helpers**: `ctx.now()`, `ctx.random()`, `ctx.uuid4()`, and `ctx.uuid7()` replay from workflow state

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pip install 'durable-workflow[prometheus]'
2424
- **[Retry policy](reference/retry_policy.md)** — HTTP transport retry configuration for the client. Durable activity and child workflow retry policies are workflow primitives, not transport settings.
2525
- **[Metrics](reference/metrics.md)** — pluggable recorders, including a Prometheus adapter.
2626
- **[Serializer](reference/serializer.md)** — payload encoding and decoding helpers.
27+
- **[External storage](reference/external_storage.md)** — reference envelopes and driver contracts for large-payload offload.
2728
- **[Sync helpers](reference/sync.md)** — blocking wrappers around the async client for scripts and tests.
2829

2930
## Versioning

docs/reference/external_storage.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# External Storage
2+
3+
::: durable_workflow.external_storage

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ nav:
7070
- Errors: reference/errors.md
7171
- Retry policy: reference/retry_policy.md
7272
- Serializer: reference/serializer.md
73+
- External storage: reference/external_storage.md
7374
- Metrics: reference/metrics.md
7475
- Sync helpers: reference/sync.md
7576
- Testing: reference/testing.md

src/durable_workflow/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@
4646
WorkflowPayloadDecodeError,
4747
WorkflowTerminated,
4848
)
49+
from .external_storage import (
50+
EXTERNAL_PAYLOAD_REFERENCE_SCHEMA,
51+
ExternalPayloadIntegrityError,
52+
ExternalPayloadReference,
53+
ExternalStorageDriver,
54+
LocalFilesystemExternalStorage,
55+
)
4956
from .interceptors import (
5057
ActivityHandler,
5158
ActivityInterceptorContext,
@@ -66,6 +73,7 @@
6673
from .serializer import (
6774
PayloadSizeWarningConfig,
6875
PayloadSizeWarningContext,
76+
external_storage_envelope,
6977
to_avro_payload_value,
7078
to_avro_payload_values,
7179
)
@@ -123,8 +131,13 @@
123131
"testing",
124132
"workflow",
125133
"DurableWorkflowError",
134+
"EXTERNAL_PAYLOAD_REFERENCE_SCHEMA",
135+
"ExternalPayloadIntegrityError",
136+
"ExternalPayloadReference",
137+
"ExternalStorageDriver",
126138
"InvalidArgument",
127139
"InMemoryMetrics",
140+
"LocalFilesystemExternalStorage",
128141
"MetricsRecorder",
129142
"NamespaceNotFound",
130143
"NoopMetrics",
@@ -145,6 +158,7 @@
145158
"WorkflowFailed",
146159
"WorkflowNotFound",
147160
"WorkflowTerminated",
161+
"external_storage_envelope",
148162
"to_avro_payload_value",
149163
"to_avro_payload_values",
150164
"replay",
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""External payload storage contracts for large Durable Workflow payloads."""
2+
3+
from __future__ import annotations
4+
5+
import hashlib
6+
from dataclasses import dataclass
7+
from pathlib import Path
8+
from typing import Protocol
9+
from urllib.parse import unquote, urlparse
10+
11+
EXTERNAL_PAYLOAD_REFERENCE_SCHEMA = "durable-workflow.v2.external-payload-reference.v1"
12+
13+
14+
class ExternalPayloadIntegrityError(ValueError):
15+
"""Raised when fetched external payload bytes do not match their reference."""
16+
17+
18+
class ExternalStorageDriver(Protocol):
19+
"""Protocol implemented by pluggable external payload storage drivers."""
20+
21+
def put(self, data: bytes, *, sha256: str, codec: str) -> str:
22+
"""Persist *data* and return a stable URI for later fetches."""
23+
24+
def get(self, uri: str) -> bytes:
25+
"""Fetch previously persisted payload bytes."""
26+
27+
def delete(self, uri: str) -> None:
28+
"""Delete previously persisted payload bytes when retention removes a run."""
29+
30+
31+
@dataclass(frozen=True)
32+
class ExternalPayloadReference:
33+
"""Stable wire envelope for a payload stored outside workflow history."""
34+
35+
uri: str
36+
sha256: str
37+
size_bytes: int
38+
codec: str
39+
schema: str = EXTERNAL_PAYLOAD_REFERENCE_SCHEMA
40+
41+
def to_dict(self) -> dict[str, str | int]:
42+
return {
43+
"schema": self.schema,
44+
"uri": self.uri,
45+
"sha256": self.sha256,
46+
"size_bytes": self.size_bytes,
47+
"codec": self.codec,
48+
}
49+
50+
@classmethod
51+
def from_dict(cls, data: object) -> ExternalPayloadReference:
52+
if not isinstance(data, dict):
53+
raise ValueError("external payload reference must be an object")
54+
55+
schema = data.get("schema")
56+
uri = data.get("uri")
57+
sha256 = data.get("sha256")
58+
size_bytes = data.get("size_bytes")
59+
codec = data.get("codec")
60+
61+
if schema != EXTERNAL_PAYLOAD_REFERENCE_SCHEMA:
62+
raise ValueError("unsupported external payload reference schema")
63+
if not isinstance(uri, str) or not uri:
64+
raise ValueError("external payload reference uri must be a non-empty string")
65+
if not isinstance(sha256, str) or len(sha256) != 64:
66+
raise ValueError("external payload reference sha256 must be a hex digest")
67+
try:
68+
int(sha256, 16)
69+
except ValueError as exc:
70+
raise ValueError("external payload reference sha256 must be a hex digest") from exc
71+
if not isinstance(size_bytes, int) or size_bytes < 0:
72+
raise ValueError("external payload reference size_bytes must be a non-negative integer")
73+
if not isinstance(codec, str) or not codec:
74+
raise ValueError("external payload reference codec must be a non-empty string")
75+
76+
return cls(uri=uri, sha256=sha256, size_bytes=size_bytes, codec=codec, schema=schema)
77+
78+
79+
class LocalFilesystemExternalStorage:
80+
"""Dependency-free external storage driver for development and tests."""
81+
82+
def __init__(self, root: str | Path) -> None:
83+
self.root = Path(root).resolve()
84+
self.root.mkdir(parents=True, exist_ok=True)
85+
86+
def put(self, data: bytes, *, sha256: str, codec: str) -> str:
87+
_validate_sha256(sha256)
88+
codec_segment = _safe_codec_segment(codec)
89+
path = self.root / codec_segment / sha256[:2] / sha256
90+
path.parent.mkdir(parents=True, exist_ok=True)
91+
if not path.exists():
92+
path.write_bytes(data)
93+
return path.as_uri()
94+
95+
def get(self, uri: str) -> bytes:
96+
path = self._path_from_uri(uri)
97+
return path.read_bytes()
98+
99+
def delete(self, uri: str) -> None:
100+
path = self._path_from_uri(uri)
101+
try:
102+
path.unlink()
103+
except FileNotFoundError:
104+
return
105+
106+
def _path_from_uri(self, uri: str) -> Path:
107+
parsed = urlparse(uri)
108+
if parsed.scheme != "file" or parsed.netloc not in {"", "localhost"}:
109+
raise ValueError("local external storage can only read file:// URIs")
110+
111+
path = Path(unquote(parsed.path)).resolve()
112+
try:
113+
path.relative_to(self.root)
114+
except ValueError as exc:
115+
raise ValueError("external payload URI is outside the local storage root") from exc
116+
return path
117+
118+
119+
def store_external_payload(
120+
driver: ExternalStorageDriver,
121+
data: bytes,
122+
*,
123+
codec: str,
124+
) -> ExternalPayloadReference:
125+
"""Store encoded payload bytes and return their reference envelope."""
126+
sha256 = hashlib.sha256(data).hexdigest()
127+
uri = driver.put(data, sha256=sha256, codec=codec)
128+
return ExternalPayloadReference(
129+
uri=uri,
130+
sha256=sha256,
131+
size_bytes=len(data),
132+
codec=codec,
133+
)
134+
135+
136+
def fetch_external_payload(
137+
driver: ExternalStorageDriver,
138+
reference: ExternalPayloadReference,
139+
) -> bytes:
140+
"""Fetch payload bytes and verify size/hash before replay or decode."""
141+
data = driver.get(reference.uri)
142+
if len(data) != reference.size_bytes:
143+
raise ExternalPayloadIntegrityError("external payload size does not match its reference")
144+
145+
actual_sha256 = hashlib.sha256(data).hexdigest()
146+
if actual_sha256 != reference.sha256:
147+
raise ExternalPayloadIntegrityError("external payload hash does not match its reference")
148+
return data
149+
150+
151+
def _validate_sha256(sha256: str) -> None:
152+
if len(sha256) != 64:
153+
raise ValueError("sha256 must be a hex digest")
154+
try:
155+
int(sha256, 16)
156+
except ValueError as exc:
157+
raise ValueError("sha256 must be a hex digest") from exc
158+
159+
160+
def _safe_codec_segment(codec: str) -> str:
161+
if not codec:
162+
raise ValueError("codec must be a non-empty string")
163+
if not all(char.isalnum() or char in {"-", "_", "."} for char in codec):
164+
raise ValueError("codec contains characters that are unsafe for local storage paths")
165+
return codec

src/durable_workflow/serializer.py

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@
2929
from uuid import UUID
3030

3131
from . import _avro
32+
from .external_storage import (
33+
ExternalPayloadReference,
34+
ExternalStorageDriver,
35+
fetch_external_payload,
36+
store_external_payload,
37+
)
3238

3339
JSON_CODEC = "json"
3440
AVRO_CODEC = "avro"
@@ -269,6 +275,42 @@ def envelope(
269275
}
270276

271277

278+
def external_storage_envelope(
279+
value: Any,
280+
*,
281+
external_storage: ExternalStorageDriver,
282+
threshold_bytes: int,
283+
codec: str = AVRO_CODEC,
284+
size_warning: PayloadSizeWarningConfig | None = DEFAULT_PAYLOAD_SIZE_WARNING,
285+
warning_context: PayloadSizeWarningContext | Mapping[str, Any] | None = None,
286+
) -> dict[str, Any]:
287+
"""Wrap a value in a normal envelope or an external-payload reference.
288+
289+
Payloads whose encoded UTF-8 size is greater than *threshold_bytes* are
290+
stored through *external_storage* and represented as ``{codec,
291+
external_storage}`` so workflow history carries a stable reference instead
292+
of the large payload bytes.
293+
"""
294+
if threshold_bytes < 1:
295+
raise ValueError("external storage threshold must be at least 1 byte")
296+
297+
blob = encode(
298+
value,
299+
codec=codec,
300+
size_warning=size_warning,
301+
warning_context=warning_context,
302+
)
303+
data = blob.encode("utf-8")
304+
if len(data) <= threshold_bytes:
305+
return {"codec": codec, "blob": blob}
306+
307+
reference = store_external_payload(external_storage, data, codec=codec)
308+
return {
309+
"codec": codec,
310+
"external_storage": reference.to_dict(),
311+
}
312+
313+
272314
def envelope_many(
273315
values: Sequence[Any],
274316
codec: str = AVRO_CODEC,
@@ -376,20 +418,48 @@ def _is_context_sequence(
376418
)
377419

378420

379-
def decode_envelope(value: Any, codec: str | None = None) -> Any:
421+
def decode_envelope(
422+
value: Any,
423+
codec: str | None = None,
424+
*,
425+
external_storage: ExternalStorageDriver | None = None,
426+
) -> Any:
380427
"""Decode a value that may be a ``{codec, blob}`` envelope or a raw blob.
381428
382429
When *value* is an envelope, its inner ``codec`` takes precedence over
383430
the *codec* argument. When *value* is a raw blob, *codec* selects the
384-
decoder (defaulting to JSON).
431+
decoder (defaulting to JSON). External payload references require an
432+
*external_storage* driver and verify size/hash before decode.
385433
"""
386434
if isinstance(value, dict) and "codec" in value and "blob" in value:
387435
return decode(value["blob"], codec=value["codec"])
436+
if isinstance(value, dict) and "external_storage" in value:
437+
if external_storage is None:
438+
raise ValueError("external payload reference requires an external storage driver")
439+
reference = ExternalPayloadReference.from_dict(value["external_storage"])
440+
envelope_codec = value.get("codec")
441+
if envelope_codec is not None and envelope_codec != reference.codec:
442+
raise ValueError("external payload reference codec does not match envelope codec")
443+
blob = fetch_external_payload(external_storage, reference).decode("utf-8")
444+
return decode(blob, codec=reference.codec)
388445
return decode(value, codec=codec)
389446

390447

391-
def decode_envelopes(values: Sequence[Any], codec: str | None = None) -> list[Any]:
448+
def decode_envelopes(
449+
values: Sequence[Any],
450+
codec: str | None = None,
451+
*,
452+
external_storage: ExternalStorageDriver | None = None,
453+
) -> list[Any]:
392454
"""Decode several raw blobs or ``{codec, blob}`` envelopes in order."""
455+
if external_storage is not None or any(
456+
isinstance(value, dict) and "external_storage" in value for value in values
457+
):
458+
return [
459+
decode_envelope(value, codec=codec, external_storage=external_storage)
460+
for value in values
461+
]
462+
393463
jobs: list[tuple[str | None, str | None]] = []
394464
passthroughs: dict[int, Any] = {}
395465
for index, value in enumerate(values):

0 commit comments

Comments
 (0)