Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e3a93a4
Initial version
donald-pinckney Jan 14, 2026
3295876
remove discovery, update notes
donald-pinckney Jan 14, 2026
873867d
remove notes
donald-pinckney Jan 14, 2026
bd48697
Add initializeWorker in plugin interface.
donald-pinckney Jan 14, 2026
25e9e07
Re-do shutdownWorkerFactory design with chain
donald-pinckney Jan 14, 2026
06f5fd2
rename runWorkerFactory -> startWorkerFactory
donald-pinckney Jan 14, 2026
2db0acc
Move plugins around
donald-pinckney Jan 14, 2026
8e161ec
add startWorker and shutdownWorker
donald-pinckney Jan 14, 2026
e6b4a8d
rename again
donald-pinckney Jan 15, 2026
f36ac27
Improved SimplePlugin builder design
donald-pinckney Jan 15, 2026
d7a7254
Remove default implementations, and add in startWorkerFactory and shu…
donald-pinckney Jan 15, 2026
db499d6
Don't return the builders
donald-pinckney Jan 15, 2026
0414913
Remove ServiceStubsSupplier and checked exception
donald-pinckney Jan 15, 2026
377d00d
Cleanup applyClientPluginConfiguration
donald-pinckney Jan 15, 2026
d7f9bd3
array instead of list
donald-pinckney Jan 15, 2026
b2957dd
Require ClientPlugin in WorkflowClientOptions
donald-pinckney Jan 15, 2026
7465568
Seaparate out WorkflowClientPlugin and WorkflowServiceStubsPlugin
donald-pinckney Jan 16, 2026
c9f70eb
Lift up to a generic ServiceStubsPlugin
donald-pinckney Jan 16, 2026
f35439f
Add ScheduleClientPlugin, and rename WorkflowClientPlugin.configureCl…
donald-pinckney Jan 16, 2026
219f0cd
Remove ServiceStubsPlugin super interface
donald-pinckney Jan 16, 2026
cfbe494
Document order of validation and plugin application
donald-pinckney Jan 16, 2026
8a5d85f
abstract SimplePlugin
donald-pinckney Jan 16, 2026
19fb453
Add data converters, activities, workflows, Nexus services
donald-pinckney Jan 16, 2026
1fc6deb
bug fix
donald-pinckney Jan 16, 2026
715085e
Add duplicate warnings
donald-pinckney Jan 16, 2026
16a89f5
cleanup tests some
donald-pinckney Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.temporal.internal.sync.StubMarker;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.*;
import java.lang.annotation.Annotation;
Expand All @@ -36,9 +37,13 @@
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClientInternal {

private static final Logger log = LoggerFactory.getLogger(WorkflowClientInternalImpl.class);

private final GenericWorkflowClient genericClient;
private final WorkflowClientOptions options;
private final ManualActivityCompletionClientFactory manualActivityCompletionClientFactory;
Expand All @@ -65,7 +70,18 @@ public static WorkflowClient newInstance(

WorkflowClientInternalImpl(
WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) {
options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults();
// Extract WorkflowClientPlugins from service stubs plugins (propagation)
WorkflowClientPlugin[] propagatedPlugins =
extractClientPlugins(workflowServiceStubs.getOptions().getPlugins());

// Merge propagated plugins with client-specified plugins
WorkflowClientPlugin[] mergedPlugins = mergePlugins(propagatedPlugins, options.getPlugins());

// Apply plugin configuration phase (forward order), then validate
WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(options);
builder.setPlugins(mergedPlugins);
applyClientPluginConfiguration(builder, mergedPlugins);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cretz One subtle thing here with plugin propagation is what the plugins see, for which plugins are set on the builder when plugin.configureWorkflowClient(builder) is called on the plugin.

In this Java implementation, the plugins will all see the mergedPlugins (propagated + explicit).

In Python, the plugins only see the plugins that are in the explicit config, and not the propagated ones:

plugins_from_client = cast(
            list[Plugin],
            [p for p in client.config()["plugins"] if isinstance(p, Plugin)],
        )
        for client_plugin in plugins_from_client:
            if type(client_plugin) in [type(p) for p in plugins]:
                warnings.warn(
                    f"The same plugin type {type(client_plugin)} is present from both client and worker. It may run twice and may not be the intended behavior."
                )
        plugins = plugins_from_client + list(plugins)
        self._initial_config = config.copy()

        self._plugins = plugins
        for plugin in plugins:
            config = plugin.configure_worker(config)

(that code is for _worker.py, but point still stands).

Do you have an intuition for which is the correct behavior and why? I would think that the Java behavior is more natural.

options = builder.validateAndBuildWithDefaults();
workflowServiceStubs =
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
this.options = options;
Expand Down Expand Up @@ -771,4 +787,73 @@ public NexusStartWorkflowResponse startNexus(
WorkflowInvocationHandler.closeAsyncInvocation();
}
}

/**
* Applies workflow client plugin configuration phase. Plugins are called in forward
* (registration) order to modify the client options.
*/
private static void applyClientPluginConfiguration(
WorkflowClientOptions.Builder builder, WorkflowClientPlugin[] plugins) {
if (plugins == null || plugins.length == 0) {
return;
}
for (WorkflowClientPlugin plugin : plugins) {
plugin.configureWorkflowClient(builder);
}
}

/**
* Extracts WorkflowClientPlugins from service stubs plugins. Only plugins that also implement
* {@link WorkflowClientPlugin} are included. This enables plugin propagation from service stubs
* to workflow client.
*/
private static WorkflowClientPlugin[] extractClientPlugins(
WorkflowServiceStubsPlugin[] stubsPlugins) {
if (stubsPlugins == null || stubsPlugins.length == 0) {
return new WorkflowClientPlugin[0];
}
List<WorkflowClientPlugin> clientPlugins = new ArrayList<>();
for (WorkflowServiceStubsPlugin plugin : stubsPlugins) {
if (plugin instanceof WorkflowClientPlugin) {
clientPlugins.add((WorkflowClientPlugin) plugin);
}
}
return clientPlugins.toArray(new WorkflowClientPlugin[0]);
}

/**
* Merges propagated plugins with explicitly specified plugins. Propagated plugins come first
* (from service stubs), followed by client-specific plugins.
*/
private static WorkflowClientPlugin[] mergePlugins(
WorkflowClientPlugin[] propagated, WorkflowClientPlugin[] explicit) {
boolean propagatedEmpty = propagated == null || propagated.length == 0;
boolean explicitEmpty = explicit == null || explicit.length == 0;
if (propagatedEmpty && explicitEmpty) {
return new WorkflowClientPlugin[0];
}
if (propagatedEmpty) {
return explicit;
}
if (explicitEmpty) {
return propagated;
}
// Warn about duplicate plugin types
Set<Class<?>> propagatedTypes = new HashSet<>();
for (WorkflowClientPlugin p : propagated) {
propagatedTypes.add(p.getClass());
}
for (WorkflowClientPlugin p : explicit) {
if (propagatedTypes.contains(p.getClass())) {
log.warn(
"Plugin type {} is present in both propagated plugins (from service stubs) and "
+ "explicit plugins. It may run twice which may not be the intended behavior.",
p.getClass().getName());
}
}
WorkflowClientPlugin[] merged = new WorkflowClientPlugin[propagated.length + explicit.length];
System.arraycopy(propagated, 0, merged, 0, propagated.length);
System.arraycopy(explicit, 0, merged, propagated.length, explicit.length);
return merged;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.temporal.client;

import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
Expand Down Expand Up @@ -47,6 +48,7 @@ public static final class Builder {
private String binaryChecksum;
private List<ContextPropagator> contextPropagators;
private QueryRejectCondition queryRejectCondition;
private WorkflowClientPlugin[] plugins;

private Builder() {}

Expand All @@ -61,6 +63,7 @@ private Builder(WorkflowClientOptions options) {
binaryChecksum = options.binaryChecksum;
contextPropagators = options.contextPropagators;
queryRejectCondition = options.queryRejectCondition;
plugins = options.plugins;
}

public Builder setNamespace(String namespace) {
Expand Down Expand Up @@ -132,6 +135,24 @@ public Builder setQueryRejectCondition(QueryRejectCondition queryRejectCondition
return this;
}

/**
* Sets the workflow client plugins to use with this client. Plugins can modify client
* configuration.
*
* <p>Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically
* propagated to workers created from this client.
*
* @param plugins the workflow client plugins to use
* @return this builder for chaining
* @see WorkflowClientPlugin
* @see io.temporal.worker.WorkerPlugin
*/
@Experimental
public Builder setPlugins(WorkflowClientPlugin... plugins) {
this.plugins = Objects.requireNonNull(plugins);
return this;
}

public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
Expand All @@ -140,9 +161,21 @@ public WorkflowClientOptions build() {
identity,
binaryChecksum,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
}

/**
* Validates options and builds with defaults applied.
*
* <p>Note: If plugins are configured via {@link #setPlugins(WorkflowClientPlugin...)}, they
* will have an opportunity to modify options after this method is called, when the options are
* passed to {@link WorkflowClient#newInstance}. This means validation performed here occurs
* before plugin modifications. In most cases, users should simply call {@link #build()} and let
* the client creation handle validation.
*
* @return validated options with defaults applied
*/
public WorkflowClientOptions validateAndBuildWithDefaults() {
String name = identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity;
return new WorkflowClientOptions(
Expand All @@ -154,7 +187,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators,
queryRejectCondition == null
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
: queryRejectCondition);
: queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
}
}

Expand All @@ -163,6 +197,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private static final List<ContextPropagator> EMPTY_CONTEXT_PROPAGATORS = Collections.emptyList();

private static final WorkflowClientPlugin[] EMPTY_PLUGINS = new WorkflowClientPlugin[0];

private final String namespace;

private final DataConverter dataConverter;
Expand All @@ -177,21 +213,25 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private final QueryRejectCondition queryRejectCondition;

private final WorkflowClientPlugin[] plugins;

private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
WorkflowClientInterceptor[] interceptors,
String identity,
String binaryChecksum,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition) {
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
this.identity = identity;
this.binaryChecksum = binaryChecksum;
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.plugins = plugins;
}

/**
Expand Down Expand Up @@ -236,6 +276,19 @@ public QueryRejectCondition getQueryRejectCondition() {
return queryRejectCondition;
}

/**
* Returns the workflow client plugins configured for this client.
*
* <p>Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically
* propagated to workers created from this client.
*
* @return the array of workflow client plugins, never null
*/
@Experimental
public WorkflowClientPlugin[] getPlugins() {
return plugins;
}

@Override
public String toString() {
return "WorkflowClientOptions{"
Expand All @@ -256,6 +309,8 @@ public String toString() {
+ contextPropagators
+ ", queryRejectCondition="
+ queryRejectCondition
+ ", plugins="
+ Arrays.toString(plugins)
+ '}';
}

Expand All @@ -270,7 +325,8 @@ public boolean equals(Object o) {
&& com.google.common.base.Objects.equal(identity, that.identity)
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition;
&& queryRejectCondition == that.queryRejectCondition
&& Arrays.equals(plugins, that.plugins);
}

@Override
Expand All @@ -282,6 +338,7 @@ public int hashCode() {
identity,
binaryChecksum,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
Arrays.hashCode(plugins));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import io.temporal.common.Experimental;
import io.temporal.common.SimplePlugin;
import javax.annotation.Nonnull;

/**
* Plugin interface for customizing Temporal workflow client configuration.
*
* <p>This interface is separate from {@link
* io.temporal.serviceclient.WorkflowServiceStubs.ServiceStubsPlugin} to allow plugins that only
* need to configure the workflow client without affecting the underlying gRPC connection.
*
* <p>Plugins that implement both {@code ServiceStubsPlugin} and {@code WorkflowClientPlugin} will
* have their service stubs configuration applied when creating the service stubs, and their client
* configuration applied when creating the workflow client.
*
* <p>Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically
* propagated from the client to workers created from that client.
*
* <p>Example implementation:
*
* <pre>{@code
* public class LoggingPlugin extends SimplePlugin {
* public LoggingPlugin() {
* super("my-org.logging");
* }
*
* @Override
* public void configureClient(WorkflowClientOptions.Builder builder) {
* // Add custom interceptor
* builder.setInterceptors(new LoggingInterceptor());
* }
* }
* }</pre>
*
* @see io.temporal.serviceclient.WorkflowServiceStubs.ServiceStubsPlugin
* @see io.temporal.worker.WorkerPlugin
* @see SimplePlugin
*/
@Experimental
public interface WorkflowClientPlugin {

/**
* Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended
* format: "organization.plugin-name" (e.g., "io.temporal.tracing")
*
* @return fully qualified plugin name
*/
@Nonnull
String getName();

/**
* Allows the plugin to modify workflow client options before the client is created. Called during
* configuration phase in forward (registration) order.
*
* @param builder the options builder to modify
*/
void configureWorkflowClient(@Nonnull WorkflowClientOptions.Builder builder);
}
Loading
Loading