Skip to content

Commit 4c6ce2f

Browse files
Resolve external payload storage policies
1 parent fb67941 commit 4c6ce2f

4 files changed

Lines changed: 311 additions & 4 deletions

File tree

README.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,24 @@ payload storage.
215215
Retention cleanup should delete by typed reference rather than by raw URI:
216216
`delete_external_payload(storage, reference, cache=cache)` calls the configured
217217
driver and evicts any verified replay-cache entry for the same reference.
218+
When the server or Cloud API returns an external payload storage policy, use
219+
`ExternalPayloadStoragePolicy.from_dict(...)` plus
220+
`external_storage_driver_from_policy(...)` to turn that control-plane payload
221+
into the matching SDK driver while keeping provider clients application-owned.
218222

219223
```python
220-
from durable_workflow import S3ExternalStorage, serializer
224+
from durable_workflow import (
225+
ExternalPayloadStoragePolicy,
226+
external_storage_driver_from_policy,
227+
serializer,
228+
)
221229

222-
storage = S3ExternalStorage(s3_client, bucket="workflow-payloads", prefix="prod")
230+
policy = ExternalPayloadStoragePolicy.from_dict(namespace_response)
231+
storage = external_storage_driver_from_policy(policy, s3_client=s3_client)
223232
payload = serializer.external_storage_envelope(
224233
{"large": "value"},
225234
external_storage=storage,
226-
threshold_bytes=2 * 1024 * 1024,
235+
threshold_bytes=policy.threshold_bytes or 2 * 1024 * 1024,
227236
)
228237
```
229238

src/durable_workflow/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@
6767
ExternalPayloadCache,
6868
ExternalPayloadIntegrityError,
6969
ExternalPayloadReference,
70+
ExternalPayloadStoragePolicy,
7071
ExternalStorageDriver,
7172
GCSExternalStorage,
7273
LocalFilesystemExternalStorage,
7374
S3ExternalStorage,
7475
delete_external_payload,
76+
external_storage_driver_from_policy,
7577
fetch_external_payload,
7678
store_external_payload,
7779
)
@@ -197,6 +199,7 @@
197199
"ExternalPayloadCache",
198200
"ExternalPayloadIntegrityError",
199201
"ExternalPayloadReference",
202+
"ExternalPayloadStoragePolicy",
200203
"ExternalStorageDriver",
201204
"ExternalTaskInput",
202205
"ExternalTaskInputError",
@@ -244,6 +247,7 @@
244247
"EXTERNAL_TASK_RESULT_VERSION",
245248
"delete_external_payload",
246249
"external_storage_envelope",
250+
"external_storage_driver_from_policy",
247251
"fetch_external_payload",
248252
"parse_external_task_input",
249253
"parse_external_task_input_artifact",

src/durable_workflow/external_storage.py

Lines changed: 197 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
import hashlib
66
from collections import OrderedDict
7-
from dataclasses import dataclass
7+
from collections.abc import Mapping
8+
from dataclasses import dataclass, field
89
from datetime import datetime
910
from pathlib import Path
1011
from typing import Any, Protocol
@@ -30,6 +31,47 @@ def delete(self, uri: str) -> None:
3031
"""Delete previously persisted payload bytes when retention removes a run."""
3132

3233

34+
@dataclass(frozen=True)
35+
class ExternalPayloadStoragePolicy:
36+
"""Normalized external payload storage policy from server or Cloud APIs."""
37+
38+
enabled: bool
39+
driver: str | None = None
40+
threshold_bytes: int | None = None
41+
config: Mapping[str, Any] = field(default_factory=dict)
42+
reference: str | None = None
43+
prefix: str = ""
44+
mode: str | None = None
45+
status: str | None = None
46+
integrity_required: bool = True
47+
48+
@classmethod
49+
def from_dict(cls, data: object) -> ExternalPayloadStoragePolicy:
50+
"""Parse a server namespace or Cloud organization storage policy."""
51+
policy = _extract_policy(data)
52+
driver = _optional_string(policy.get("driver"), "external payload storage driver")
53+
enabled = bool(policy.get("enabled", driver is not None))
54+
threshold_bytes = _optional_positive_int(policy.get("threshold_bytes"), "threshold_bytes")
55+
config = _optional_mapping(policy.get("config"), "config")
56+
prefix = _optional_string(policy.get("prefix"), "prefix") or _optional_string(
57+
config.get("prefix"),
58+
"config.prefix",
59+
)
60+
integrity_required = bool(policy.get("integrity_required", True))
61+
62+
return cls(
63+
enabled=enabled,
64+
driver=driver,
65+
threshold_bytes=threshold_bytes,
66+
config=config,
67+
reference=_optional_string(policy.get("reference"), "reference"),
68+
prefix=prefix or "",
69+
mode=_optional_string(policy.get("mode"), "mode"),
70+
status=_optional_string(policy.get("status"), "status"),
71+
integrity_required=integrity_required,
72+
)
73+
74+
3375
@dataclass(frozen=True)
3476
class ExternalPayloadReference:
3577
"""Stable wire envelope for a payload stored outside workflow history."""
@@ -321,6 +363,70 @@ def delete(self, uri: str) -> None:
321363
self.container_client.delete_blob(key)
322364

323365

366+
def external_storage_driver_from_policy(
367+
policy: ExternalPayloadStoragePolicy | Mapping[str, Any],
368+
*,
369+
s3_client: Any | None = None,
370+
gcs_client: Any | None = None,
371+
azure_container_client: Any | None = None,
372+
local_root: str | Path | None = None,
373+
) -> ExternalStorageDriver:
374+
"""Build an SDK storage driver from a server or Cloud policy payload.
375+
376+
Provider SDK clients remain application-owned. Pass the already-configured
377+
S3/GCS/Azure client that matches the policy returned by the control plane.
378+
"""
379+
normalized = (
380+
policy
381+
if isinstance(policy, ExternalPayloadStoragePolicy)
382+
else ExternalPayloadStoragePolicy.from_dict(policy)
383+
)
384+
if not normalized.enabled:
385+
raise ValueError("external payload storage policy is disabled")
386+
if normalized.driver is None:
387+
raise ValueError("external payload storage policy driver is required")
388+
389+
driver = normalized.driver.lower()
390+
if driver == "local":
391+
root = local_root or _policy_string(normalized, "uri") or normalized.reference
392+
if root is None:
393+
raise ValueError("local external payload storage policy requires config.uri or local_root")
394+
return LocalFilesystemExternalStorage(_local_path(root))
395+
396+
if driver == "s3":
397+
if s3_client is None:
398+
raise ValueError("s3 external payload storage policy requires s3_client")
399+
bucket = _policy_string(normalized, "bucket") or _s3_bucket_from_reference(normalized.reference)
400+
if bucket is None:
401+
raise ValueError("s3 external payload storage policy requires config.bucket or an S3 bucket reference")
402+
return S3ExternalStorage(s3_client, bucket=bucket, prefix=_policy_prefix(normalized))
403+
404+
if driver == "gcs":
405+
if gcs_client is None:
406+
raise ValueError("gcs external payload storage policy requires gcs_client")
407+
bucket = _policy_string(normalized, "bucket") or _gcs_bucket_from_reference(normalized.reference)
408+
if bucket is None:
409+
raise ValueError("gcs external payload storage policy requires config.bucket or a GCS bucket reference")
410+
return GCSExternalStorage(gcs_client, bucket=bucket, prefix=_policy_prefix(normalized))
411+
412+
if driver in {"azure", "azure-blob"}:
413+
if azure_container_client is None:
414+
raise ValueError("azure external payload storage policy requires azure_container_client")
415+
container = (
416+
_policy_string(normalized, "container")
417+
or _policy_string(normalized, "bucket")
418+
or _azure_container_from_reference(normalized.reference)
419+
)
420+
if container is None:
421+
raise ValueError(
422+
"azure external payload storage policy requires config.container, "
423+
"config.bucket, or a container reference"
424+
)
425+
return AzureBlobExternalStorage(azure_container_client, container=container, prefix=_policy_prefix(normalized))
426+
427+
raise ValueError(f"unsupported external payload storage driver {normalized.driver!r}")
428+
429+
324430
def store_external_payload(
325431
driver: ExternalStorageDriver,
326432
data: bytes,
@@ -387,6 +493,96 @@ def _validate_sha256(sha256: str) -> None:
387493
raise ValueError("sha256 must be a hex digest") from exc
388494

389495

496+
def _extract_policy(data: object) -> Mapping[str, Any]:
497+
if not isinstance(data, Mapping):
498+
raise ValueError("external payload storage policy must be an object")
499+
nested = data.get("external_payload_storage")
500+
if nested is not None:
501+
if not isinstance(nested, Mapping):
502+
raise ValueError("external_payload_storage must be an object")
503+
return nested
504+
return data
505+
506+
507+
def _optional_mapping(data: object, field_name: str) -> Mapping[str, Any]:
508+
if data is None:
509+
return {}
510+
if not isinstance(data, Mapping):
511+
raise ValueError(f"external payload storage policy {field_name} must be an object")
512+
return data
513+
514+
515+
def _optional_string(data: object, field_name: str) -> str | None:
516+
if data is None:
517+
return None
518+
if not isinstance(data, str):
519+
raise ValueError(f"external payload storage policy {field_name} must be a string")
520+
return data
521+
522+
523+
def _optional_positive_int(data: object, field_name: str) -> int | None:
524+
if data is None:
525+
return None
526+
if not isinstance(data, int) or data < 1:
527+
raise ValueError(f"external payload storage policy {field_name} must be a positive integer")
528+
return data
529+
530+
531+
def _policy_string(policy: ExternalPayloadStoragePolicy, key: str) -> str | None:
532+
return _optional_string(policy.config.get(key), f"config.{key}")
533+
534+
535+
def _policy_prefix(policy: ExternalPayloadStoragePolicy) -> str:
536+
return _policy_string(policy, "prefix") or policy.prefix
537+
538+
539+
def _local_path(root: str | Path) -> str | Path:
540+
if isinstance(root, Path):
541+
return root
542+
parsed = urlparse(root)
543+
if parsed.scheme == "file":
544+
if parsed.netloc not in {"", "localhost"}:
545+
raise ValueError("local external payload storage file URI must be local")
546+
return unquote(parsed.path)
547+
if parsed.scheme:
548+
raise ValueError("local external payload storage policy must use a file:// URI or filesystem path")
549+
return root
550+
551+
552+
def _s3_bucket_from_reference(reference: str | None) -> str | None:
553+
if reference is None:
554+
return None
555+
if reference.startswith("arn:aws:s3:::"):
556+
bucket = reference.removeprefix("arn:aws:s3:::").split("/", 1)[0]
557+
return bucket or None
558+
parsed = urlparse(reference)
559+
if parsed.scheme == "s3" and parsed.netloc:
560+
return parsed.netloc
561+
return reference or None
562+
563+
564+
def _gcs_bucket_from_reference(reference: str | None) -> str | None:
565+
if reference is None:
566+
return None
567+
marker = "/buckets/"
568+
if marker in reference:
569+
bucket = reference.rsplit(marker, 1)[1].split("/", 1)[0]
570+
return bucket or None
571+
parsed = urlparse(reference)
572+
if parsed.scheme == "gs" and parsed.netloc:
573+
return parsed.netloc
574+
return reference or None
575+
576+
577+
def _azure_container_from_reference(reference: str | None) -> str | None:
578+
if reference is None:
579+
return None
580+
parsed = urlparse(reference)
581+
if parsed.scheme in {"azure", "azure-blob"} and parsed.netloc:
582+
return parsed.netloc
583+
return reference or None
584+
585+
390586
def _validate_rfc3339(value: str) -> None:
391587
normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
392588
try:

tests/test_external_storage.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
ExternalPayloadCache,
1111
ExternalPayloadIntegrityError,
1212
ExternalPayloadReference,
13+
ExternalPayloadStoragePolicy,
1314
GCSExternalStorage,
1415
LocalFilesystemExternalStorage,
1516
S3ExternalStorage,
1617
delete_external_payload,
18+
external_storage_driver_from_policy,
1719
fetch_external_payload,
1820
store_external_payload,
1921
)
@@ -419,3 +421,99 @@ def test_azure_external_storage_round_trips_and_deletes_payload() -> None:
419421
def test_object_storage_rejects_unsafe_prefix() -> None:
420422
with pytest.raises(ValueError, match="unsafe"):
421423
S3ExternalStorage(FakeS3Client(), bucket="payloads", prefix="../tenant-a")
424+
425+
426+
def test_external_storage_policy_builds_s3_driver_from_server_namespace_policy() -> None:
427+
client = FakeS3Client()
428+
policy = ExternalPayloadStoragePolicy.from_dict(
429+
{
430+
"external_payload_storage": {
431+
"driver": "s3",
432+
"enabled": True,
433+
"threshold_bytes": 2_097_152,
434+
"config": {
435+
"bucket": "dw-payloads",
436+
"prefix": "billing/",
437+
},
438+
}
439+
}
440+
)
441+
442+
storage = external_storage_driver_from_policy(policy, s3_client=client)
443+
assert isinstance(storage, S3ExternalStorage)
444+
assert policy.threshold_bytes == 2_097_152
445+
446+
reference = store_external_payload(storage, b'{"from":"server-policy"}', codec="json")
447+
448+
assert reference.uri.startswith("s3://dw-payloads/billing/json/")
449+
assert fetch_external_payload(storage, reference) == b'{"from":"server-policy"}'
450+
451+
452+
def test_external_storage_policy_builds_gcs_driver_from_cloud_reference() -> None:
453+
client = FakeGCSClient()
454+
policy = ExternalPayloadStoragePolicy.from_dict(
455+
{
456+
"enabled": True,
457+
"mode": "byob",
458+
"driver": "gcs",
459+
"reference": "projects/acme/buckets/workflow-payloads",
460+
"prefix": "prod",
461+
"threshold_bytes": 1_500_000,
462+
"status": "pending_validation",
463+
}
464+
)
465+
466+
storage = external_storage_driver_from_policy(policy, gcs_client=client)
467+
468+
reference = store_external_payload(storage, b'{"from":"cloud-policy"}', codec="json")
469+
470+
assert reference.uri.startswith("gs://workflow-payloads/prod/json/")
471+
assert fetch_external_payload(storage, reference) == b'{"from":"cloud-policy"}'
472+
473+
474+
def test_external_storage_policy_builds_local_driver_from_file_uri(tmp_path: Path) -> None:
475+
root = tmp_path / "payloads"
476+
storage = external_storage_driver_from_policy(
477+
{
478+
"driver": "local",
479+
"enabled": True,
480+
"config": {"uri": root.as_uri()},
481+
}
482+
)
483+
484+
reference = store_external_payload(storage, b"local-policy", codec="json")
485+
486+
assert fetch_external_payload(storage, reference) == b"local-policy"
487+
488+
489+
def test_external_storage_policy_builds_azure_driver_from_container_config() -> None:
490+
client = FakeAzureContainerClient()
491+
storage = external_storage_driver_from_policy(
492+
{
493+
"driver": "azure",
494+
"enabled": True,
495+
"config": {"container": "payloads", "prefix": "tenant-a"},
496+
},
497+
azure_container_client=client,
498+
)
499+
500+
reference = store_external_payload(storage, b'{"from":"azure-policy"}', codec="json")
501+
502+
assert reference.uri.startswith("azure-blob://payloads/tenant-a/json/")
503+
assert fetch_external_payload(storage, reference) == b'{"from":"azure-policy"}'
504+
505+
506+
def test_external_storage_policy_rejects_disabled_or_missing_provider_client() -> None:
507+
with pytest.raises(ValueError, match="disabled"):
508+
external_storage_driver_from_policy({"driver": "s3", "enabled": False, "config": {"bucket": "payloads"}})
509+
510+
with pytest.raises(ValueError, match="s3_client"):
511+
external_storage_driver_from_policy({"driver": "s3", "enabled": True, "config": {"bucket": "payloads"}})
512+
513+
514+
def test_external_storage_policy_validates_threshold_and_config_shape() -> None:
515+
with pytest.raises(ValueError, match="threshold_bytes"):
516+
ExternalPayloadStoragePolicy.from_dict({"driver": "local", "threshold_bytes": 0})
517+
518+
with pytest.raises(ValueError, match="config"):
519+
ExternalPayloadStoragePolicy.from_dict({"driver": "local", "config": "file:///tmp/payloads"})

0 commit comments

Comments
 (0)