Skip to content

Commit 18f0c31

Browse files
committed
Merge remote-tracking branch 'origin/main' into strands-plugin-samples
# Conflicts: # pyproject.toml # uv.lock
2 parents 1faf2fe + fded925 commit 18f0c31

13 files changed

Lines changed: 4453 additions & 1788 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7070
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
7171
* [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
7272
* [external_storage](external_storage) - Offload large payloads to S3-compatible object storage, plus a codec server for the Web UI and CLI.
73+
* [external_storage_redis](external_storage_redis) - Redis driver for external storage
7374
* [gevent_async](gevent_async) - Combine gevent and Temporal.
7475
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
7576
* [langchain](langchain) - Orchestrate workflows for LangChain.

external_storage_redis/README.md

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Redis External Storage
2+
3+
This sample packages a Redis-backed `StorageDriver` implementation for Temporal
4+
external storage.
5+
6+
The code lives in:
7+
8+
* `external_storage_redis/_driver.py` for the `RedisStorageDriver`
9+
* `external_storage_redis/_client.py` for the storage client abstraction
10+
* `external_storage_redis/redis_asyncio.py` for the `redis.asyncio` adapter
11+
* `tests/external_storage_redis/` for unit and worker integration tests
12+
13+
Unlike most samples in this repository, this one is primarily reusable driver
14+
code plus tests rather than a standalone `worker.py` / `starter.py` pair.
15+
16+
## Install Dependencies
17+
18+
From the repository root:
19+
20+
uv sync --group external-storage-redis --group dev
21+
22+
The `external-storage-redis` group installs `redis`, and the `dev` group
23+
installs `fakeredis` for the test suite.
24+
25+
## Using The Driver
26+
27+
```python
28+
import dataclasses
29+
30+
import redis.asyncio as redis
31+
import temporalio.converter
32+
from temporalio.client import Client
33+
from temporalio.converter import ExternalStorage
34+
35+
from external_storage_redis import RedisStorageDriver
36+
from external_storage_redis.redis_asyncio import new_redis_asyncio_client
37+
38+
redis_client = redis.Redis.from_url(
39+
"redis://localhost:6379/0",
40+
decode_responses=False,
41+
)
42+
try:
43+
driver = RedisStorageDriver(
44+
client=new_redis_asyncio_client(redis_client),
45+
key_prefix="temporalio:payloads",
46+
)
47+
48+
client = await Client.connect(
49+
"localhost:7233",
50+
data_converter=dataclasses.replace(
51+
temporalio.converter.default(),
52+
external_storage=ExternalStorage(
53+
drivers=[driver],
54+
payload_size_threshold=256 * 1024,
55+
),
56+
),
57+
)
58+
finally:
59+
await redis_client.aclose()
60+
```
61+
62+
`decode_responses=False` is required because the driver stores serialized
63+
Temporal `Payload` protobuf bytes as Redis values rather than text.
64+
65+
## Driver Behavior
66+
67+
`RedisStorageDriver` accepts these constructor options:
68+
69+
* `driver_name`: defaults to `"redis"`
70+
* `key_prefix`: defaults to `"temporalio:payloads"`
71+
* `ttl`: optional expiration applied only when a key is first inserted
72+
* `max_payload_size`: defaults to 50 MiB
73+
74+
Stored keys are content-addressed using SHA-256 and include Temporal execution
75+
context when it is available. A typical workflow-scoped key looks like:
76+
77+
temporalio:payloads:v0:ns:default:wt:MyWorkflow:wi:my-workflow-id:ri:my-run-id:d:sha256:<hash>
78+
79+
Some behavior to be aware of:
80+
81+
* Any driver used to store payloads must also be configured on the component
82+
that retrieves them.
83+
* The Redis instance must already exist; the driver does not provision it.
84+
* Identical serialized bytes within the same namespace and workflow/activity
85+
scope share the same Redis key.
86+
* Workflow, activity, namespace, and run identifiers are URL-encoded before
87+
being placed into the key.
88+
* Only payloads at or above `ExternalStorage.payload_size_threshold` are
89+
offloaded.
90+
* If `ttl` is set, duplicate stores do not refresh expiration.
91+
* If a payload key is missing at retrieval time, the driver raises a
92+
non-retryable `ApplicationError`.
93+
94+
## Custom Redis Clients
95+
96+
To use a Redis library other than `redis.asyncio`, implement
97+
`RedisStorageDriverClient`:
98+
99+
```python
100+
from datetime import timedelta
101+
102+
from external_storage_redis import RedisStorageDriverClient
103+
104+
105+
class MyRedisClient(RedisStorageDriverClient):
106+
async def get(self, *, key: str) -> bytes | None: ...
107+
108+
async def set_if_absent(
109+
self,
110+
*,
111+
key: str,
112+
data: bytes,
113+
ttl: timedelta | None = None,
114+
) -> bool: ...
115+
```
116+
117+
## Tests
118+
119+
Run the full Redis sample test suite with:
120+
121+
uv run pytest tests/external_storage_redis
122+
123+
Run only the in-memory unit tests with:
124+
125+
uv run pytest tests/external_storage_redis/test_redis.py
126+
127+
The worker integration tests use `WorkflowEnvironment.start_local()` and
128+
`fakeredis`. They do not require a real Redis server, but the first run may
129+
download a Temporal dev-server binary.
130+
131+
Some Temporal dev-server builds disable standalone activity execution. When
132+
that happens, the two standalone-activity integration tests skip automatically.

external_storage_redis/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
"""Redis storage driver sample for Temporal external storage."""
2+
3+
from external_storage_redis._client import RedisStorageDriverClient
4+
from external_storage_redis._driver import RedisStorageDriver
5+
6+
__all__ = [
7+
"RedisStorageDriverClient",
8+
"RedisStorageDriver",
9+
]

external_storage_redis/_client.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""Redis storage driver client abstraction."""
2+
3+
from __future__ import annotations
4+
5+
from abc import ABC, abstractmethod
6+
from datetime import timedelta
7+
8+
9+
class RedisStorageDriverClient(ABC):
10+
"""Abstract base class for the Redis operations used by the driver."""
11+
12+
@abstractmethod
13+
async def get(self, *, key: str) -> bytes | None:
14+
"""Return the raw bytes stored for *key*, or ``None`` if absent."""
15+
16+
@abstractmethod
17+
async def set_if_absent(
18+
self,
19+
*,
20+
key: str,
21+
data: bytes,
22+
ttl: timedelta | None = None,
23+
) -> bool:
24+
"""Store *data* under *key* only if the key does not already exist.
25+
26+
Args:
27+
key: Redis key to store.
28+
data: Serialized payload bytes.
29+
ttl: Optional expiration to apply only when the value is inserted.
30+
31+
Returns:
32+
``True`` if the value was inserted, ``False`` if the key already
33+
existed.
34+
"""

0 commit comments

Comments
 (0)