Commit 6168ebc

External storage and codec server sample (#307)
1 parent dfff599 commit 6168ebc

13 files changed

Lines changed: 1568 additions & 92 deletions

README.md

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [eager_wf_start](eager_wf_start) - Run a workflow using Eager Workflow Start
 * [encryption](encryption) - Apply end-to-end encryption for all input/output.
 * [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
+* [external_storage](external_storage) - Offload large payloads to S3-compatible object storage, plus a codec server for the Web UI and CLI.
 * [gevent_async](gevent_async) - Combine gevent and Temporal.
 * [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
 * [langchain](langchain) - Orchestrate workflows for LangChain.

external_storage/README.md

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
# External Storage Sample

This sample demonstrates how to offload large workflow payloads to Amazon S3-compatible
object storage using the Temporal Python SDK's built-in `ExternalStorage` system,
combined with a gzip `PayloadCodec` so the payloads stored inline in Temporal and in
S3 are both compressed.

**Scenario:** A fulfillment center processes batches of shipping orders. The workflow
receives a small request (a batch ID and order count), then internally calls a
`fetch_orders` activity that returns the full list of orders with customer records,
line items, and handling notes. That list, several hundred kilobytes even after
compression, is passed to a second `process_orders` activity. Finally the workflow
returns a small `BatchSummary` with totals.

Each payload is first compressed by `CompressionCodec`. The SDK then checks the
compressed size against the default 256 KiB threshold; payloads still above it are
stored in S3 and replaced inline with compact claim-check references. The workflow's
own input (`OrderBatchRequest`) and result (`BatchSummary`) compress to a few hundred
bytes and remain inline.

A mock S3 service (`s3.py`) is included so you can run the sample locally without
an AWS account or Docker. A `codec_server.py` is included to decompress payloads
on demand for the Temporal Web UI.

## Prerequisites

* [uv](https://docs.astral.sh/uv/)
* [Temporal CLI](https://docs.temporal.io/cli#install) with a local dev server running:
  ```
  temporal server start-dev
  ```

## 1. Sync dependencies

```bash
uv sync --group external-storage
```

## 2. Start the mock S3 service

In a dedicated terminal:

```bash
uv run external_storage/s3.py
```

This starts a local S3-compatible server on port 5000 and creates the `temporal-payloads`
bucket. Leave it running for the duration of the sample.

## 3. Run the worker

In a second terminal:

```bash
uv run external_storage/worker.py
```

## 4. Run the starter

In a third terminal:

```bash
uv run external_storage/starter.py
```

Example output:

```
Starting workflow external-storage-20260501-120000 (batch_id=BATCH-20260501-120000, order_count=200)

Batch BATCH-20260501-120000: 200 orders processed
Total shipping cost: $28,512.40
Total weight: 19,684.2 kg
Avg delivery: 4.4 days
```

## 5. (Optional) Run the codec server

Workflow payloads are gzip-compressed; the large ones additionally live in S3 as
external storage references. The codec server serves both transformations on demand
for the Temporal Web UI. Run it in a fourth terminal:

```bash
uv run external_storage/codec_server.py
```

In the Temporal Web UI (http://localhost:8233), open Settings → Data Encoder and set
the Remote Codec Endpoint to `http://localhost:8081`. Reload the workflow page and the
inline compressed payloads will be displayed as readable JSON, and externally stored
payloads can be downloaded to fetch their actual content from S3.

The Web UI sends the namespace as the `X-Namespace` header on each request, so
multi-namespace setups can dispatch by reading that header.

| Endpoint | Behavior |
| --- | --- |
| `POST /encode` | Compress the payload, then offload to S3 if it exceeds the threshold. |
| `POST /decode` | Retrieve any external storage references from S3, then decompress. Pass `?preserveStorageRefs=true` to leave references as-is. |
| `POST /download` | All inputs must be storage references. Retrieves them from S3 and decompresses. |
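
The request and response bodies for these endpoints are not shown in this README. As a rough illustration only, assuming the server follows the usual Temporal remote codec convention of exchanging the proto3 JSON form of `Payloads` (with bytes fields base64-encoded), a body could be built and parsed as sketched below. The helper names `to_codec_body` and `from_codec_body` and the tuple-based payload shape are hypothetical and are not part of the sample.

```python
import base64
import json
from typing import Dict, List, Tuple

# A payload is modeled here as (metadata, data). This tuple shape is an
# assumption for illustration, not the SDK's Payload type.
RawPayload = Tuple[Dict[str, bytes], bytes]


def _b64(raw: bytes) -> str:
    """Base64-encode bytes into an ASCII string suitable for JSON."""
    return base64.b64encode(raw).decode("ascii")


def to_codec_body(payloads: List[RawPayload]) -> str:
    """Build a JSON body of the assumed shape {"payloads": [...]}."""
    return json.dumps(
        {
            "payloads": [
                {
                    "metadata": {key: _b64(value) for key, value in metadata.items()},
                    "data": _b64(data),
                }
                for metadata, data in payloads
            ]
        }
    )


def from_codec_body(body: str) -> List[RawPayload]:
    """Parse a body of the assumed shape back into (metadata, data) tuples."""
    parsed = json.loads(body)
    return [
        (
            {
                key: base64.b64decode(value)
                for key, value in item.get("metadata", {}).items()
            },
            base64.b64decode(item.get("data", "")),
        )
        for item in parsed.get("payloads", [])
    ]


example = [({"encoding": b"binary/gzip"}, b"\x1f\x8b compressed bytes")]
body = to_codec_body(example)
round_tripped = from_codec_body(body)
```

Keeping the metadata values base64-encoded alongside the data is what would let a codec server recognise the `binary/gzip` marker without interpreting the compressed bytes themselves.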

## 6. Inspect the workflow

Run `temporal workflow show` to see how payloads are stored:

```bash
temporal workflow show --workflow-id external-storage-<timestamp>
```

The workflow's input (`OrderBatchRequest`) and result (`BatchSummary`) are gzip-encoded
and stored inline in Temporal, small enough to compress to a few hundred bytes. The
two activity payloads carrying the order list (the output of `fetch_orders` and the
input to `process_orders`) exceed 256 KiB even after compression, so they appear as
external storage references, confirming the SDK offloaded them to S3.

## How it works

The `DataConverter` is configured with both a `payload_codec` and an `external_storage`:

```python
driver = S3StorageDriver(
    client=new_aioboto3_client(s3_client),
    bucket=S3_BUCKET,
)
data_converter = dataclasses.replace(
    temporalio.converter.default(),
    payload_codec=CompressionCodec(),
    external_storage=ExternalStorage(drivers=[driver]),
)
```

On the encode path the SDK:

1. Serializes the Python value to a `Payload`.
2. Runs `CompressionCodec.encode` to gzip the payload bytes.
3. Checks the compressed size against `payload_size_threshold` (default: 256 KiB).
4. If still above the threshold, stores the compressed bytes in S3 via
   `S3StorageDriver` and replaces the inline payload with a claim-check reference.

On the decode path the SDK reverses these steps, transparently retrieving from S3 and
decompressing as needed.
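
The encode and decode paths described above can be approximated with a small standard-library sketch. This is not the SDK's implementation: the in-memory `fake_bucket` dict stands in for S3, JSON stands in for the SDK's payload serialization, and the `encode`/`decode` helpers and the envelope dict shape are hypothetical names chosen for illustration.

```python
import gzip
import json
import random
import string
import uuid
from typing import Dict

THRESHOLD_BYTES = 256 * 1024  # mirrors the 256 KiB default mentioned above

# Hypothetical stand-in for the S3 bucket: object key -> stored bytes.
fake_bucket: Dict[str, bytes] = {}


def encode(value: object) -> dict:
    """Serialize, compress, then offload only if still above the threshold."""
    raw = json.dumps(value).encode("utf-8")
    compressed = gzip.compress(raw)
    if len(compressed) <= THRESHOLD_BYTES:
        return {"kind": "inline", "data": compressed}
    key = f"payloads/{uuid.uuid4()}"
    fake_bucket[key] = compressed
    # The claim check carries only the key, not the payload bytes.
    return {"kind": "reference", "key": key}


def decode(envelope: dict) -> object:
    """Reverse the steps: fetch if referenced, decompress, deserialize."""
    if envelope["kind"] == "reference":
        compressed = fake_bucket[envelope["key"]]
    else:
        compressed = envelope["data"]
    return json.loads(gzip.decompress(compressed))


small_value = {"batch_id": "BATCH-1", "order_count": 200}
# Random letters compress poorly, so this stays above the threshold.
rng = random.Random(0)
large_value = {"notes": "".join(rng.choices(string.ascii_letters, k=500_000))}

small = encode(small_value)
large = encode(large_value)
```

In this sketch `small` stays inline while `large` becomes a reference, and `decode` returns the original value in both cases, which is the property the worker and starter rely on when they share one configuration.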

Both the worker and the starter must use the **same** `DataConverter` configuration
(codec **and** storage) so each side can read what the other wrote.

external_storage/__init__.py

Whitespace-only changes.

external_storage/_sample_data.py

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
"""Order data generation for the external_storage sample.

Produces payloads large enough to exceed the default 256 KiB ExternalStorage
threshold without hand-crafted catalogs of items, addresses, or notes: each
order is padded with random filler text in its item descriptions and shipping
notes. Calibrated so 100 orders serialize to roughly 300 KiB of JSON.
"""

import random
import string
from typing import List

from external_storage.workflows import Address, Customer, Order, OrderItem

_CITIES = [
    ("Houston", "TX"),
    ("Dallas", "TX"),
    ("Los Angeles", "CA"),
    ("San Francisco", "CA"),
    ("Denver", "CO"),
    ("Miami", "FL"),
    ("Chicago", "IL"),
    ("New York", "NY"),
    ("Seattle", "WA"),
    ("Atlanta", "GA"),
]

_ITEMS_PER_ORDER = 5
_ITEM_DESCRIPTION_CHARS = 500
_SHIPPING_NOTES_CHARS = 200


def _filler(rng: random.Random, n: int) -> str:
    return "".join(rng.choices(string.ascii_letters + " ", k=n))


def generate_orders(batch_id: str, count: int) -> List[Order]:
    return [_generate_order(batch_id, i) for i in range(1, count + 1)]


def _generate_order(batch_id: str, index: int) -> Order:
    rng = random.Random(f"{batch_id}-{index}")
    city, state = rng.choice(_CITIES)
    items = [
        OrderItem(
            sku=f"SKU-{rng.randint(10000, 99999)}",
            name=f"Product {rng.randint(1, 999)}",
            description=_filler(rng, _ITEM_DESCRIPTION_CHARS),
            quantity=rng.randint(1, 10),
            unit_price_usd=round(rng.uniform(10.0, 1000.0), 2),
            weight_kg=round(rng.uniform(0.5, 50.0), 2),
        )
        for _ in range(_ITEMS_PER_ORDER)
    ]
    return Order(
        id=f"ORD-{index:06d}",
        customer=Customer(
            id=f"CUST-{index:06d}",
            name=f"Customer {index}",
            email=f"customer{index}@example.com",
            address=Address(
                street=f"{rng.randint(100, 9999)} Main Street",
                city=city,
                state=state,
                zip_code=f"{rng.randint(10000, 99999)}",
                country="US",
            ),
        ),
        items=items,
        total_weight_kg=round(sum(i.weight_kg * i.quantity for i in items), 2),
        shipping_notes=_filler(rng, _SHIPPING_NOTES_CHARS),
    )
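
A likely reason for padding orders with random filler, rather than realistic repeated phrases, is that random text compresses poorly, so the payload remains large after gzip. The sketch below illustrates that effect with the standard library only. It copies the shape of `_filler` from the file above; the seed string, the sample sizes, and the repeated phrase are arbitrary values chosen for this illustration.

```python
import gzip
import random
import string


def filler(rng: random.Random, n: int) -> str:
    """Same approach as _filler above: n random letters and spaces."""
    return "".join(rng.choices(string.ascii_letters + " ", k=n))


# Seeding with a string makes the output reproducible across runs,
# which is how the sample derives stable data per batch and order index.
rng = random.Random("BATCH-demo-1")
random_text = filler(rng, 100_000).encode("utf-8")

# Highly repetitive text of the same length, for comparison.
repetitive_text = (b"Handle with care. " * 6000)[:100_000]

random_ratio = len(gzip.compress(random_text)) / len(random_text)
repetitive_ratio = len(gzip.compress(repetitive_text)) / len(repetitive_text)

print(f"random filler compresses to {random_ratio:.0%} of its size")
print(f"repetitive text compresses to {repetitive_ratio:.0%} of its size")
```

With 53 equally likely characters, each one carries a little under 6 bits of information, so gzip cannot shrink the filler much below roughly three quarters of its original size, whereas the repetitive text collapses to a tiny fraction.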

external_storage/codec.py

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
import gzip
from collections.abc import Iterable
from typing import List

from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec


class CompressionCodec(PayloadCodec):
    """Gzip-based payload codec."""

    ENCODING = b"binary/gzip"

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        return [
            Payload(
                metadata={"encoding": self.ENCODING},
                data=gzip.compress(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        result: List[Payload] = []
        for p in payloads:
            if p.metadata.get("encoding") == self.ENCODING:
                result.append(Payload.FromString(gzip.decompress(p.data)))
            else:
                result.append(p)
        return result
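
The codec above compresses the whole serialized `Payload`, metadata included, so the original `encoding` value survives a round trip, and `decode` passes through anything that lacks the gzip marker. The following standard-library sketch mirrors that wrapping pattern without depending on `temporalio`. `FakePayload`, its JSON-based `serialize`/`deserialize` methods, and the synchronous `encode_all`/`decode_all` helpers are hypothetical stand-ins for the protobuf `Payload` and the async codec methods.

```python
import base64
import gzip
import json
from dataclasses import dataclass, field
from typing import Dict, Iterable, List

GZIP_ENCODING = b"binary/gzip"


@dataclass
class FakePayload:
    """Hypothetical stand-in for the protobuf Payload (metadata + data)."""

    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""

    def serialize(self) -> bytes:
        # JSON with base64 replaces protobuf serialization in this sketch.
        return json.dumps(
            {
                "metadata": {
                    k: base64.b64encode(v).decode("ascii")
                    for k, v in self.metadata.items()
                },
                "data": base64.b64encode(self.data).decode("ascii"),
            }
        ).encode("utf-8")

    @staticmethod
    def deserialize(raw: bytes) -> "FakePayload":
        obj = json.loads(raw)
        return FakePayload(
            metadata={k: base64.b64decode(v) for k, v in obj["metadata"].items()},
            data=base64.b64decode(obj["data"]),
        )


def encode_all(payloads: Iterable[FakePayload]) -> List[FakePayload]:
    """Wrap each payload: gzip the full serialized form, tag it as gzip."""
    return [
        FakePayload(
            metadata={"encoding": GZIP_ENCODING},
            data=gzip.compress(p.serialize()),
        )
        for p in payloads
    ]


def decode_all(payloads: Iterable[FakePayload]) -> List[FakePayload]:
    """Unwrap gzip-tagged payloads and leave all others untouched."""
    result: List[FakePayload] = []
    for p in payloads:
        if p.metadata.get("encoding") == GZIP_ENCODING:
            result.append(FakePayload.deserialize(gzip.decompress(p.data)))
        else:
            result.append(p)
    return result


original = FakePayload({"encoding": b"json/plain"}, b'{"batch_id": "B1"}')
encoded = encode_all([original])
decoded = decode_all(encoded)
```

The pass-through branch in `decode_all` matters in practice because a decoder may receive payloads that were written before the codec was enabled, and those should be returned unchanged.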
