Skip to content

Commit e713e94

Browse files
matteolibrizzi-scaleclaude
authored andcommitted
feat(lib): expose data_converter kwarg on AgentexWorker and Temporal client APIs
Adds a `data_converter` kwarg to: - `AgentexWorker.__init__` and `worker.get_temporal_client` - `clients.temporal.utils.get_temporal_client` - `TemporalClient` / `TemporalACP` / `TemporalACPConfig` / `FastACP` This unlocks composing `OpenAIAgentsPlugin` with a payload codec by passing a pre-built `DataConverter(payload_converter_class=OpenAIPayloadConverter, payload_codec=...)`. Previously the plugin would silently drop a standalone `payload_codec` kwarg because its `_data_converter(None)` transformer builds a fresh converter without any codec — the existing guard rejected this combination outright. The guard is refined: it now fires only when both the plugin is present AND `payload_codec` is the standalone kwarg AND no `data_converter` was supplied, and the error message points callers at the working composition path. An additional guard rejects passing `payload_codec` and `data_converter` together as ambiguous. No behavior change for existing callers (none currently pass `data_converter`). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0a2418c commit e713e94

7 files changed

Lines changed: 285 additions & 35 deletions

File tree

src/agentex/lib/core/clients/temporal/temporal_client.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from temporalio.client import Client, WorkflowExecutionStatus
88
from temporalio.common import RetryPolicy as TemporalRetryPolicy, WorkflowIDReusePolicy
99
from temporalio.service import RPCError, RPCStatusCode
10-
from temporalio.converter import PayloadCodec
10+
from temporalio.converter import DataConverter, PayloadCodec
1111

1212
from agentex.lib.utils.logging import make_logger
1313
from agentex.lib.utils.model_utils import BaseModel
@@ -78,11 +78,16 @@
7878

7979
class TemporalClient:
8080
def __init__(
81-
self, temporal_client: Client | None = None, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None
81+
self,
82+
temporal_client: Client | None = None,
83+
plugins: list[Any] = [],
84+
payload_codec: PayloadCodec | None = None,
85+
data_converter: DataConverter | None = None,
8286
):
8387
self._client: Client | None = temporal_client
8488
self._plugins = plugins
8589
self._payload_codec = payload_codec
90+
self._data_converter = data_converter
8691

8792
@property
8893
def client(self) -> Client:
@@ -92,7 +97,13 @@ def client(self) -> Client:
9297
return self._client
9398

9499
@classmethod
95-
async def create(cls, temporal_address: str, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None):
100+
async def create(
101+
cls,
102+
temporal_address: str,
103+
plugins: list[Any] = [],
104+
payload_codec: PayloadCodec | None = None,
105+
data_converter: DataConverter | None = None,
106+
):
96107
if temporal_address in [
97108
"false",
98109
"False",
@@ -105,8 +116,13 @@ async def create(cls, temporal_address: str, plugins: list[Any] = [], payload_co
105116
]:
106117
_client = None
107118
else:
108-
_client = await get_temporal_client(temporal_address, plugins=plugins, payload_codec=payload_codec)
109-
return cls(_client, plugins, payload_codec)
119+
_client = await get_temporal_client(
120+
temporal_address,
121+
plugins=plugins,
122+
payload_codec=payload_codec,
123+
data_converter=data_converter,
124+
)
125+
return cls(_client, plugins, payload_codec, data_converter)
110126

111127
async def setup(self, temporal_address: str):
112128
self._client = await self._get_temporal_client(temporal_address=temporal_address)
@@ -124,7 +140,12 @@ async def _get_temporal_client(self, temporal_address: str) -> Client | None:
124140
]:
125141
return None
126142
else:
127-
return await get_temporal_client(temporal_address, plugins=self._plugins, payload_codec=self._payload_codec)
143+
return await get_temporal_client(
144+
temporal_address,
145+
plugins=self._plugins,
146+
payload_codec=self._payload_codec,
147+
data_converter=self._data_converter,
148+
)
128149

129150
async def start_workflow(
130151
self,

src/agentex/lib/core/clients/temporal/utils.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from temporalio.client import Client, Plugin as ClientPlugin
77
from temporalio.worker import Interceptor
88
from temporalio.runtime import Runtime, TelemetryConfig, OpenTelemetryConfig
9-
from temporalio.converter import PayloadCodec
9+
from temporalio.converter import DataConverter, PayloadCodec
1010
from temporalio.contrib.pydantic import pydantic_data_converter
1111

1212
# class DateTimeJSONEncoder(AdvancedJSONEncoder):
@@ -86,6 +86,7 @@ async def get_temporal_client(
8686
metrics_url: str | None = None,
8787
plugins: list[Any] = [],
8888
payload_codec: PayloadCodec | None = None,
89+
data_converter: DataConverter | None = None,
8990
) -> Client:
9091
"""
9192
Create a Temporal client with plugin integration.
@@ -94,7 +95,14 @@ async def get_temporal_client(
9495
temporal_address: Temporal server address
9596
metrics_url: Optional metrics endpoint URL
9697
plugins: List of Temporal plugins to include
97-
payload_codec: Optional payload codec for encoding/decoding payloads (e.g. encryption, compression)
98+
payload_codec: Optional payload codec for encoding/decoding payloads
99+
(e.g. encryption, compression). Cannot be combined with the
100+
OpenAIAgentsPlugin via this kwarg — see ``data_converter``.
101+
data_converter: Optional pre-built ``DataConverter``. Use this when
102+
composing the OpenAIAgentsPlugin with a payload codec: build a
103+
``DataConverter(payload_converter_class=OpenAIPayloadConverter,
104+
payload_codec=...)`` and pass it here. Mutually exclusive with
105+
``payload_codec``.
98106
99107
Returns:
100108
Configured Temporal client
@@ -103,29 +111,50 @@ async def get_temporal_client(
103111
if plugins:
104112
validate_client_plugins(plugins)
105113

114+
if payload_codec is not None and data_converter is not None:
115+
raise ValueError(
116+
"Pass payload_codec inside `data_converter` "
117+
"(DataConverter(..., payload_codec=...)) instead of as a separate "
118+
"kwarg. Specifying both is ambiguous."
119+
)
120+
106121
# Check if OpenAI plugin is present - it needs to configure its own data converter
107122
# Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents
108123
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
109124

110125
has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or []))
111126

112-
if has_openai_plugin and payload_codec is not None:
127+
# When the OpenAI plugin is present, its `_data_converter` transformer
128+
# builds a fresh DataConverter (without any codec) if none is supplied,
129+
# so a standalone `payload_codec` kwarg would be silently dropped and
130+
# payloads would land in Temporal in plain text. Guide the caller to
131+
# the working composition path instead.
132+
if has_openai_plugin and payload_codec is not None and data_converter is None:
113133
raise ValueError(
114-
"payload_codec is not supported alongside OpenAIAgentsPlugin: the plugin "
115-
"installs its own data converter and the codec would be silently ignored, "
116-
"leaving payloads unencoded. Remove one or the other."
134+
"payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would "
135+
"be silently dropped by the plugin's data-converter transformer. "
136+
"Build a DataConverter explicitly with "
137+
"`payload_converter_class=OpenAIPayloadConverter` (or a subclass) "
138+
"and `payload_codec=...`, then pass it via the `data_converter` "
139+
"kwarg instead."
117140
)
118141

119-
connect_kwargs = {
142+
connect_kwargs: dict[str, Any] = {
120143
"target_host": temporal_address,
121144
"plugins": plugins,
122145
}
123146

124-
if not has_openai_plugin:
125-
data_converter = pydantic_data_converter
126-
if payload_codec:
127-
data_converter = dataclasses.replace(data_converter, payload_codec=payload_codec)
147+
if data_converter is not None:
148+
# Caller supplied a pre-built converter. With the OpenAI plugin present
149+
# and `payload_converter_class=OpenAIPayloadConverter` (or subclass),
150+
# the plugin's `_data_converter` transformer passes it through intact,
151+
# preserving any payload_codec.
128152
connect_kwargs["data_converter"] = data_converter
153+
elif not has_openai_plugin:
154+
dc = pydantic_data_converter
155+
if payload_codec:
156+
dc = dataclasses.replace(dc, payload_codec=payload_codec)
157+
connect_kwargs["data_converter"] = dc
129158

130159
if not metrics_url:
131160
client = await Client.connect(**connect_kwargs)

src/agentex/lib/core/temporal/workers/worker.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,35 +95,55 @@ async def get_temporal_client(
9595
metrics_url: str | None = None,
9696
plugins: list = [],
9797
payload_codec: PayloadCodec | None = None,
98+
data_converter: DataConverter | None = None,
9899
) -> Client:
99100
if plugins != []: # We don't need to validate the plugins if they are empty
100101
_validate_plugins(plugins)
101102

103+
if payload_codec is not None and data_converter is not None:
104+
raise ValueError(
105+
"Pass payload_codec inside `data_converter` "
106+
"(DataConverter(..., payload_codec=...)) instead of as a separate "
107+
"kwarg. Specifying both is ambiguous."
108+
)
109+
102110
# Check if OpenAI plugin is present - it needs to configure its own data converter
103111
# Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents
104112
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
105113

106114
has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or []))
107115

108-
if has_openai_plugin and payload_codec is not None:
116+
# When the OpenAI plugin is present, its `_data_converter` transformer
117+
# builds a fresh DataConverter (without any codec) if none is supplied,
118+
# so a standalone `payload_codec` kwarg would be silently dropped and
119+
# payloads would land in Temporal in plain text. Guide the caller to
120+
# the working composition path instead.
121+
if has_openai_plugin and payload_codec is not None and data_converter is None:
109122
raise ValueError(
110-
"payload_codec is not supported alongside OpenAIAgentsPlugin: the plugin "
111-
"installs its own data converter and the codec would be silently ignored, "
112-
"leaving payloads unencoded. Remove one or the other."
123+
"payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would "
124+
"be silently dropped by the plugin's data-converter transformer. "
125+
"Build a DataConverter explicitly with "
126+
"`payload_converter_class=OpenAIPayloadConverter` (or a subclass) "
127+
"and `payload_codec=...`, then pass it via the `data_converter` "
128+
"kwarg instead."
113129
)
114130

115-
# Build connection kwargs
116-
connect_kwargs = {
131+
connect_kwargs: dict[str, Any] = {
117132
"target_host": temporal_address,
118133
"plugins": plugins,
119134
}
120135

121-
# Only set data_converter if OpenAI plugin is not present
122-
if not has_openai_plugin:
123-
data_converter = custom_data_converter
124-
if payload_codec:
125-
data_converter = dataclasses.replace(data_converter, payload_codec=payload_codec)
136+
if data_converter is not None:
137+
# Caller supplied a pre-built converter. With the OpenAI plugin present
138+
# and `payload_converter_class=OpenAIPayloadConverter` (or subclass),
139+
# the plugin's `_data_converter` transformer passes it through intact,
140+
# preserving any payload_codec.
126141
connect_kwargs["data_converter"] = data_converter
142+
elif not has_openai_plugin:
143+
dc = custom_data_converter
144+
if payload_codec:
145+
dc = dataclasses.replace(dc, payload_codec=payload_codec)
146+
connect_kwargs["data_converter"] = dc
127147

128148
if not metrics_url:
129149
client = await Client.connect(**connect_kwargs)
@@ -145,6 +165,7 @@ def __init__(
145165
interceptors: list = [],
146166
metrics_url: str | None = None,
147167
payload_codec: PayloadCodec | None = None,
168+
data_converter: DataConverter | None = None,
148169
):
149170
self.task_queue = task_queue
150171
self.activity_handles = []
@@ -159,6 +180,7 @@ def __init__(
159180
self.interceptors = interceptors
160181
self.metrics_url = metrics_url
161182
self.payload_codec = payload_codec
183+
self.data_converter = data_converter
162184

163185
@overload
164186
async def run(
@@ -195,6 +217,7 @@ async def run(
195217
plugins=self.plugins,
196218
metrics_url=self.metrics_url,
197219
payload_codec=self.payload_codec,
220+
data_converter=self.data_converter,
198221
)
199222

200223
# Enable debug mode if AgentEx debug is enabled (disables deadlock detection)

src/agentex/lib/sdk/fastacp/fastacp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ def create_async_acp(config: AsyncACPConfig, **kwargs) -> BaseACPServer:
6565
temporal_config["interceptors"] = config.interceptors # type: ignore[attr-defined]
6666
if hasattr(config, "payload_codec"):
6767
temporal_config["payload_codec"] = config.payload_codec # type: ignore[attr-defined]
68+
if hasattr(config, "data_converter"):
69+
temporal_config["data_converter"] = config.data_converter # type: ignore[attr-defined]
6870
return implementation_class.create(**temporal_config)
6971
else:
7072
return implementation_class.create(**kwargs)

src/agentex/lib/sdk/fastacp/impl/temporal_acp.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from contextlib import asynccontextmanager
55

66
from fastapi import FastAPI
7-
from temporalio.converter import PayloadCodec
7+
from temporalio.converter import DataConverter, PayloadCodec
88

99
from agentex.protocol.acp import (
1010
SendEventParams,
@@ -33,13 +33,15 @@ def __init__(
3333
plugins: list[Any] | None = None,
3434
interceptors: list[Any] | None = None,
3535
payload_codec: PayloadCodec | None = None,
36+
data_converter: DataConverter | None = None,
3637
):
3738
super().__init__()
3839
self._temporal_task_service = temporal_task_service
3940
self._temporal_address = temporal_address
4041
self._plugins = plugins or []
4142
self._interceptors = interceptors or []
4243
self._payload_codec = payload_codec
44+
self._data_converter = data_converter
4345

4446
@classmethod
4547
@override
@@ -49,12 +51,17 @@ def create(
4951
plugins: list[Any] | None = None,
5052
interceptors: list[Any] | None = None,
5153
payload_codec: PayloadCodec | None = None,
54+
data_converter: DataConverter | None = None,
5255
) -> "TemporalACP":
5356
logger.info("Initializing TemporalACP instance")
5457

5558
# Create instance without temporal client initially
5659
temporal_acp = cls(
57-
temporal_address=temporal_address, plugins=plugins, interceptors=interceptors, payload_codec=payload_codec
60+
temporal_address=temporal_address,
61+
plugins=plugins,
62+
interceptors=interceptors,
63+
payload_codec=payload_codec,
64+
data_converter=data_converter,
5865
)
5966
temporal_acp._setup_handlers()
6067
logger.info("TemporalACP instance initialized now")
@@ -71,7 +78,10 @@ async def lifespan(app: FastAPI):
7178
if self._temporal_task_service is None:
7279
env_vars = EnvironmentVariables.refresh()
7380
temporal_client = await TemporalClient.create(
74-
temporal_address=self._temporal_address, plugins=self._plugins, payload_codec=self._payload_codec
81+
temporal_address=self._temporal_address,
82+
plugins=self._plugins,
83+
payload_codec=self._payload_codec,
84+
data_converter=self._data_converter,
7585
)
7686
self._temporal_task_service = TemporalTaskService(
7787
temporal_client=temporal_client,

src/agentex/lib/types/fastacp.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,24 @@ class TemporalACPConfig(AsyncACPConfig):
5656
encoding/decoding payloads (e.g. encryption, compression). NOTE:
5757
this only configures the ACP (client) side. The worker side must
5858
be configured separately via ``AgentexWorker(payload_codec=...)``
59-
with the SAME codec, or decode will fail at runtime.
59+
with the SAME codec, or decode will fail at runtime. Cannot be
60+
combined with ``OpenAIAgentsPlugin``; use ``data_converter``
61+
instead in that case.
62+
data_converter: Optional pre-built ``temporalio.converter.DataConverter``.
63+
Use this when composing the ``OpenAIAgentsPlugin`` with a payload
64+
codec: build a ``DataConverter(payload_converter_class=
65+
OpenAIPayloadConverter, payload_codec=...)`` and pass it here.
66+
Mutually exclusive with ``payload_codec``. The worker side must
67+
be configured separately via ``AgentexWorker(data_converter=...)``
68+
with the SAME converter, or decode will fail at runtime.
6069
"""
6170

6271
type: Literal["temporal"] = Field(default="temporal", frozen=True)
6372
temporal_address: str = Field(default="temporal-frontend.temporal.svc.cluster.local:7233", frozen=True)
6473
plugins: list[Any] = Field(default=[], frozen=True)
6574
interceptors: list[Any] = Field(default=[], frozen=True)
6675
payload_codec: Any = Field(default=None, frozen=True)
76+
data_converter: Any = Field(default=None, frozen=True)
6777

6878
@field_validator("plugins")
6979
@classmethod

0 commit comments

Comments
 (0)