diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index 9b9a36897c..489bab71c3 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -24,6 +24,7 @@ import io.temporal.internal.sync.StubMarker; import io.temporal.serviceclient.MetricsTag; import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsPlugin; import io.temporal.worker.WorkerFactory; import io.temporal.workflow.*; import java.lang.annotation.Annotation; @@ -36,9 +37,13 @@ import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClientInternal { + private static final Logger log = LoggerFactory.getLogger(WorkflowClientInternalImpl.class); + private final GenericWorkflowClient genericClient; private final WorkflowClientOptions options; private final ManualActivityCompletionClientFactory manualActivityCompletionClientFactory; @@ -65,7 +70,18 @@ public static WorkflowClient newInstance( WorkflowClientInternalImpl( WorkflowServiceStubs workflowServiceStubs, WorkflowClientOptions options) { - options = WorkflowClientOptions.newBuilder(options).validateAndBuildWithDefaults(); + // Extract WorkflowClientPlugins from service stubs plugins (propagation) + WorkflowClientPlugin[] propagatedPlugins = + extractClientPlugins(workflowServiceStubs.getOptions().getPlugins()); + + // Merge propagated plugins with client-specified plugins + WorkflowClientPlugin[] mergedPlugins = mergePlugins(propagatedPlugins, options.getPlugins()); + + // Apply plugin configuration phase (forward order), then validate + WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(options); + builder.setPlugins(mergedPlugins); + applyClientPluginConfiguration(builder, mergedPlugins); + options = builder.validateAndBuildWithDefaults(); workflowServiceStubs = new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace()); this.options = options; @@ -771,4 +787,70 @@ public NexusStartWorkflowResponse startNexus( WorkflowInvocationHandler.closeAsyncInvocation(); } } + + /** + * Applies workflow client plugin configuration phase. Plugins are called in forward + * (registration) order to modify the client options. + */ + private static void applyClientPluginConfiguration( + WorkflowClientOptions.Builder builder, WorkflowClientPlugin[] plugins) { + if (plugins == null || plugins.length == 0) { + return; + } + for (WorkflowClientPlugin plugin : plugins) { + plugin.configureWorkflowClient(builder); + } + } + + /** + * Extracts WorkflowClientPlugins from service stubs plugins. Only plugins that also implement + * {@link WorkflowClientPlugin} are included. This enables plugin propagation from service stubs + * to workflow client. + */ + private static WorkflowClientPlugin[] extractClientPlugins( + WorkflowServiceStubsPlugin[] stubsPlugins) { + if (stubsPlugins == null || stubsPlugins.length == 0) { + return new WorkflowClientPlugin[0]; + } + List clientPlugins = new ArrayList<>(); + for (WorkflowServiceStubsPlugin plugin : stubsPlugins) { + if (plugin instanceof WorkflowClientPlugin) { + clientPlugins.add((WorkflowClientPlugin) plugin); + } + } + return clientPlugins.toArray(new WorkflowClientPlugin[0]); + } + + /** + * Merges propagated plugins with explicitly specified plugins. Propagated plugins come first + * (from service stubs), followed by client-specific plugins. + */ + private static WorkflowClientPlugin[] mergePlugins( + WorkflowClientPlugin[] propagated, WorkflowClientPlugin[] explicit) { + boolean propagatedEmpty = propagated == null || propagated.length == 0; + boolean explicitEmpty = explicit == null || explicit.length == 0; + if (propagatedEmpty && explicitEmpty) { + return new WorkflowClientPlugin[0]; + } + if (propagatedEmpty) { + return explicit; + } + if (explicitEmpty) { + return propagated; + } + // Warn about duplicate plugin instances (same object in both lists) + Set propagatedSet = new HashSet<>(Arrays.asList(propagated)); + for (WorkflowClientPlugin p : explicit) { + if (propagatedSet.contains(p)) { + log.warn( + "Plugin instance {} is present in both propagated plugins (from service stubs) and " + + "explicit plugins. It will run twice which may not be the intended behavior.", + p.getName()); + } + } + WorkflowClientPlugin[] merged = new WorkflowClientPlugin[propagated.length + explicit.length]; + System.arraycopy(propagated, 0, merged, 0, propagated.length); + System.arraycopy(explicit, 0, merged, propagated.length, explicit.length); + return merged; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java index 944f395a48..ca67852751 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java @@ -1,6 +1,7 @@ package io.temporal.client; import io.temporal.api.enums.v1.QueryRejectCondition; +import io.temporal.common.Experimental; import io.temporal.common.context.ContextPropagator; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.GlobalDataConverter; @@ -47,6 +48,7 @@ public static final class Builder { private String binaryChecksum; private List contextPropagators; private QueryRejectCondition queryRejectCondition; + private WorkflowClientPlugin[] plugins; private Builder() {} @@ -61,6 +63,7 @@ private Builder(WorkflowClientOptions options) { binaryChecksum = options.binaryChecksum; contextPropagators = options.contextPropagators; queryRejectCondition = options.queryRejectCondition; + plugins = options.plugins; } public Builder setNamespace(String namespace) { @@ -132,6 +135,24 @@ public Builder setQueryRejectCondition(QueryRejectCondition queryRejectCondition return this; } + /** + * Sets the workflow client plugins to use with this client. Plugins can modify client + * configuration. + * + *

Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically + * propagated to workers created from this client. + * + * @param plugins the workflow client plugins to use + * @return this builder for chaining + * @see WorkflowClientPlugin + * @see io.temporal.worker.WorkerPlugin + */ + @Experimental + public Builder setPlugins(WorkflowClientPlugin... plugins) { + this.plugins = Objects.requireNonNull(plugins); + return this; + } + public WorkflowClientOptions build() { return new WorkflowClientOptions( namespace, @@ -140,9 +161,21 @@ public WorkflowClientOptions build() { identity, binaryChecksum, contextPropagators, - queryRejectCondition); + queryRejectCondition, + plugins == null ? EMPTY_PLUGINS : plugins); } + /** + * Validates options and builds with defaults applied. + * + *

Note: If plugins are configured via {@link #setPlugins(WorkflowClientPlugin...)}, they + * will have an opportunity to modify options after this method is called, when the options are + * passed to {@link WorkflowClient#newInstance}. This means validation performed here occurs + * before plugin modifications. In most cases, users should simply call {@link #build()} and let + * the client creation handle validation. + * + * @return validated options with defaults applied + */ public WorkflowClientOptions validateAndBuildWithDefaults() { String name = identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity; return new WorkflowClientOptions( @@ -154,7 +187,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators, queryRejectCondition == null ? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED - : queryRejectCondition); + : queryRejectCondition, + plugins == null ? EMPTY_PLUGINS : plugins); } } @@ -163,6 +197,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { private static final List EMPTY_CONTEXT_PROPAGATORS = Collections.emptyList(); + private static final WorkflowClientPlugin[] EMPTY_PLUGINS = new WorkflowClientPlugin[0]; + private final String namespace; private final DataConverter dataConverter; @@ -177,6 +213,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { private final QueryRejectCondition queryRejectCondition; + private final WorkflowClientPlugin[] plugins; + private WorkflowClientOptions( String namespace, DataConverter dataConverter, @@ -184,7 +222,8 @@ private WorkflowClientOptions( String identity, String binaryChecksum, List contextPropagators, - QueryRejectCondition queryRejectCondition) { + QueryRejectCondition queryRejectCondition, + WorkflowClientPlugin[] plugins) { this.namespace = namespace; this.dataConverter = dataConverter; this.interceptors = interceptors; @@ -192,6 +231,7 @@ private WorkflowClientOptions( this.binaryChecksum = binaryChecksum; this.contextPropagators = contextPropagators; this.queryRejectCondition = queryRejectCondition; + this.plugins = plugins; } /** @@ -236,6 +276,19 @@ public QueryRejectCondition getQueryRejectCondition() { return queryRejectCondition; } + /** + * Returns the workflow client plugins configured for this client. + * + *

Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically + * propagated to workers created from this client. + * + * @return the array of workflow client plugins, never null + */ + @Experimental + public WorkflowClientPlugin[] getPlugins() { + return plugins; + } + @Override public String toString() { return "WorkflowClientOptions{" @@ -256,6 +309,8 @@ public String toString() { + contextPropagators + ", queryRejectCondition=" + queryRejectCondition + + ", plugins=" + + Arrays.toString(plugins) + '}'; } @@ -270,7 +325,8 @@ public boolean equals(Object o) { && com.google.common.base.Objects.equal(identity, that.identity) && com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum) && com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators) - && queryRejectCondition == that.queryRejectCondition; + && queryRejectCondition == that.queryRejectCondition + && Arrays.equals(plugins, that.plugins); } @Override @@ -282,6 +338,7 @@ public int hashCode() { identity, binaryChecksum, contextPropagators, - queryRejectCondition); + queryRejectCondition, + Arrays.hashCode(plugins)); } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientPlugin.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientPlugin.java new file mode 100644 index 0000000000..702fa141bf --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientPlugin.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.common.SimplePlugin; +import javax.annotation.Nonnull; + +/** + * Plugin interface for customizing Temporal workflow client configuration. + * + *

This interface is separate from {@link + * io.temporal.serviceclient.WorkflowServiceStubs.ServiceStubsPlugin} to allow plugins that only + * need to configure the workflow client without affecting the underlying gRPC connection. + * + *

Plugins that implement both {@code ServiceStubsPlugin} and {@code WorkflowClientPlugin} will + * have their service stubs configuration applied when creating the service stubs, and their client + * configuration applied when creating the workflow client. + * + *

Plugins that also implement {@link io.temporal.worker.WorkerPlugin} are automatically + * propagated from the client to workers created from that client. + * + *

Example implementation: + * + *

{@code
+ * public class LoggingPlugin extends SimplePlugin {
+ *     public LoggingPlugin() {
+ *         super("my-org.logging");
+ *     }
+ *
+ *     @Override
+ *     public void configureClient(WorkflowClientOptions.Builder builder) {
+ *         // Add custom interceptor
+ *         builder.setInterceptors(new LoggingInterceptor());
+ *     }
+ * }
+ * }
+ * + * @see io.temporal.serviceclient.WorkflowServiceStubs.ServiceStubsPlugin + * @see io.temporal.worker.WorkerPlugin + * @see SimplePlugin + */ +@Experimental +public interface WorkflowClientPlugin { + + /** + * Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended + * format: "organization.plugin-name" (e.g., "io.temporal.tracing") + * + * @return fully qualified plugin name + */ + @Nonnull + String getName(); + + /** + * Allows the plugin to modify workflow client options before the client is created. Called during + * configuration phase in forward (registration) order. + * + * @param builder the options builder to modify + */ + void configureWorkflowClient(@Nonnull WorkflowClientOptions.Builder builder); +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java b/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java index 5ecc273313..8784cf6a95 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientImpl.java @@ -12,11 +12,20 @@ import io.temporal.internal.client.external.GenericWorkflowClientImpl; import io.temporal.serviceclient.MetricsTag; import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsPlugin; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Stream; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class ScheduleClientImpl implements ScheduleClient { + + private static final Logger log = LoggerFactory.getLogger(ScheduleClientImpl.class); private final WorkflowServiceStubs workflowServiceStubs; private final ScheduleClientOptions options; private final GenericWorkflowClient genericClient; @@ -41,6 +50,19 @@ public static ScheduleClient newInstance( } ScheduleClientImpl(WorkflowServiceStubs workflowServiceStubs, ScheduleClientOptions options) { + // Extract ScheduleClientPlugins from service stubs plugins (propagation) + ScheduleClientPlugin[] propagatedPlugins = + extractScheduleClientPlugins(workflowServiceStubs.getOptions().getPlugins()); + + // Merge propagated plugins with schedule client-specified plugins + ScheduleClientPlugin[] mergedPlugins = mergePlugins(propagatedPlugins, options.getPlugins()); + + // Apply plugin configuration phase (forward order) + ScheduleClientOptions.Builder builder = ScheduleClientOptions.newBuilder(options); + builder.setPlugins(mergedPlugins); + applyPluginConfiguration(builder, mergedPlugins); + options = builder.build(); + workflowServiceStubs = new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace()); this.workflowServiceStubs = workflowServiceStubs; @@ -55,6 +77,58 @@ public static ScheduleClient newInstance( this.scheduleClientCallsInvoker = initializeClientInvoker(); } + private static ScheduleClientPlugin[] extractScheduleClientPlugins( + WorkflowServiceStubsPlugin[] stubsPlugins) { + if (stubsPlugins == null || stubsPlugins.length == 0) { + return new ScheduleClientPlugin[0]; + } + List schedulePlugins = new ArrayList<>(); + for (WorkflowServiceStubsPlugin plugin : stubsPlugins) { + if (plugin instanceof ScheduleClientPlugin) { + schedulePlugins.add((ScheduleClientPlugin) plugin); + } + } + return schedulePlugins.toArray(new ScheduleClientPlugin[0]); + } + + private static ScheduleClientPlugin[] mergePlugins( + ScheduleClientPlugin[] propagated, ScheduleClientPlugin[] explicit) { + if ((propagated == null || propagated.length == 0) + && (explicit == null || explicit.length == 0)) { + return new ScheduleClientPlugin[0]; + } + // Warn about duplicate plugin instances (same object in both lists) + if (propagated != null && propagated.length > 0 && explicit != null && explicit.length > 0) { + Set propagatedSet = new HashSet<>(Arrays.asList(propagated)); + for (ScheduleClientPlugin p : explicit) { + if (propagatedSet.contains(p)) { + log.warn( + "Plugin instance {} is present in both propagated plugins (from service stubs) and " + + "explicit plugins. It will run twice which may not be the intended behavior.", + p.getName()); + } + } + } + List merged = new ArrayList<>(); + if (propagated != null) { + merged.addAll(Arrays.asList(propagated)); + } + if (explicit != null) { + merged.addAll(Arrays.asList(explicit)); + } + return merged.toArray(new ScheduleClientPlugin[0]); + } + + private static void applyPluginConfiguration( + ScheduleClientOptions.Builder builder, ScheduleClientPlugin[] plugins) { + if (plugins == null) { + return; + } + for (ScheduleClientPlugin plugin : plugins) { + plugin.configureScheduleClient(builder); + } + } + private ScheduleClientCallsInterceptor initializeClientInvoker() { ScheduleClientCallsInterceptor scheduleClientInvoker = new RootScheduleClientInvoker(genericClient, options); diff --git a/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientOptions.java index 9f830eb63a..775ad59fcf 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientOptions.java @@ -1,10 +1,12 @@ package io.temporal.client.schedules; +import io.temporal.common.Experimental; import io.temporal.common.context.ContextPropagator; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.GlobalDataConverter; import io.temporal.common.interceptors.ScheduleClientInterceptor; import java.lang.management.ManagementFactory; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -39,12 +41,14 @@ public static final class Builder { Collections.emptyList(); private static final List EMPTY_INTERCEPTORS = Collections.emptyList(); + private static final ScheduleClientPlugin[] EMPTY_PLUGINS = new ScheduleClientPlugin[0]; private String namespace; private DataConverter dataConverter; private String identity; private List contextPropagators; private List interceptors; + private ScheduleClientPlugin[] plugins; private Builder() {} @@ -57,6 +61,7 @@ private Builder(ScheduleClientOptions options) { identity = options.identity; contextPropagators = options.contextPropagators; interceptors = options.interceptors; + plugins = options.plugins; } /** Set the namespace this client will operate on. */ @@ -101,6 +106,17 @@ public Builder setInterceptors(List interceptors) { return this; } + /** + * Set the plugins for this client. + * + * @param plugins specifies the plugins to use with the client. + */ + @Experimental + public Builder setPlugins(ScheduleClientPlugin... plugins) { + this.plugins = plugins; + return this; + } + public ScheduleClientOptions build() { String name = identity == null ? ManagementFactory.getRuntimeMXBean().getName() : identity; return new ScheduleClientOptions( @@ -108,7 +124,8 @@ public ScheduleClientOptions build() { dataConverter == null ? GlobalDataConverter.get() : dataConverter, name, contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators, - interceptors == null ? EMPTY_INTERCEPTORS : interceptors); + interceptors == null ? EMPTY_INTERCEPTORS : interceptors, + plugins == null ? EMPTY_PLUGINS : plugins); } } @@ -117,18 +134,21 @@ public ScheduleClientOptions build() { private final String identity; private final List contextPropagators; private final List interceptors; + private final ScheduleClientPlugin[] plugins; private ScheduleClientOptions( String namespace, DataConverter dataConverter, String identity, List contextPropagators, - List interceptors) { + List interceptors, + ScheduleClientPlugin[] plugins) { this.namespace = namespace; this.dataConverter = dataConverter; this.identity = identity; this.contextPropagators = contextPropagators; this.interceptors = interceptors; + this.plugins = plugins; } /** @@ -175,4 +195,14 @@ public List getContextPropagators() { public List getInterceptors() { return interceptors; } + + /** + * Get the plugins of this client + * + * @return The plugins to use with the client. + */ + @Experimental + public ScheduleClientPlugin[] getPlugins() { + return plugins == null ? new ScheduleClientPlugin[0] : Arrays.copyOf(plugins, plugins.length); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientPlugin.java b/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientPlugin.java new file mode 100644 index 0000000000..a97bdc365e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/schedules/ScheduleClientPlugin.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.client.schedules; + +import io.temporal.common.Experimental; +import javax.annotation.Nonnull; + +/** + * Plugin interface for customizing Temporal schedule client configuration. + * + *

Plugins that implement both {@link io.temporal.serviceclient.WorkflowServiceStubsPlugin} and + * {@code ScheduleClientPlugin} are automatically propagated from the service stubs to the schedule + * client. + * + * @see io.temporal.serviceclient.WorkflowServiceStubsPlugin + */ +@Experimental +public interface ScheduleClientPlugin { + + /** + * Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended + * format: "organization.plugin-name" (e.g., "io.temporal.tracing") + * + * @return fully qualified plugin name + */ + @Nonnull + String getName(); + + /** + * Allows the plugin to modify schedule client options before the client is created. Called during + * configuration phase in forward (registration) order. + * + * @param builder the options builder to modify + */ + void configureScheduleClient(@Nonnull ScheduleClientOptions.Builder builder); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/SimplePlugin.java b/temporal-sdk/src/main/java/io/temporal/common/SimplePlugin.java new file mode 100644 index 0000000000..369bc0270a --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/SimplePlugin.java @@ -0,0 +1,789 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowClientPlugin; +import io.temporal.client.schedules.ScheduleClientOptions; +import io.temporal.client.schedules.ScheduleClientPlugin; +import io.temporal.common.context.ContextPropagator; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.WorkerInterceptor; +import io.temporal.common.interceptors.WorkflowClientInterceptor; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.serviceclient.WorkflowServiceStubsPlugin; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import io.temporal.worker.WorkerPlugin; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nonnull; + +/** + * A plugin that implements {@link WorkflowServiceStubsPlugin}, {@link WorkflowClientPlugin}, {@link + * ScheduleClientPlugin}, and {@link WorkerPlugin}. This class can be used in two ways: + * + *

    + *
  1. Builder pattern: Use {@link #newBuilder(String)} to declaratively configure a plugin + * with callbacks + *
  2. Subclassing: Extend this class and override specific methods for custom behavior + *
+ * + *

Builder Pattern Example

+ * + *
{@code
+ * SimplePlugin myPlugin = SimplePlugin.newBuilder("my-plugin")
+ *     .addWorkerInterceptors(new TracingInterceptor())
+ *     .addClientInterceptors(new LoggingInterceptor())
+ *     .customizeClient(b -> b.setIdentity("custom-identity"))
+ *     .build();
+ * }
+ * + *

Subclassing Example

+ * + *
{@code
+ * public class TracingPlugin extends SimplePlugin {
+ *     private final Tracer tracer;
+ *
+ *     public TracingPlugin(Tracer tracer) {
+ *         super("io.temporal.tracing");
+ *         this.tracer = tracer;
+ *     }
+ *
+ *     @Override
+ *     public void configureWorkflowClient(WorkflowClientOptions.Builder builder) {
+ *         builder.setInterceptors(new TracingClientInterceptor(tracer));
+ *     }
+ * }
+ * }
+ * + *

Hybrid Example (Builder + Override)

+ * + *
{@code
+ * public class HybridPlugin extends SimplePlugin {
+ *     public HybridPlugin() {
+ *         super(SimplePlugin.newBuilder("hybrid")
+ *             .addClientInterceptors(new LoggingInterceptor()));
+ *     }
+ *
+ *     @Override
+ *     public void initializeWorker(String taskQueue, Worker worker) {
+ *         worker.registerWorkflowImplementationTypes(MyWorkflow.class);
+ *     }
+ * }
+ * }
+ * + * @see WorkflowServiceStubsPlugin + * @see WorkflowClientPlugin + * @see ScheduleClientPlugin + * @see WorkerPlugin + */ +@Experimental +public abstract class SimplePlugin + implements WorkflowServiceStubsPlugin, + WorkflowClientPlugin, + ScheduleClientPlugin, + WorkerPlugin { + + private final String name; + private final List> stubsCustomizers; + private final List> clientCustomizers; + private final List> scheduleCustomizers; + private final List> factoryCustomizers; + private final List> workerCustomizers; + private final List> workerInitializers; + private final List> workerStartCallbacks; + private final List> workerShutdownCallbacks; + private final List> workerFactoryStartCallbacks; + private final List> workerFactoryShutdownCallbacks; + private final List> replayWorkerCustomizers; + private final List> replayWorkerInitializers; + private final List> replayExecutionCallbacks; + private final List workerInterceptors; + private final List clientInterceptors; + private final List contextPropagators; + private final DataConverter dataConverter; + private final List> workflowImplementationTypes; + private final List activitiesImplementations; + private final List nexusServiceImplementations; + + /** + * Creates a new plugin with the specified name. Use this constructor when subclassing to override + * specific methods. + * + * @param name a unique name for this plugin, used for logging and duplicate detection. + * Recommended format: "organization.plugin-name" (e.g., "io.temporal.tracing") + * @throws NullPointerException if name is null + */ + protected SimplePlugin(@Nonnull String name) { + this.name = Objects.requireNonNull(name, "Plugin name cannot be null"); + this.stubsCustomizers = Collections.emptyList(); + this.clientCustomizers = Collections.emptyList(); + this.scheduleCustomizers = Collections.emptyList(); + this.factoryCustomizers = Collections.emptyList(); + this.workerCustomizers = Collections.emptyList(); + this.workerInitializers = Collections.emptyList(); + this.workerStartCallbacks = Collections.emptyList(); + this.workerShutdownCallbacks = Collections.emptyList(); + this.workerFactoryStartCallbacks = Collections.emptyList(); + this.workerFactoryShutdownCallbacks = Collections.emptyList(); + this.replayWorkerCustomizers = Collections.emptyList(); + this.replayWorkerInitializers = Collections.emptyList(); + this.replayExecutionCallbacks = Collections.emptyList(); + this.workerInterceptors = Collections.emptyList(); + this.clientInterceptors = Collections.emptyList(); + this.contextPropagators = Collections.emptyList(); + this.dataConverter = null; + this.workflowImplementationTypes = Collections.emptyList(); + this.activitiesImplementations = Collections.emptyList(); + this.nexusServiceImplementations = Collections.emptyList(); + } + + /** + * Creates a new plugin from a builder. Use this constructor when subclassing to combine builder + * configuration with method overrides. + * + * @param builder the builder with configuration + * @throws NullPointerException if builder is null + */ + protected SimplePlugin(@Nonnull Builder builder) { + Objects.requireNonNull(builder, "Builder cannot be null"); + this.name = builder.name; + this.stubsCustomizers = new ArrayList<>(builder.stubsCustomizers); + this.clientCustomizers = new ArrayList<>(builder.clientCustomizers); + this.scheduleCustomizers = new ArrayList<>(builder.scheduleCustomizers); + this.factoryCustomizers = new ArrayList<>(builder.factoryCustomizers); + this.workerCustomizers = new ArrayList<>(builder.workerCustomizers); + this.workerInitializers = new ArrayList<>(builder.workerInitializers); + this.workerStartCallbacks = new ArrayList<>(builder.workerStartCallbacks); + this.workerShutdownCallbacks = new ArrayList<>(builder.workerShutdownCallbacks); + this.workerFactoryStartCallbacks = new ArrayList<>(builder.workerFactoryStartCallbacks); + this.workerFactoryShutdownCallbacks = new ArrayList<>(builder.workerFactoryShutdownCallbacks); + this.replayWorkerCustomizers = new ArrayList<>(builder.replayWorkerCustomizers); + this.replayWorkerInitializers = new ArrayList<>(builder.replayWorkerInitializers); + this.replayExecutionCallbacks = new ArrayList<>(builder.replayExecutionCallbacks); + this.workerInterceptors = new ArrayList<>(builder.workerInterceptors); + this.clientInterceptors = new ArrayList<>(builder.clientInterceptors); + this.contextPropagators = new ArrayList<>(builder.contextPropagators); + this.dataConverter = builder.dataConverter; + this.workflowImplementationTypes = new ArrayList<>(builder.workflowImplementationTypes); + this.activitiesImplementations = new ArrayList<>(builder.activitiesImplementations); + this.nexusServiceImplementations = new ArrayList<>(builder.nexusServiceImplementations); + } + + /** + * Creates a new builder with the specified plugin name. + * + * @param name a unique name for the plugin, used for logging and duplicate detection. Recommended + * format: "organization.plugin-name" (e.g., "my-org.tracing") + * @return a new builder instance + */ + public static Builder newBuilder(@Nonnull String name) { + return new Builder(name); + } + + @Override + @Nonnull + public String getName() { + return name; + } + + @Override + public void configureServiceStubs(@Nonnull WorkflowServiceStubsOptions.Builder builder) { + for (Consumer customizer : stubsCustomizers) { + customizer.accept(builder); + } + } + + @Override + public void configureWorkflowClient(@Nonnull WorkflowClientOptions.Builder builder) { + // Apply customizers + for (Consumer customizer : clientCustomizers) { + customizer.accept(builder); + } + + // Set data converter + if (dataConverter != null) { + builder.setDataConverter(dataConverter); + } + + // Add client interceptors + if (!clientInterceptors.isEmpty()) { + WorkflowClientInterceptor[] existing = builder.build().getInterceptors(); + List combined = + new ArrayList<>(existing != null ? Arrays.asList(existing) : new ArrayList<>()); + combined.addAll(clientInterceptors); + builder.setInterceptors(combined.toArray(new WorkflowClientInterceptor[0])); + } + + // Add context propagators + if (!contextPropagators.isEmpty()) { + List existing = builder.build().getContextPropagators(); + List combined = + new ArrayList<>(existing != null ? existing : new ArrayList<>()); + combined.addAll(contextPropagators); + builder.setContextPropagators(combined); + } + } + + @Override + public void configureScheduleClient(@Nonnull ScheduleClientOptions.Builder builder) { + // Apply customizers + for (Consumer customizer : scheduleCustomizers) { + customizer.accept(builder); + } + } + + @Override + public void configureWorkerFactory(@Nonnull WorkerFactoryOptions.Builder builder) { + // Apply customizers + for (Consumer customizer : factoryCustomizers) { + customizer.accept(builder); + } + + // Add worker interceptors + if (!workerInterceptors.isEmpty()) { + WorkerInterceptor[] existing = builder.build().getWorkerInterceptors(); + List combined = + new ArrayList<>(existing != null ? Arrays.asList(existing) : new ArrayList<>()); + combined.addAll(workerInterceptors); + builder.setWorkerInterceptors(combined.toArray(new WorkerInterceptor[0])); + } + } + + @Override + public void configureWorker(@Nonnull String taskQueue, @Nonnull WorkerOptions.Builder builder) { + for (Consumer customizer : workerCustomizers) { + customizer.accept(builder); + } + } + + @Override + public void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker) { + // Register workflow implementation types + if (!workflowImplementationTypes.isEmpty()) { + worker.registerWorkflowImplementationTypes( + workflowImplementationTypes.toArray(new Class[0])); + } + + // Register activities implementations + if (!activitiesImplementations.isEmpty()) { + worker.registerActivitiesImplementations(activitiesImplementations.toArray()); + } + + // Register nexus service implementations + for (Object nexusService : nexusServiceImplementations) { + worker.registerNexusServiceImplementation(nexusService); + } + + // Apply custom initializers + for (BiConsumer initializer : workerInitializers) { + initializer.accept(taskQueue, worker); + } + } + + @Override + public void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) + throws Exception { + next.run(); + for (BiConsumer callback : workerStartCallbacks) { + callback.accept(taskQueue, worker); + } + } + + @Override + public void shutdownWorker( + @Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) { + for (BiConsumer callback : workerShutdownCallbacks) { + callback.accept(taskQueue, worker); + } + next.run(); + } + + @Override + public WorkflowServiceStubs connectServiceClient( + WorkflowServiceStubsOptions options, Supplier next) { + return next.get(); + } + + @Override + public void startWorkerFactory(WorkerFactory factory, Runnable next) throws Exception { + next.run(); + for (Consumer callback : workerFactoryStartCallbacks) { + callback.accept(factory); + } + } + + @Override + public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) throws Exception { + for (Consumer callback : workerFactoryShutdownCallbacks) { + callback.accept(factory); + } + next.run(); + } + + @Override + public void configureReplayWorker( + @Nonnull String taskQueue, @Nonnull WorkerOptions.Builder builder) { + if (replayWorkerCustomizers.isEmpty()) { + // Default: delegate to configureWorker + WorkerPlugin.super.configureReplayWorker(taskQueue, builder); + } else { + for (Consumer customizer : replayWorkerCustomizers) { + customizer.accept(builder); + } + } + } + + @Override + public void initializeReplayWorker(@Nonnull String taskQueue, @Nonnull Worker worker) { + if (replayWorkerInitializers.isEmpty()) { + // Default: delegate to initializeWorker + WorkerPlugin.super.initializeReplayWorker(taskQueue, worker); + } else { + for (BiConsumer initializer : replayWorkerInitializers) { + initializer.accept(taskQueue, worker); + } + } + } + + @Override + public void replayWorkflowExecution( + @Nonnull Worker worker, @Nonnull WorkflowExecutionHistory history, @Nonnull Runnable next) + throws Exception { + next.run(); + for (BiConsumer callback : replayExecutionCallbacks) { + callback.accept(worker, history); + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{name='" + name + "'}"; + } + + /** Builder for creating {@link SimplePlugin} instances with declarative configuration. */ + public static final class Builder { + + private final String name; + private final List> stubsCustomizers = + new ArrayList<>(); + private final List> clientCustomizers = + new ArrayList<>(); + private final List> scheduleCustomizers = + new ArrayList<>(); + private final List> factoryCustomizers = + new ArrayList<>(); + private final List> workerCustomizers = new ArrayList<>(); + private final List> workerInitializers = new ArrayList<>(); + private final List> workerStartCallbacks = new ArrayList<>(); + private final List> workerShutdownCallbacks = new ArrayList<>(); + private final List> workerFactoryStartCallbacks = new ArrayList<>(); + private final List> workerFactoryShutdownCallbacks = new ArrayList<>(); + private final List> replayWorkerCustomizers = new ArrayList<>(); + private final List> replayWorkerInitializers = new ArrayList<>(); + private final List> replayExecutionCallbacks = + new ArrayList<>(); + private final List workerInterceptors = new ArrayList<>(); + private final List clientInterceptors = new ArrayList<>(); + private final List contextPropagators = new ArrayList<>(); + private DataConverter dataConverter; + private final List> workflowImplementationTypes = new ArrayList<>(); + private final List activitiesImplementations = new ArrayList<>(); + private final List nexusServiceImplementations = new ArrayList<>(); + + private Builder(@Nonnull String name) { + this.name = Objects.requireNonNull(name, "Plugin name cannot be null"); + } + + /** + * Adds a customizer for {@link WorkflowServiceStubsOptions}. Multiple customizers are applied + * in the order they are added. + * + * @param customizer a consumer that modifies the options builder + * @return this builder for chaining + */ + public Builder customizeServiceStubs( + @Nonnull Consumer customizer) { + stubsCustomizers.add(Objects.requireNonNull(customizer)); + return this; + } + + /** + * Adds a customizer for {@link WorkflowClientOptions}. Multiple customizers are applied in the + * order they are added. + * + * @param customizer a consumer that modifies the options builder + * @return this builder for chaining + */ + public Builder customizeClient(@Nonnull Consumer customizer) { + clientCustomizers.add(Objects.requireNonNull(customizer)); + return this; + } + + /** + * Adds a customizer for {@link ScheduleClientOptions}. Multiple customizers are applied in the + * order they are added. + * + * @param customizer a consumer that modifies the options builder + * @return this builder for chaining + */ + public Builder customizeScheduleClient( + @Nonnull Consumer customizer) { + scheduleCustomizers.add(Objects.requireNonNull(customizer)); + return this; + } + + /** + * Adds a customizer for {@link WorkerFactoryOptions}. Multiple customizers are applied in the + * order they are added. + * + * @param customizer a consumer that modifies the options builder + * @return this builder for chaining + */ + public Builder customizeWorkerFactory( + @Nonnull Consumer customizer) { + factoryCustomizers.add(Objects.requireNonNull(customizer)); + return this; + } + + /** + * Adds a customizer for {@link WorkerOptions}. Multiple customizers are applied in the order + * they are added. The customizer is applied to all workers created by the factory. + * + * @param customizer a consumer that modifies the options builder + * @return this builder for chaining + */ + public Builder customizeWorker(@Nonnull Consumer customizer) { + workerCustomizers.add(Objects.requireNonNull(customizer)); + return this; + } + + /** + * Adds an initializer that is called after a worker is created. This can be used to register + * workflows, activities, and Nexus services on the worker. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .initializeWorker((taskQueue, worker) -> {
+     *         worker.registerWorkflowImplementationTypes(MyWorkflow.class);
+     *         worker.registerActivitiesImplementations(new MyActivityImpl());
+     *     })
+     *     .build();
+     * }
+ * + * @param initializer a consumer that receives the task queue name and worker + * @return this builder for chaining + */ + public Builder initializeWorker(@Nonnull BiConsumer initializer) { + workerInitializers.add(Objects.requireNonNull(initializer)); + return this; + } + + /** + * Adds a callback that is invoked when a worker starts. This can be used to start per-worker + * resources or record metrics. + * + *

Note: For registering workflows and activities, use {@link #initializeWorker} instead, as + * registrations must happen before the worker starts polling. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .onWorkerStart((taskQueue, worker) -> {
+     *         logger.info("Worker started for task queue: {}", taskQueue);
+     *         perWorkerResources.put(taskQueue, new ResourcePool());
+     *     })
+     *     .build();
+     * }
+ * + * @param callback a consumer that receives the task queue name and worker when the worker + * starts + * @return this builder for chaining + */ + public Builder onWorkerStart(@Nonnull BiConsumer callback) { + workerStartCallbacks.add(Objects.requireNonNull(callback)); + return this; + } + + /** + * Adds a callback that is invoked when a worker shuts down. This can be used to clean up + * per-worker resources initialized in {@link #initializeWorker} or {@link #onWorkerStart}. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .onWorkerShutdown((taskQueue, worker) -> {
+     *         logger.info("Worker shutting down for task queue: {}", taskQueue);
+     *         ResourcePool pool = perWorkerResources.remove(taskQueue);
+     *         if (pool != null) {
+     *             pool.close();
+     *         }
+     *     })
+     *     .build();
+     * }
+ * + * @param callback a consumer that receives the task queue name and worker when the worker shuts + * down + * @return this builder for chaining + */ + public Builder onWorkerShutdown(@Nonnull BiConsumer callback) { + workerShutdownCallbacks.add(Objects.requireNonNull(callback)); + return this; + } + + /** + * Adds a callback that is invoked when the worker factory starts. This can be used to + * initialize factory-level resources or record metrics. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .onWorkerFactoryStart(factory -> {
+     *         logger.info("Worker factory started");
+     *         globalResources.initialize();
+     *     })
+     *     .build();
+     * }
+ * + * @param callback a consumer that receives the worker factory when it starts + * @return this builder for chaining + */ + public Builder onWorkerFactoryStart(@Nonnull Consumer callback) { + workerFactoryStartCallbacks.add(Objects.requireNonNull(callback)); + return this; + } + + /** + * Adds a callback that is invoked when the worker factory shuts down. This can be used to clean + * up factory-level resources. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .onWorkerFactoryShutdown(factory -> {
+     *         logger.info("Worker factory shutting down");
+     *         globalResources.cleanup();
+     *     })
+     *     .build();
+     * }
+ * + * @param callback a consumer that receives the worker factory when it shuts down + * @return this builder for chaining + */ + public Builder onWorkerFactoryShutdown(@Nonnull Consumer callback) { + workerFactoryShutdownCallbacks.add(Objects.requireNonNull(callback)); + return this; + } + + // ==================== Replay Methods ==================== + + /** + * Adds a customizer for {@link WorkerOptions} that is applied only when creating replay + * workers. If no replay-specific customizers are set, the regular worker customizers are used. + * + *

Use this when replay workers need different configuration than normal workers. + * + * @param customizer a consumer that modifies the options builder + * @return this builder for chaining + */ + public Builder customizeReplayWorker(@Nonnull Consumer customizer) { + replayWorkerCustomizers.add(Objects.requireNonNull(customizer)); + return this; + } + + /** + * Adds an initializer that is called after a replay worker is created. If no replay-specific + * initializers are set, the regular worker initializers are used. + * + *

Use this when replay workers need different initialization than normal workers. + * + * @param initializer a consumer that receives the task queue name and worker + * @return this builder for chaining + */ + public Builder initializeReplayWorker(@Nonnull BiConsumer initializer) { + replayWorkerInitializers.add(Objects.requireNonNull(initializer)); + return this; + } + + /** + * Adds a callback that is invoked after a workflow execution is replayed. This can be used for + * logging, metrics, or other observability around replay operations. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .onReplayWorkflowExecution((worker, history) -> {
+     *         logger.info("Replayed workflow: {}", history.getWorkflowExecution().getWorkflowId());
+     *     })
+     *     .build();
+     * }
+ * + * @param callback a consumer that receives the worker and history after replay completes + * @return this builder for chaining + */ + public Builder onReplayWorkflowExecution( + @Nonnull BiConsumer callback) { + replayExecutionCallbacks.add(Objects.requireNonNull(callback)); + return this; + } + + /** + * Adds worker interceptors. Interceptors are appended to any existing interceptors in the + * configuration. + * + * @param interceptors the interceptors to add + * @return this builder for chaining + */ + public Builder addWorkerInterceptors(WorkerInterceptor... interceptors) { + workerInterceptors.addAll(Arrays.asList(interceptors)); + return this; + } + + /** + * Adds client interceptors. Interceptors are appended to any existing interceptors in the + * configuration. + * + * @param interceptors the interceptors to add + * @return this builder for chaining + */ + public Builder addClientInterceptors(WorkflowClientInterceptor... interceptors) { + clientInterceptors.addAll(Arrays.asList(interceptors)); + return this; + } + + /** + * Adds context propagators. Propagators are appended to any existing propagators in the + * configuration. + * + * @param propagators the propagators to add + * @return this builder for chaining + */ + public Builder addContextPropagators(ContextPropagator... propagators) { + contextPropagators.addAll(Arrays.asList(propagators)); + return this; + } + + /** + * Sets the data converter to use for serializing workflow and activity arguments and results. + * This overrides any data converter previously set on the client options. + * + * @param dataConverter the data converter to use + * @return this builder for chaining + */ + public Builder setDataConverter(@Nonnull DataConverter dataConverter) { + this.dataConverter = Objects.requireNonNull(dataConverter); + return this; + } + + /** + * Registers workflow implementation types. These workflows will be registered on all workers + * created by the factory. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .registerWorkflowImplementationTypes(MyWorkflowImpl.class, OtherWorkflowImpl.class)
+     *     .build();
+     * }
+ * + * @param workflowImplementationTypes workflow implementation classes to register + * @return this builder for chaining + */ + public Builder registerWorkflowImplementationTypes(Class... workflowImplementationTypes) { + this.workflowImplementationTypes.addAll(Arrays.asList(workflowImplementationTypes)); + return this; + } + + /** + * Registers activity implementations. These activities will be registered on all workers + * created by the factory. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .registerActivitiesImplementations(new MyActivityImpl(), new OtherActivityImpl())
+     *     .build();
+     * }
+ * + * @param activitiesImplementations activity implementation instances to register + * @return this builder for chaining + */ + public Builder registerActivitiesImplementations(Object... activitiesImplementations) { + this.activitiesImplementations.addAll(Arrays.asList(activitiesImplementations)); + return this; + } + + /** + * Registers a Nexus service implementation. The service will be registered on all workers + * created by the factory. + * + *

Example: + * + *

{@code
+     * SimplePlugin.newBuilder("my-plugin")
+     *     .registerNexusServiceImplementation(new MyNexusServiceImpl())
+     *     .build();
+     * }
+ * + * @param nexusServiceImplementation the Nexus service implementation to register + * @return this builder for chaining + */ + public Builder registerNexusServiceImplementation(@Nonnull Object nexusServiceImplementation) { + this.nexusServiceImplementations.add(Objects.requireNonNull(nexusServiceImplementation)); + return this; + } + + /** + * Builds the plugin with the configured settings. + * + * @return a new plugin instance + */ + public SimplePlugin build() { + return new SimplePluginImpl(this); + } + } + + /** Private concrete implementation returned by the builder. */ + private static final class SimplePluginImpl extends SimplePlugin { + SimplePluginImpl(Builder builder) { + super(builder); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index d2aaf4728e..c421c29044 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -42,6 +42,7 @@ public final class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); private final WorkerOptions options; private final String taskQueue; + private final List plugins; final SyncWorkflowWorker workflowWorker; final SyncActivityWorker activityWorker; final SyncNexusWorker nexusWorker; @@ -67,9 +68,11 @@ public final class Worker { @Nonnull WorkflowExecutorCache cache, boolean useStickyTaskQueue, WorkflowThreadExecutor workflowThreadExecutor, - List contextPropagators) { + List contextPropagators, + @Nonnull List plugins) { Objects.requireNonNull(client, "client should not be null"); + this.plugins = Objects.requireNonNull(plugins, "plugins should not be null"); Preconditions.checkArgument( !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); this.taskQueue = taskQueue; @@ -469,12 +472,57 @@ public String toString() { @SuppressWarnings("deprecation") public void replayWorkflowExecution(io.temporal.internal.common.WorkflowExecutionHistory history) throws Exception { - workflowWorker.queryWorkflowExecution( - history, - WorkflowClient.QUERY_TYPE_REPLAY_ONLY, - String.class, - String.class, - new Object[] {}); + // Convert to public history type for plugin API + WorkflowExecutionHistory publicHistory = + new WorkflowExecutionHistory( + history.getHistory(), history.getWorkflowExecution().getWorkflowId()); + + // Build plugin chain in reverse order (first plugin wraps all others) + // Wrap checked exception in RuntimeException for Runnable compatibility + Runnable chain = + () -> { + try { + workflowWorker.queryWorkflowExecution( + history, + WorkflowClient.QUERY_TYPE_REPLAY_ONLY, + String.class, + String.class, + new Object[] {}); + } catch (Exception e) { + throw new ReplayException(e); + } + }; + + for (int i = plugins.size() - 1; i >= 0; i--) { + WorkerPlugin plugin = plugins.get(i); + Runnable next = chain; + chain = + () -> { + try { + plugin.replayWorkflowExecution(this, publicHistory, next); + } catch (Exception e) { + throw new ReplayException(e); + } + }; + } + + try { + chain.run(); + } catch (ReplayException e) { + throw e.getCause(); + } + } + + /** Internal exception to wrap checked exceptions during replay. */ + private static class ReplayException extends RuntimeException { + ReplayException(Exception cause) { + super(cause); + } + + @Override + public synchronized Exception getCause() { + return (Exception) super.getCause(); + } } /** diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 20540f4a9a..ce5444023d 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -7,6 +7,7 @@ import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.Experimental; import io.temporal.common.converter.DataConverter; import io.temporal.internal.client.WorkflowClientInternal; import io.temporal.internal.sync.WorkflowThreadExecutor; @@ -15,9 +16,15 @@ import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.internal.worker.WorkflowRunLockManager; import io.temporal.serviceclient.MetricsTag; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -46,6 +53,9 @@ public final class WorkerFactory { private final @Nonnull WorkflowExecutorCache cache; + /** Plugins propagated from the client and applied to this factory. */ + private final List plugins; + private State state = State.Initial; private final String statusErrorMessage = @@ -72,6 +82,18 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor WorkflowClientOptions workflowClientOptions = workflowClient.getOptions(); String namespace = workflowClientOptions.getNamespace(); + // Extract worker plugins from client (auto-propagation) + List propagatedPlugins = extractWorkerPlugins(workflowClientOptions.getPlugins()); + + // Get plugins explicitly set on factory options + WorkerPlugin[] explicitPlugins = factoryOptions != null ? factoryOptions.getPlugins() : null; + + // Merge propagated plugins with explicit plugins (propagated first) + this.plugins = mergePlugins(propagatedPlugins, explicitPlugins); + + // Apply plugin configuration to factory options (forward order) + factoryOptions = applyPluginConfiguration(factoryOptions, this.plugins); + this.factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); @@ -137,6 +159,9 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) { state == State.Initial, String.format(statusErrorMessage, "create new worker", state.name(), State.Initial.name())); + // Apply plugin configuration to worker options (forward order) + options = applyWorkerPluginConfiguration(taskQueue, options, this.plugins); + // Only one worker can exist for a task queue Worker existingWorker = workers.get(taskQueue); if (existingWorker == null) { @@ -151,8 +176,16 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) { cache, true, workflowThreadExecutor, - workflowClient.getOptions().getContextPropagators()); + workflowClient.getOptions().getContextPropagators(), + plugins); workers.put(taskQueue, worker); + + // Go through the plugins to call plugin initializeWorker hooks (e.g. register workflows, + // activities, etc.) + for (WorkerPlugin plugin : plugins) { + plugin.initializeWorker(taskQueue, worker); + } + return worker; } else { log.warn( @@ -163,6 +196,71 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) { } } + /** + * Creates a worker specifically for replay operations. This method should be used when replaying + * workflow histories to ensure plugins receive the replay-specific configuration callbacks. + * + *

Unlike {@link #newWorker(String, WorkerOptions)}, this method: + * + *

    + *
  • Calls {@link WorkerPlugin#configureReplayWorker} instead of {@link + * WorkerPlugin#configureWorker} + *
  • Calls {@link WorkerPlugin#initializeReplayWorker} instead of {@link + * WorkerPlugin#initializeWorker} + *
+ * + *

This allows plugins to apply different configuration for replay scenarios (e.g., disabling + * certain interceptors that shouldn't run during replay). + * + * @param taskQueue task queue name for the replay worker + * @param options Options for configuring the replay worker + * @return Worker configured for replay + */ + @Experimental + public synchronized Worker newReplayWorker(String taskQueue, WorkerOptions options) { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); + Preconditions.checkState( + state == State.Initial, + String.format(statusErrorMessage, "create new worker", state.name(), State.Initial.name())); + + // Apply replay-specific plugin configuration to worker options (forward order) + options = applyReplayWorkerPluginConfiguration(taskQueue, options, this.plugins); + + // Only one worker can exist for a task queue + Worker existingWorker = workers.get(taskQueue); + if (existingWorker == null) { + Worker worker = + new Worker( + workflowClient, + taskQueue, + factoryOptions, + options, + metricsScope, + runLocks, + cache, + true, + workflowThreadExecutor, + workflowClient.getOptions().getContextPropagators(), + plugins); + workers.put(taskQueue, worker); + + // Go through the plugins to call plugin initializeReplayWorker hooks (e.g. register + // workflows) + for (WorkerPlugin plugin : plugins) { + plugin.initializeReplayWorker(taskQueue, worker); + } + + return worker; + } else { + log.warn( + "Only one worker can be registered for a task queue, " + + "subsequent calls to WorkerFactory#newReplayWorker with the same task queue are ignored and " + + "initially created worker is returned"); + return existingWorker; + } + } + /** * @param taskQueue task queue name to lookup an existing worker for * @return a worker created previously through {@link #newWorker(String)} for the given task @@ -211,8 +309,59 @@ public synchronized void start() { .setNamespace(workflowClient.getOptions().getNamespace()) .build()); - for (Worker worker : workers.values()) { - worker.start(); + // Build plugin execution chain (reverse order for proper nesting) + Runnable startChain = this::doStart; + for (int i = plugins.size() - 1; i >= 0; i--) { + final Runnable next = startChain; + final WorkerPlugin workerPlugin = plugins.get(i); + startChain = + () -> { + try { + workerPlugin.startWorkerFactory(this, next); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + "Plugin " + workerPlugin.getName() + " failed during startup", e); + } + }; + } + + // Execute the chain + startChain.run(); + } + + /** Internal method that actually starts the workers. Called from the plugin chain. */ + private void doStart() { + // Start each worker with plugin hooks + for (Map.Entry entry : workers.entrySet()) { + String taskQueue = entry.getKey(); + Worker worker = entry.getValue(); + + // Build plugin chain for this worker (reverse order for proper nesting) + Runnable startChain = worker::start; + for (int i = plugins.size() - 1; i >= 0; i--) { + final Runnable next = startChain; + final WorkerPlugin workerPlugin = plugins.get(i); + startChain = + () -> { + try { + workerPlugin.startWorker(taskQueue, worker, next); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + "Plugin " + + workerPlugin.getName() + + " failed during worker startup for task queue " + + taskQueue, + e); + } + }; + } + + // Execute the chain for this worker + startChain.run(); } state = State.Started; @@ -286,12 +435,73 @@ public synchronized void shutdownNow() { private void shutdownInternal(boolean interruptUserTasks) { state = State.Shutdown; + + // Build plugin shutdown chain (reverse order for proper nesting) + Runnable shutdownChain = () -> doShutdown(interruptUserTasks); + for (int i = plugins.size() - 1; i >= 0; i--) { + final Runnable next = shutdownChain; + final WorkerPlugin workerPlugin = plugins.get(i); + shutdownChain = + () -> { + try { + workerPlugin.shutdownWorkerFactory(this, next); + } catch (Exception e) { + log.warn("Plugin {} failed during shutdown", workerPlugin.getName(), e); + // Still try to continue shutdown + next.run(); + } + }; + } + + // Execute the chain + shutdownChain.run(); + } + + /** Internal method that actually shuts down workers. Called from the plugin chain. */ + private void doShutdown(boolean interruptUserTasks) { ((WorkflowClientInternal) workflowClient.getInternal()).deregisterWorkerFactory(this); ShutdownManager shutdownManager = new ShutdownManager(); - CompletableFuture.allOf( - workers.values().stream() - .map(worker -> worker.shutdown(shutdownManager, interruptUserTasks)) - .toArray(CompletableFuture[]::new)) + + // Shutdown each worker with plugin hooks + List> shutdownFutures = new ArrayList<>(); + for (Map.Entry entry : workers.entrySet()) { + String taskQueue = entry.getKey(); + Worker worker = entry.getValue(); + + // Build plugin chain for this worker's shutdown (reverse order for proper nesting) + // We use a holder to capture the future from the terminal action + @SuppressWarnings("unchecked") + CompletableFuture[] futureHolder = new CompletableFuture[1]; + Runnable shutdownChain = + () -> futureHolder[0] = worker.shutdown(shutdownManager, interruptUserTasks); + + for (int i = plugins.size() - 1; i >= 0; i--) { + final Runnable next = shutdownChain; + final WorkerPlugin workerPlugin = plugins.get(i); + shutdownChain = + () -> { + try { + workerPlugin.shutdownWorker(taskQueue, worker, next); + } catch (Exception e) { + log.warn( + "Plugin {} failed during worker shutdown for task queue {}", + workerPlugin.getName(), + taskQueue, + e); + // Still try to continue shutdown + next.run(); + } + }; + } + + // Execute the shutdown chain for this worker + shutdownChain.run(); + if (futureHolder[0] != null) { + shutdownFutures.add(futureHolder[0]); + } + } + + CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])) .thenApply( r -> { cache.invalidateAll(); @@ -359,6 +569,118 @@ public String toString() { return String.format("WorkerFactory{identity=%s}", workflowClient.getOptions().getIdentity()); } + /** + * Extracts worker plugins from the workflow client plugins array. Only plugins that also + * implement {@link WorkerPlugin} are included. + */ + private static List extractWorkerPlugins( + io.temporal.client.WorkflowClientPlugin[] clientPlugins) { + if (clientPlugins == null || clientPlugins.length == 0) { + return Collections.emptyList(); + } + + List workerPlugins = new ArrayList<>(); + for (io.temporal.client.WorkflowClientPlugin plugin : clientPlugins) { + if (plugin instanceof WorkerPlugin) { + workerPlugins.add((WorkerPlugin) plugin); + } + } + return Collections.unmodifiableList(workerPlugins); + } + + /** + * Merges propagated plugins with explicitly specified plugins. Propagated plugins come first + * (from client), followed by factory-specific plugins. + */ + private static List mergePlugins( + List propagated, WorkerPlugin[] explicit) { + if ((propagated == null || propagated.isEmpty()) + && (explicit == null || explicit.length == 0)) { + return Collections.emptyList(); + } + if (propagated == null || propagated.isEmpty()) { + return Collections.unmodifiableList(Arrays.asList(explicit)); + } + if (explicit == null || explicit.length == 0) { + return propagated; + } + // Warn about duplicate plugin instances (same object in both lists) + Set propagatedSet = new HashSet<>(propagated); + for (WorkerPlugin p : explicit) { + if (propagatedSet.contains(p)) { + log.warn( + "Plugin instance {} is present in both propagated plugins (from client) and " + + "explicit plugins. It will run twice which may not be the intended behavior.", + p.getName()); + } + } + List merged = new ArrayList<>(propagated.size() + explicit.length); + merged.addAll(propagated); + merged.addAll(Arrays.asList(explicit)); + return Collections.unmodifiableList(merged); + } + + /** + * Applies plugin configuration to worker factory options. Plugins are called in forward + * (registration) order. The merged plugins are set on the builder so plugins can see the complete + * list if they inspect the builder. + */ + private static WorkerFactoryOptions applyPluginConfiguration( + WorkerFactoryOptions options, List plugins) { + WorkerFactoryOptions.Builder builder = + options == null + ? WorkerFactoryOptions.newBuilder() + : WorkerFactoryOptions.newBuilder(options); + + // Set the merged plugins on the builder so plugins see the complete list + builder.setPlugins(plugins.toArray(new WorkerPlugin[0])); + + if (plugins != null) { + for (WorkerPlugin plugin : plugins) { + plugin.configureWorkerFactory(builder); + } + } + return builder.build(); + } + + /** + * Applies plugin configuration to worker options. Plugins are called in forward (registration) + * order. + */ + private static WorkerOptions applyWorkerPluginConfiguration( + String taskQueue, WorkerOptions options, List plugins) { + if (plugins == null || plugins.isEmpty()) { + return options; + } + + WorkerOptions.Builder builder = + options == null ? WorkerOptions.newBuilder() : WorkerOptions.newBuilder(options); + + for (WorkerPlugin plugin : plugins) { + plugin.configureWorker(taskQueue, builder); + } + return builder.build(); + } + + /** + * Applies replay-specific plugin configuration to worker options. Plugins are called in forward + * (registration) order. + */ + private static WorkerOptions applyReplayWorkerPluginConfiguration( + String taskQueue, WorkerOptions options, List plugins) { + if (plugins == null || plugins.isEmpty()) { + return options; + } + + WorkerOptions.Builder builder = + options == null ? WorkerOptions.newBuilder() : WorkerOptions.newBuilder(options); + + for (WorkerPlugin plugin : plugins) { + plugin.configureReplayWorker(taskQueue, builder); + } + return builder.build(); + } + enum State { Initial, Started, diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java index c50da81cd5..ee24188110 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java @@ -37,6 +37,7 @@ public static class Builder { private int workflowCacheSize; private int maxWorkflowThreadCount; private WorkerInterceptor[] workerInterceptors; + private WorkerPlugin[] plugins; private boolean enableLoggingInReplay; private boolean usingVirtualWorkflowThreads; private ExecutorService overrideLocalActivityTaskExecutor; @@ -52,6 +53,7 @@ private Builder(WorkerFactoryOptions options) { this.workflowCacheSize = options.workflowCacheSize; this.maxWorkflowThreadCount = options.maxWorkflowThreadCount; this.workerInterceptors = options.workerInterceptors; + this.plugins = options.plugins; this.enableLoggingInReplay = options.enableLoggingInReplay; this.usingVirtualWorkflowThreads = options.usingVirtualWorkflowThreads; this.overrideLocalActivityTaskExecutor = options.overrideLocalActivityTaskExecutor; @@ -101,6 +103,24 @@ public Builder setWorkerInterceptors(WorkerInterceptor... workerInterceptors) { return this; } + /** + * Sets the worker plugins to use with workers created by this factory. Plugins can modify + * worker configuration and wrap worker lifecycle. + * + *

Note: Plugins that implement both {@link io.temporal.client.ClientPlugin} and {@link + * WorkerPlugin} are automatically propagated from the client. Use this method for worker-only + * plugins that don't need client-side configuration. + * + * @param plugins the worker plugins to use + * @return this builder for chaining + * @see WorkerPlugin + */ + @Experimental + public Builder setPlugins(WorkerPlugin... plugins) { + this.plugins = plugins; + return this; + } + public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { this.enableLoggingInReplay = enableLoggingInReplay; return this; @@ -141,18 +161,31 @@ public WorkerFactoryOptions build() { maxWorkflowThreadCount, workflowHostLocalTaskQueueScheduleToStartTimeout, workerInterceptors, + plugins, enableLoggingInReplay, usingVirtualWorkflowThreads, overrideLocalActivityTaskExecutor, false); } + /** + * Validates options and builds with defaults applied. + * + *

Note: If plugins are configured via {@link #setPlugins(WorkerPlugin...)}, they will have + * an opportunity to modify options after this method is called, when the options are passed to + * {@link WorkerFactory#newInstance}. This means validation performed here occurs before plugin + * modifications. In most cases, users should simply call {@link #build()} and let the factory + * creation handle validation. + * + * @return validated options with defaults applied + */ public WorkerFactoryOptions validateAndBuildWithDefaults() { return new WorkerFactoryOptions( workflowCacheSize, maxWorkflowThreadCount, workflowHostLocalTaskQueueScheduleToStartTimeout, workerInterceptors == null ? new WorkerInterceptor[0] : workerInterceptors, + plugins == null ? new WorkerPlugin[0] : plugins, enableLoggingInReplay, usingVirtualWorkflowThreads, overrideLocalActivityTaskExecutor, @@ -164,6 +197,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { private final int maxWorkflowThreadCount; private final @Nullable Duration workflowHostLocalTaskQueueScheduleToStartTimeout; private final WorkerInterceptor[] workerInterceptors; + private final WorkerPlugin[] plugins; private final boolean enableLoggingInReplay; private final boolean usingVirtualWorkflowThreads; private final ExecutorService overrideLocalActivityTaskExecutor; @@ -173,6 +207,7 @@ private WorkerFactoryOptions( int maxWorkflowThreadCount, @Nullable Duration workflowHostLocalTaskQueueScheduleToStartTimeout, WorkerInterceptor[] workerInterceptors, + WorkerPlugin[] plugins, boolean enableLoggingInReplay, boolean usingVirtualWorkflowThreads, ExecutorService overrideLocalActivityTaskExecutor, @@ -195,12 +230,16 @@ private WorkerFactoryOptions( if (workerInterceptors == null) { workerInterceptors = new WorkerInterceptor[0]; } + if (plugins == null) { + plugins = new WorkerPlugin[0]; + } } this.workflowCacheSize = workflowCacheSize; this.maxWorkflowThreadCount = maxWorkflowThreadCount; this.workflowHostLocalTaskQueueScheduleToStartTimeout = workflowHostLocalTaskQueueScheduleToStartTimeout; this.workerInterceptors = workerInterceptors; + this.plugins = plugins; this.enableLoggingInReplay = enableLoggingInReplay; this.usingVirtualWorkflowThreads = usingVirtualWorkflowThreads; this.overrideLocalActivityTaskExecutor = overrideLocalActivityTaskExecutor; @@ -223,6 +262,16 @@ public WorkerInterceptor[] getWorkerInterceptors() { return workerInterceptors; } + /** + * Returns the worker plugins configured for this factory. + * + * @return the array of worker plugins, never null + */ + @Experimental + public WorkerPlugin[] getPlugins() { + return plugins; + } + public boolean isEnableLoggingInReplay() { return enableLoggingInReplay; } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerPlugin.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerPlugin.java new file mode 100644 index 0000000000..32646914fc --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerPlugin.java @@ -0,0 +1,293 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker; + +import io.temporal.common.Experimental; +import io.temporal.common.SimplePlugin; +import io.temporal.common.WorkflowExecutionHistory; +import javax.annotation.Nonnull; + +/** + * Plugin interface for customizing Temporal worker configuration and lifecycle. + * + *

Plugins that also implement {@link io.temporal.client.ClientPlugin} are automatically + * propagated from the client to workers created from that client. + * + *

Example implementation: + * + *

{@code
+ * public class MetricsPlugin extends SimplePlugin {
+ *     private final MetricsRegistry registry;
+ *
+ *     public MetricsPlugin(MetricsRegistry registry) {
+ *         super("my-org.metrics");
+ *         this.registry = registry;
+ *     }
+ *
+ *     @Override
+ *     public void configureWorkerFactory(WorkerFactoryOptions.Builder builder) {
+ *         builder.setWorkerInterceptors(new MetricsWorkerInterceptor(registry));
+ *     }
+ *
+ *     @Override
+ *     public void startWorkerFactory(WorkerFactory factory, Runnable next) throws Exception {
+ *         registry.recordWorkerStart();
+ *         try {
+ *             next.run();
+ *         } finally {
+ *             registry.recordWorkerStop();
+ *         }
+ *     }
+ * }
+ * }
+ * + * @see io.temporal.client.ClientPlugin + * @see SimplePlugin + */ +@Experimental +public interface WorkerPlugin { + + /** + * Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended + * format: "organization.plugin-name" (e.g., "io.temporal.tracing") + * + * @return fully qualified plugin name + */ + @Nonnull + String getName(); + + /** + * Allows the plugin to modify worker factory options before the factory is created. Called during + * configuration phase in forward (registration) order. + * + * @param builder the options builder to modify + */ + void configureWorkerFactory(@Nonnull WorkerFactoryOptions.Builder builder); + + /** + * Allows the plugin to modify worker options before a worker is created. Called during + * configuration phase in forward (registration) order. + * + * @param taskQueue the task queue name for the worker being created + * @param builder the options builder to modify + */ + void configureWorker(@Nonnull String taskQueue, @Nonnull WorkerOptions.Builder builder); + + /** + * Called after a worker is created, allowing plugins to register workflows, activities, Nexus + * services, and other components on the worker. + * + *

This method is called in forward (registration) order immediately after the worker is + * created in {@link WorkerFactory#newWorker}. This is the appropriate place for registrations + * because it is called before the worker starts polling. + * + *

Example: + * + *

{@code
+   * @Override
+   * public void initializeWorker(String taskQueue, Worker worker) {
+   *     worker.registerWorkflowImplementationTypes(MyWorkflow.class);
+   *     worker.registerActivitiesImplementations(new MyActivityImpl());
+   * }
+   * }
+ * + * @param taskQueue the task queue name for the worker + * @param worker the newly created worker + */ + void initializeWorker(@Nonnull String taskQueue, @Nonnull Worker worker); + + /** + * Allows the plugin to wrap individual worker startup. Called during execution phase in reverse + * order (first plugin wraps all others) when {@link WorkerFactory#start()} is invoked. + * + *

This method is called for each worker when the factory starts. Use this for per-worker + * resource initialization, logging, or metrics. Note that workflow/activity registration should + * be done in {@link #initializeWorker} instead, as this method is called after registrations are + * finalized. + * + *

Example: + * + *

{@code
+   * @Override
+   * public void startWorker(String taskQueue, Worker worker, Runnable next) throws Exception {
+   *     logger.info("Starting worker for task queue: {}", taskQueue);
+   *     perWorkerResources.put(taskQueue, new ResourcePool());
+   *     next.run();
+   * }
+   * }
+ * + * @param taskQueue the task queue name for the worker + * @param worker the worker being started + * @param next runnable that starts the next in chain (eventually starts the actual worker) + * @throws Exception if startup fails + */ + void startWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next) + throws Exception; + + /** + * Allows the plugin to wrap individual worker shutdown. Called during shutdown phase in reverse + * order (first plugin wraps all others) when {@link WorkerFactory#shutdown()} or {@link + * WorkerFactory#shutdownNow()} is invoked. + * + *

This method is called for each worker when the factory shuts down. Use this for per-worker + * resource cleanup that was initialized in {@link #startWorker} or {@link #initializeWorker}. + * + *

Example: + * + *

{@code
+   * @Override
+   * public void shutdownWorker(String taskQueue, Worker worker, Runnable next) {
+   *     logger.info("Shutting down worker for task queue: {}", taskQueue);
+   *     next.run();
+   *     ResourcePool pool = perWorkerResources.remove(taskQueue);
+   *     if (pool != null) {
+   *         pool.close();
+   *     }
+   * }
+   * }
+ * + * @param taskQueue the task queue name for the worker + * @param worker the worker being shut down + * @param next runnable that shuts down the next in chain (eventually shuts down the actual + * worker) + */ + void shutdownWorker(@Nonnull String taskQueue, @Nonnull Worker worker, @Nonnull Runnable next); + + /** + * Allows the plugin to wrap worker factory startup. Called during execution phase in reverse + * order (first plugin wraps all others). + * + *

This method is called when {@link WorkerFactory#start()} is invoked. The plugin can perform + * setup before starting and cleanup logic. + * + *

Example: + * + *

{@code
+   * @Override
+   * public void startWorkerFactory(WorkerFactory factory, Runnable next) throws Exception {
+   *     logger.info("Starting workers...");
+   *     next.run();
+   *     logger.info("Workers started");
+   * }
+   * }
+ * + * @param factory the worker factory being started + * @param next runnable that starts the next in chain (eventually starts actual workers) + * @throws Exception if startup fails + */ + void startWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next) throws Exception; + + /** + * Allows the plugin to wrap worker factory shutdown. Called during shutdown phase in reverse + * order (first plugin wraps all others). + * + *

This method is called when {@link WorkerFactory#shutdown()} or {@link + * WorkerFactory#shutdownNow()} is invoked. The plugin can perform actions before and after the + * actual shutdown occurs. + * + *

Example: + * + *

{@code
+   * @Override
+   * public void shutdownWorkerFactory(WorkerFactory factory, Runnable next) {
+   *     logger.info("Shutting down workers...");
+   *     next.run();
+   *     logger.info("Workers shut down");
+   * }
+   * }
+ * + * @param factory the worker factory being shut down + * @param next runnable that shuts down the next in chain (eventually shuts down actual workers) + */ + void shutdownWorkerFactory(@Nonnull WorkerFactory factory, @Nonnull Runnable next) + throws Exception; + + // ==================== Replay Methods ==================== + + /** + * Allows the plugin to modify worker options when configuring a worker for replay. Called during + * replay configuration in forward (registration) order. + * + *

By default, this delegates to {@link #configureWorker}. Override this method if the plugin + * needs replay-specific configuration that differs from normal worker configuration. + * + *

This is useful when a plugin needs to apply the same settings to replay that it applies to + * normal workers (e.g., data converters, interceptors) to ensure replay behavior matches + * execution behavior. + * + * @param taskQueue the task queue name for the replay worker + * @param builder the options builder to modify + */ + default void configureReplayWorker( + @Nonnull String taskQueue, @Nonnull WorkerOptions.Builder builder) { + configureWorker(taskQueue, builder); + } + + /** + * Called after a replay worker is created, allowing plugins to register workflows and other + * components needed for replay. + * + *

By default, this delegates to {@link #initializeWorker}. Override this method if the plugin + * needs replay-specific initialization that differs from normal worker initialization. + * + * @param taskQueue the task queue name for the replay worker + * @param worker the newly created replay worker + */ + default void initializeReplayWorker(@Nonnull String taskQueue, @Nonnull Worker worker) { + initializeWorker(taskQueue, worker); + } + + /** + * Allows the plugin to wrap workflow execution replay. Called in reverse order (first plugin + * wraps all others) when replaying a workflow history. + * + *

This method allows plugins to perform setup/teardown around replay, add logging, metrics, or + * other observability for replay operations. + * + *

Example: + * + *

{@code
+   * @Override
+   * public void replayWorkflowExecution(
+   *     Worker worker, WorkflowExecutionHistory history, Runnable next) throws Exception {
+   *     logger.info("Replaying workflow: {}", history.getWorkflowExecution().getWorkflowId());
+   *     long start = System.currentTimeMillis();
+   *     try {
+   *         next.run();
+   *         logger.info("Replay succeeded in {}ms", System.currentTimeMillis() - start);
+   *     } catch (Exception e) {
+   *         logger.error("Replay failed after {}ms", System.currentTimeMillis() - start, e);
+   *         throw e;
+   *     }
+   * }
+   * }
+ * + * @param worker the worker performing the replay + * @param history the workflow execution history being replayed + * @param next runnable that performs the next in chain (eventually performs the actual replay) + * @throws Exception if replay fails + */ + default void replayWorkflowExecution( + @Nonnull Worker worker, @Nonnull WorkflowExecutionHistory history, @Nonnull Runnable next) + throws Exception { + next.run(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/client/WorkflowClientOptionsPluginTest.java b/temporal-sdk/src/test/java/io/temporal/client/WorkflowClientOptionsPluginTest.java new file mode 100644 index 0000000000..0ae9d22b3d --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/WorkflowClientOptionsPluginTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.client; + +import static org.junit.Assert.*; + +import io.temporal.common.SimplePlugin; +import org.junit.Test; + +public class WorkflowClientOptionsPluginTest { + + @Test + public void testDefaultPluginsEmpty() { + WorkflowClientOptions options = WorkflowClientOptions.newBuilder().build(); + assertEquals("Default plugins should be empty", 0, options.getPlugins().length); + } + + @Test + public void testSetPlugins() { + SimplePlugin plugin1 = new TestPlugin("plugin1"); + SimplePlugin plugin2 = new TestPlugin("plugin2"); + + WorkflowClientOptions options = + WorkflowClientOptions.newBuilder().setPlugins(plugin1, plugin2).build(); + + WorkflowClientPlugin[] plugins = options.getPlugins(); + assertEquals(2, plugins.length); + assertEquals("plugin1", plugins[0].getName()); + assertEquals("plugin2", plugins[1].getName()); + } + + @Test + public void testToBuilder() { + SimplePlugin plugin = new TestPlugin("plugin"); + + WorkflowClientOptions original = WorkflowClientOptions.newBuilder().setPlugins(plugin).build(); + + WorkflowClientOptions copy = original.toBuilder().build(); + + assertEquals(1, copy.getPlugins().length); + assertEquals("plugin", copy.getPlugins()[0].getName()); + } + + @Test + public void testValidateAndBuildWithDefaults() { + SimplePlugin plugin = new TestPlugin("plugin"); + + WorkflowClientOptions options = + WorkflowClientOptions.newBuilder().setPlugins(plugin).validateAndBuildWithDefaults(); + + assertEquals(1, options.getPlugins().length); + assertEquals("plugin", options.getPlugins()[0].getName()); + } + + private static class TestPlugin extends SimplePlugin { + TestPlugin(String name) { + super(name); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/common/PluginPropagationTest.java b/temporal-sdk/src/test/java/io/temporal/common/PluginPropagationTest.java new file mode 100644 index 0000000000..1026b407d5 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/common/PluginPropagationTest.java @@ -0,0 +1,282 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common; + +import static org.junit.Assert.*; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.testing.TestEnvironmentOptions; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.worker.WorkerPlugin; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +/** + * Tests that plugins propagate through the full chain: WorkflowServiceStubsOptions → + * WorkflowClientOptions → WorkerFactory + */ +public class PluginPropagationTest { + + @Test + public void testPluginPropagatesFromServiceStubsToWorkerFactory() { + List callLog = new ArrayList<>(); + + // Create a plugin that tracks all configuration calls + SimplePlugin trackingPlugin = + SimplePlugin.newBuilder("tracking-plugin") + .customizeServiceStubs( + builder -> { + callLog.add("configureServiceStubs"); + }) + .customizeClient( + builder -> { + callLog.add("configureWorkflowClient"); + }) + .customizeWorkerFactory( + builder -> { + callLog.add("configureWorkerFactory"); + }) + .customizeWorker( + builder -> { + callLog.add("configureWorker"); + }) + .build(); + + // Set the plugin ONLY on WorkflowServiceStubsOptions + WorkflowServiceStubsOptions stubsOptions = + WorkflowServiceStubsOptions.newBuilder() + .setPlugins((io.temporal.serviceclient.WorkflowServiceStubsPlugin) trackingPlugin) + .build(); + + TestEnvironmentOptions testOptions = + TestEnvironmentOptions.newBuilder().setWorkflowServiceStubsOptions(stubsOptions).build(); + + // Create the test environment - this triggers the full propagation chain + TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(testOptions); + try { + // Create a worker to trigger configureWorker + env.newWorker("test-task-queue"); + + // Verify the plugin was called at each level + assertTrue( + "configureServiceStubs should be called", callLog.contains("configureServiceStubs")); + assertTrue( + "configureWorkflowClient should be called (propagated from service stubs)", + callLog.contains("configureWorkflowClient")); + assertTrue( + "configureWorkerFactory should be called (propagated from client)", + callLog.contains("configureWorkerFactory")); + assertTrue( + "configureWorker should be called (propagated from client)", + callLog.contains("configureWorker")); + + // Verify the order: service stubs -> client -> worker factory -> worker + assertEquals( + "Configuration should happen in correct order", + Arrays.asList( + "configureServiceStubs", + "configureWorkflowClient", + "configureWorkerFactory", + "configureWorker"), + callLog); + } finally { + env.close(); + } + } + + @Test + public void testPluginSetOnClientOnlyDoesNotAffectServiceStubs() { + List callLog = new ArrayList<>(); + + // Create a plugin that tracks all configuration calls + SimplePlugin trackingPlugin = + SimplePlugin.newBuilder("tracking-plugin") + .customizeServiceStubs( + builder -> { + callLog.add("configureServiceStubs"); + }) + .customizeClient( + builder -> { + callLog.add("configureWorkflowClient"); + }) + .customizeWorkerFactory( + builder -> { + callLog.add("configureWorkerFactory"); + }) + .build(); + + // Set the plugin ONLY on WorkflowClientOptions (not service stubs) + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder() + .setPlugins((io.temporal.client.WorkflowClientPlugin) trackingPlugin) + .build(); + + TestEnvironmentOptions testOptions = + TestEnvironmentOptions.newBuilder().setWorkflowClientOptions(clientOptions).build(); + + TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(testOptions); + try { + env.newWorker("test-task-queue"); + + // configureServiceStubs should NOT be called (plugin wasn't set there) + assertFalse( + "configureServiceStubs should NOT be called", callLog.contains("configureServiceStubs")); + + // But client and worker factory should be called + assertTrue( + "configureWorkflowClient should be called", callLog.contains("configureWorkflowClient")); + assertTrue( + "configureWorkerFactory should be called", callLog.contains("configureWorkerFactory")); + } finally { + env.close(); + } + } + + @Test + public void testMergedPluginsFromBothLevels() { + List callLog = new ArrayList<>(); + + // Plugin set on service stubs + SimplePlugin stubsPlugin = + SimplePlugin.newBuilder("stubs-plugin") + .customizeClient(builder -> callLog.add("stubs-plugin-configureWorkflowClient")) + .build(); + + // Different plugin set on client + SimplePlugin clientPlugin = + SimplePlugin.newBuilder("client-plugin") + .customizeClient(builder -> callLog.add("client-plugin-configureWorkflowClient")) + .build(); + + WorkflowServiceStubsOptions stubsOptions = + WorkflowServiceStubsOptions.newBuilder() + .setPlugins((io.temporal.serviceclient.WorkflowServiceStubsPlugin) stubsPlugin) + .build(); + + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder() + .setPlugins((io.temporal.client.WorkflowClientPlugin) clientPlugin) + .build(); + + TestEnvironmentOptions testOptions = + TestEnvironmentOptions.newBuilder() + .setWorkflowServiceStubsOptions(stubsOptions) + .setWorkflowClientOptions(clientOptions) + .build(); + + TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(testOptions); + try { + // Both plugins should have their configureWorkflowClient called + // Propagated plugins come first, then explicit client plugins + assertEquals( + "Both plugins should be called in correct order", + Arrays.asList( + "stubs-plugin-configureWorkflowClient", "client-plugin-configureWorkflowClient"), + callLog); + } finally { + env.close(); + } + } + + @Test + public void testWorkerOnlyPluginOnFactoryOptions() { + List callLog = new ArrayList<>(); + + // Create a plugin that only uses worker-level customization + // (Even though SimplePlugin implements all interfaces, we only set worker callbacks) + SimplePlugin workerOnlyPlugin = + SimplePlugin.newBuilder("worker-only-plugin") + .customizeWorkerFactory(builder -> callLog.add("worker-only-configureWorkerFactory")) + .customizeWorker(builder -> callLog.add("worker-only-configureWorker")) + .build(); + + // Set the plugin on WorkerFactoryOptions (not on client) + io.temporal.worker.WorkerFactoryOptions factoryOptions = + io.temporal.worker.WorkerFactoryOptions.newBuilder() + .setPlugins((WorkerPlugin) workerOnlyPlugin) + .build(); + + TestEnvironmentOptions testOptions = + TestEnvironmentOptions.newBuilder().setWorkerFactoryOptions(factoryOptions).build(); + + TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(testOptions); + try { + env.newWorker("test-task-queue"); + + // Worker-only plugin should have its methods called + assertTrue( + "configureWorkerFactory should be called", + callLog.contains("worker-only-configureWorkerFactory")); + assertTrue( + "configureWorker should be called", callLog.contains("worker-only-configureWorker")); + } finally { + env.close(); + } + } + + @Test + public void testMergedPluginsAtWorkerFactoryLevel() { + List callLog = new ArrayList<>(); + + // Plugin propagated from client + SimplePlugin clientPlugin = + SimplePlugin.newBuilder("client-plugin") + .customizeWorkerFactory(builder -> callLog.add("client-plugin-configureWorkerFactory")) + .build(); + + // Plugin set directly on factory options + SimplePlugin factoryPlugin = + SimplePlugin.newBuilder("factory-plugin") + .customizeWorkerFactory(builder -> callLog.add("factory-plugin-configureWorkerFactory")) + .build(); + + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder() + .setPlugins((io.temporal.client.WorkflowClientPlugin) clientPlugin) + .build(); + + io.temporal.worker.WorkerFactoryOptions factoryOptions = + io.temporal.worker.WorkerFactoryOptions.newBuilder() + .setPlugins((WorkerPlugin) factoryPlugin) + .build(); + + TestEnvironmentOptions testOptions = + TestEnvironmentOptions.newBuilder() + .setWorkflowClientOptions(clientOptions) + .setWorkerFactoryOptions(factoryOptions) + .build(); + + TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(testOptions); + try { + // Both plugins should be called - propagated first, then explicit + assertEquals( + "Both plugins should be called in correct order", + Arrays.asList( + "client-plugin-configureWorkerFactory", "factory-plugin-configureWorkerFactory"), + callLog); + } finally { + env.close(); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/common/PluginTest.java b/temporal-sdk/src/test/java/io/temporal/common/PluginTest.java new file mode 100644 index 0000000000..018b2ef150 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/common/PluginTest.java @@ -0,0 +1,272 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common; + +import static org.junit.Assert.*; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class PluginTest { + + @Test + public void testSimplePluginDefaultBehavior() throws Exception { + SimplePlugin plugin = new SimplePlugin("test") {}; + + // Test configureServiceStubs doesn't throw (no customizers) + WorkflowServiceStubsOptions.Builder stubsBuilder = WorkflowServiceStubsOptions.newBuilder(); + plugin.configureServiceStubs(stubsBuilder); + + // Test configureWorkflowClient doesn't throw (no customizers) + WorkflowClientOptions.Builder clientBuilder = WorkflowClientOptions.newBuilder(); + plugin.configureWorkflowClient(clientBuilder); + + // Test configureWorkerFactory doesn't throw (no customizers) + WorkerFactoryOptions.Builder factoryBuilder = WorkerFactoryOptions.newBuilder(); + plugin.configureWorkerFactory(factoryBuilder); + + // Test configureWorker doesn't throw (no customizers) + WorkerOptions.Builder workerBuilder = WorkerOptions.newBuilder(); + plugin.configureWorker("test-queue", workerBuilder); + + // Test startWorkerFactory calls next + final boolean[] called = {false}; + plugin.startWorkerFactory(null, () -> called[0] = true); + assertTrue("startWorkerFactory should call next", called[0]); + + // Test startWorker calls next + called[0] = false; + plugin.startWorker("test-queue", null, () -> called[0] = true); + assertTrue("startWorker should call next", called[0]); + + // Test shutdownWorker calls next + called[0] = false; + plugin.shutdownWorker("test-queue", null, () -> called[0] = true); + assertTrue("shutdownWorker should call next", called[0]); + + // Test shutdownWorkerFactory calls next + called[0] = false; + plugin.shutdownWorkerFactory(null, () -> called[0] = true); + assertTrue("shutdownWorkerFactory should call next", called[0]); + + // Test initializeWorker is a no-op (doesn't throw) + plugin.initializeWorker("test-queue", null); + } + + @Test + public void testConfigurationPhaseOrder() { + List order = new ArrayList<>(); + + SimplePlugin pluginA = createTrackingPlugin("A", order); + SimplePlugin pluginB = createTrackingPlugin("B", order); + SimplePlugin pluginC = createTrackingPlugin("C", order); + + List plugins = Arrays.asList(pluginA, pluginB, pluginC); + + // Simulate configuration phase (forward order) + WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(); + for (Object plugin : plugins) { + if (plugin instanceof io.temporal.client.WorkflowClientPlugin) { + ((io.temporal.client.WorkflowClientPlugin) plugin).configureWorkflowClient(builder); + } + } + + // Configuration should be in forward order + assertEquals(Arrays.asList("A-config", "B-config", "C-config"), order); + } + + @Test + public void testExecutionPhaseReverseOrder() throws Exception { + List order = new ArrayList<>(); + + SimplePlugin pluginA = createExecutionTrackingPlugin("A", order); + SimplePlugin pluginB = createExecutionTrackingPlugin("B", order); + SimplePlugin pluginC = createExecutionTrackingPlugin("C", order); + + List plugins = Arrays.asList(pluginA, pluginB, pluginC); + + // Build chain in reverse (like WorkerFactory does) + Runnable chain = + () -> { + order.add("terminal"); + }; + + List reversed = new ArrayList<>(plugins); + java.util.Collections.reverse(reversed); + for (Object plugin : reversed) { + if (plugin instanceof io.temporal.worker.WorkerPlugin) { + final Runnable next = chain; + final io.temporal.worker.WorkerPlugin workerPlugin = + (io.temporal.worker.WorkerPlugin) plugin; + chain = + () -> { + order.add(workerPlugin.getName() + "-before"); + try { + workerPlugin.startWorkerFactory(null, next); + } catch (Exception e) { + throw new RuntimeException(e); + } + order.add(workerPlugin.getName() + "-after"); + }; + } + } + + // Execute the chain + chain.run(); + + // First plugin should wrap all others + assertEquals( + Arrays.asList( + "A-before", "B-before", "C-before", "terminal", "C-after", "B-after", "A-after"), + order); + } + + @Test + public void testStartWorkerReverseOrder() throws Exception { + List order = new ArrayList<>(); + + SimplePlugin pluginA = createWorkerLifecycleTrackingPlugin("A", order); + SimplePlugin pluginB = createWorkerLifecycleTrackingPlugin("B", order); + SimplePlugin pluginC = createWorkerLifecycleTrackingPlugin("C", order); + + List plugins = Arrays.asList(pluginA, pluginB, pluginC); + + // Build chain in reverse (like WorkerFactory does) + Runnable chain = () -> order.add("worker-start"); + + List reversed = new ArrayList<>(plugins); + java.util.Collections.reverse(reversed); + for (Object plugin : reversed) { + if (plugin instanceof io.temporal.worker.WorkerPlugin) { + final Runnable next = chain; + final io.temporal.worker.WorkerPlugin workerPlugin = + (io.temporal.worker.WorkerPlugin) plugin; + chain = + () -> { + order.add(workerPlugin.getName() + "-startWorker-before"); + try { + workerPlugin.startWorker("test-queue", null, next); + } catch (Exception e) { + throw new RuntimeException(e); + } + order.add(workerPlugin.getName() + "-startWorker-after"); + }; + } + } + + chain.run(); + + // First plugin should wrap all others + assertEquals( + Arrays.asList( + "A-startWorker-before", + "B-startWorker-before", + "C-startWorker-before", + "worker-start", + "C-startWorker-after", + "B-startWorker-after", + "A-startWorker-after"), + order); + } + + @Test + public void testShutdownWorkerReverseOrder() { + List order = new ArrayList<>(); + + SimplePlugin pluginA = createWorkerLifecycleTrackingPlugin("A", order); + SimplePlugin pluginB = createWorkerLifecycleTrackingPlugin("B", order); + SimplePlugin pluginC = createWorkerLifecycleTrackingPlugin("C", order); + + List plugins = Arrays.asList(pluginA, pluginB, pluginC); + + // Build chain in reverse (like WorkerFactory does) + Runnable chain = () -> order.add("worker-shutdown"); + + List reversed = new ArrayList<>(plugins); + java.util.Collections.reverse(reversed); + for (Object plugin : reversed) { + if (plugin instanceof io.temporal.worker.WorkerPlugin) { + final Runnable next = chain; + final io.temporal.worker.WorkerPlugin workerPlugin = + (io.temporal.worker.WorkerPlugin) plugin; + chain = + () -> { + order.add(workerPlugin.getName() + "-shutdownWorker-before"); + workerPlugin.shutdownWorker("test-queue", null, next); + order.add(workerPlugin.getName() + "-shutdownWorker-after"); + }; + } + } + + chain.run(); + + // First plugin should wrap all others + assertEquals( + Arrays.asList( + "A-shutdownWorker-before", + "B-shutdownWorker-before", + "C-shutdownWorker-before", + "worker-shutdown", + "C-shutdownWorker-after", + "B-shutdownWorker-after", + "A-shutdownWorker-after"), + order); + } + + private SimplePlugin createTrackingPlugin(String name, List order) { + return new SimplePlugin(name) { + @Override + public void configureWorkflowClient(WorkflowClientOptions.Builder builder) { + order.add(name + "-config"); + } + }; + } + + private SimplePlugin createExecutionTrackingPlugin(String name, List order) { + return new SimplePlugin(name) { + @Override + public void startWorkerFactory(io.temporal.worker.WorkerFactory factory, Runnable next) { + next.run(); + } + }; + } + + private SimplePlugin createWorkerLifecycleTrackingPlugin(String name, List order) { + return new SimplePlugin(name) { + @Override + public void startWorker(String taskQueue, io.temporal.worker.Worker worker, Runnable next) { + next.run(); + } + + @Override + public void shutdownWorker( + String taskQueue, io.temporal.worker.Worker worker, Runnable next) { + next.run(); + } + }; + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/common/SimplePluginBuilderTest.java b/temporal-sdk/src/test/java/io/temporal/common/SimplePluginBuilderTest.java new file mode 100644 index 0000000000..60f7d7df3a --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/common/SimplePluginBuilderTest.java @@ -0,0 +1,697 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.WorkerInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; +import io.temporal.common.interceptors.WorkflowClientInterceptor; +import io.temporal.common.interceptors.WorkflowClientInterceptorBase; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import io.temporal.worker.WorkerPlugin; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; + +public class SimplePluginBuilderTest { + + @Test + public void testSimplePluginName() { + SimplePlugin plugin = SimplePlugin.newBuilder("test-plugin").build(); + assertEquals("test-plugin", plugin.getName()); + } + + @Test + public void testSimplePluginImplementsAllInterfaces() { + SimplePlugin plugin = SimplePlugin.newBuilder("test").build(); + assertTrue( + "Should implement WorkflowServiceStubsPlugin", + plugin instanceof io.temporal.serviceclient.WorkflowServiceStubsPlugin); + assertTrue( + "Should implement WorkflowClientPlugin", + plugin instanceof io.temporal.client.WorkflowClientPlugin); + assertTrue("Should implement WorkerPlugin", plugin instanceof io.temporal.worker.WorkerPlugin); + } + + @Test + public void testCustomizeServiceStubs() { + AtomicBoolean customized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeServiceStubs( + builder -> { + customized.set(true); + }) + .build(); + + WorkflowServiceStubsOptions.Builder builder = WorkflowServiceStubsOptions.newBuilder(); + ((io.temporal.serviceclient.WorkflowServiceStubsPlugin) plugin).configureServiceStubs(builder); + + assertTrue("Customizer should have been called", customized.get()); + } + + @Test + public void testCustomizeClient() { + AtomicBoolean customized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeClient( + builder -> { + customized.set(true); + builder.setIdentity("custom-identity"); + }) + .build(); + + WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(); + ((io.temporal.client.WorkflowClientPlugin) plugin).configureWorkflowClient(builder); + + assertTrue("Customizer should have been called", customized.get()); + assertEquals("custom-identity", builder.build().getIdentity()); + } + + @Test + public void testCustomizeWorkerFactory() { + AtomicBoolean customized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeWorkerFactory( + builder -> { + customized.set(true); + builder.setWorkflowCacheSize(100); + }) + .build(); + + WorkerFactoryOptions.Builder builder = WorkerFactoryOptions.newBuilder(); + ((io.temporal.worker.WorkerPlugin) plugin).configureWorkerFactory(builder); + + assertTrue("Customizer should have been called", customized.get()); + assertEquals(100, builder.build().getWorkflowCacheSize()); + } + + @Test + public void testCustomizeWorker() { + AtomicBoolean customized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeWorker( + builder -> { + customized.set(true); + builder.setMaxConcurrentActivityExecutionSize(50); + }) + .build(); + + WorkerOptions.Builder builder = WorkerOptions.newBuilder(); + ((io.temporal.worker.WorkerPlugin) plugin).configureWorker("test-queue", builder); + + assertTrue("Customizer should have been called", customized.get()); + assertEquals(50, builder.build().getMaxConcurrentActivityExecutionSize()); + } + + @Test + public void testMultipleCustomizers() { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeClient(builder -> callCount.incrementAndGet()) + .customizeClient(builder -> callCount.incrementAndGet()) + .customizeClient(builder -> callCount.incrementAndGet()) + .build(); + + WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(); + ((io.temporal.client.WorkflowClientPlugin) plugin).configureWorkflowClient(builder); + + assertEquals("All customizers should be called", 3, callCount.get()); + } + + @Test + public void testAddWorkerInterceptors() { + WorkerInterceptor interceptor = new WorkerInterceptorBase() {}; + + SimplePlugin plugin = + SimplePlugin.newBuilder("test").addWorkerInterceptors(interceptor).build(); + + WorkerFactoryOptions.Builder builder = WorkerFactoryOptions.newBuilder(); + ((io.temporal.worker.WorkerPlugin) plugin).configureWorkerFactory(builder); + + WorkerInterceptor[] interceptors = builder.build().getWorkerInterceptors(); + assertEquals(1, interceptors.length); + assertSame(interceptor, interceptors[0]); + } + + @Test + public void testAddClientInterceptors() { + WorkflowClientInterceptor interceptor = new WorkflowClientInterceptorBase() {}; + + SimplePlugin plugin = + SimplePlugin.newBuilder("test").addClientInterceptors(interceptor).build(); + + WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(); + ((io.temporal.client.WorkflowClientPlugin) plugin).configureWorkflowClient(builder); + + WorkflowClientInterceptor[] interceptors = builder.build().getInterceptors(); + assertEquals(1, interceptors.length); + assertSame(interceptor, interceptors[0]); + } + + @Test + public void testInterceptorsAppendToExisting() { + WorkerInterceptor existingInterceptor = new WorkerInterceptorBase() {}; + WorkerInterceptor newInterceptor = new WorkerInterceptorBase() {}; + + SimplePlugin plugin = + SimplePlugin.newBuilder("test").addWorkerInterceptors(newInterceptor).build(); + + WorkerFactoryOptions.Builder builder = + WorkerFactoryOptions.newBuilder().setWorkerInterceptors(existingInterceptor); + ((io.temporal.worker.WorkerPlugin) plugin).configureWorkerFactory(builder); + + WorkerInterceptor[] interceptors = builder.build().getWorkerInterceptors(); + assertEquals(2, interceptors.length); + assertSame(existingInterceptor, interceptors[0]); + assertSame(newInterceptor, interceptors[1]); + } + + @Test + public void testInitializeWorker() { + AtomicBoolean initialized = new AtomicBoolean(false); + String[] capturedTaskQueue = {null}; + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .initializeWorker( + (taskQueue, worker) -> { + initialized.set(true); + capturedTaskQueue[0] = taskQueue; + }) + .build(); + + // Call initializeWorker with null worker (we're just testing the callback is invoked) + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("my-task-queue", null); + + assertTrue("Initializer should have been called", initialized.get()); + assertEquals("my-task-queue", capturedTaskQueue[0]); + } + + @Test + public void testMultipleWorkerInitializers() { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .initializeWorker((taskQueue, worker) -> callCount.incrementAndGet()) + .initializeWorker((taskQueue, worker) -> callCount.incrementAndGet()) + .initializeWorker((taskQueue, worker) -> callCount.incrementAndGet()) + .build(); + + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", null); + + assertEquals("All initializers should be called", 3, callCount.get()); + } + + @Test(expected = NullPointerException.class) + public void testNullInitializeWorker() { + SimplePlugin.newBuilder("test").initializeWorker(null); + } + + @Test + public void testOnWorkerStart() throws Exception { + AtomicBoolean started = new AtomicBoolean(false); + String[] capturedTaskQueue = {null}; + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerStart( + (taskQueue, worker) -> { + started.set(true); + capturedTaskQueue[0] = taskQueue; + }) + .build(); + + AtomicBoolean nextCalled = new AtomicBoolean(false); + ((io.temporal.worker.WorkerPlugin) plugin) + .startWorker("my-task-queue", null, () -> nextCalled.set(true)); + + assertTrue("next should be called", nextCalled.get()); + assertTrue("Callback should have been called", started.get()); + assertEquals("my-task-queue", capturedTaskQueue[0]); + } + + @Test + public void testOnWorkerShutdown() { + AtomicBoolean shutdown = new AtomicBoolean(false); + String[] capturedTaskQueue = {null}; + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerShutdown( + (taskQueue, worker) -> { + shutdown.set(true); + capturedTaskQueue[0] = taskQueue; + }) + .build(); + + AtomicBoolean nextCalled = new AtomicBoolean(false); + ((io.temporal.worker.WorkerPlugin) plugin) + .shutdownWorker("my-task-queue", null, () -> nextCalled.set(true)); + + assertTrue("next should be called", nextCalled.get()); + assertTrue("Callback should have been called", shutdown.get()); + assertEquals("my-task-queue", capturedTaskQueue[0]); + } + + @Test + public void testMultipleOnWorkerStartCallbacks() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerStart((taskQueue, worker) -> callCount.incrementAndGet()) + .onWorkerStart((taskQueue, worker) -> callCount.incrementAndGet()) + .onWorkerStart((taskQueue, worker) -> callCount.incrementAndGet()) + .build(); + + ((io.temporal.worker.WorkerPlugin) plugin).startWorker("test-queue", null, () -> {}); + + assertEquals("All callbacks should be called", 3, callCount.get()); + } + + @Test + public void testMultipleOnWorkerShutdownCallbacks() { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerShutdown((taskQueue, worker) -> callCount.incrementAndGet()) + .onWorkerShutdown((taskQueue, worker) -> callCount.incrementAndGet()) + .onWorkerShutdown((taskQueue, worker) -> callCount.incrementAndGet()) + .build(); + + ((io.temporal.worker.WorkerPlugin) plugin).shutdownWorker("test-queue", null, () -> {}); + + assertEquals("All callbacks should be called", 3, callCount.get()); + } + + @Test + public void testOnWorkerFactoryStart() throws Exception { + AtomicBoolean started = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test").onWorkerFactoryStart(factory -> started.set(true)).build(); + + AtomicBoolean nextCalled = new AtomicBoolean(false); + ((io.temporal.worker.WorkerPlugin) plugin).startWorkerFactory(null, () -> nextCalled.set(true)); + + assertTrue("next should be called", nextCalled.get()); + assertTrue("Callback should have been called", started.get()); + } + + @Test + public void testOnWorkerFactoryShutdown() throws Exception { + AtomicBoolean shutdown = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerFactoryShutdown(factory -> shutdown.set(true)) + .build(); + + AtomicBoolean nextCalled = new AtomicBoolean(false); + ((io.temporal.worker.WorkerPlugin) plugin) + .shutdownWorkerFactory(null, () -> nextCalled.set(true)); + + assertTrue("next should be called", nextCalled.get()); + assertTrue("Callback should have been called", shutdown.get()); + } + + @Test + public void testMultipleOnWorkerFactoryStartCallbacks() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerFactoryStart(factory -> callCount.incrementAndGet()) + .onWorkerFactoryStart(factory -> callCount.incrementAndGet()) + .onWorkerFactoryStart(factory -> callCount.incrementAndGet()) + .build(); + + ((io.temporal.worker.WorkerPlugin) plugin).startWorkerFactory(null, () -> {}); + + assertEquals("All callbacks should be called", 3, callCount.get()); + } + + @Test + public void testMultipleOnWorkerFactoryShutdownCallbacks() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onWorkerFactoryShutdown(factory -> callCount.incrementAndGet()) + .onWorkerFactoryShutdown(factory -> callCount.incrementAndGet()) + .onWorkerFactoryShutdown(factory -> callCount.incrementAndGet()) + .build(); + + ((io.temporal.worker.WorkerPlugin) plugin).shutdownWorkerFactory(null, () -> {}); + + assertEquals("All callbacks should be called", 3, callCount.get()); + } + + @Test(expected = NullPointerException.class) + public void testNullOnWorkerStart() { + SimplePlugin.newBuilder("test").onWorkerStart(null); + } + + @Test(expected = NullPointerException.class) + public void testNullOnWorkerShutdown() { + SimplePlugin.newBuilder("test").onWorkerShutdown(null); + } + + @Test(expected = NullPointerException.class) + public void testNullOnWorkerFactoryStart() { + SimplePlugin.newBuilder("test").onWorkerFactoryStart(null); + } + + @Test(expected = NullPointerException.class) + public void testNullOnWorkerFactoryShutdown() { + SimplePlugin.newBuilder("test").onWorkerFactoryShutdown(null); + } + + @Test(expected = NullPointerException.class) + public void testNullName() { + SimplePlugin.newBuilder(null); + } + + @Test(expected = NullPointerException.class) + public void testNullCustomizer() { + SimplePlugin.newBuilder("test").customizeClient(null); + } + + @Test + public void testSetDataConverter() { + DataConverter customConverter = mock(DataConverter.class); + + SimplePlugin plugin = SimplePlugin.newBuilder("test").setDataConverter(customConverter).build(); + + WorkflowClientOptions.Builder builder = WorkflowClientOptions.newBuilder(); + ((io.temporal.client.WorkflowClientPlugin) plugin).configureWorkflowClient(builder); + + assertSame(customConverter, builder.build().getDataConverter()); + } + + @Test(expected = NullPointerException.class) + public void testNullDataConverter() { + SimplePlugin.newBuilder("test").setDataConverter(null); + } + + @Test + public void testRegisterWorkflowImplementationTypes() { + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .registerWorkflowImplementationTypes(String.class, Integer.class) + .build(); + + Worker mockWorker = mock(Worker.class); + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker); + + verify(mockWorker).registerWorkflowImplementationTypes(String.class, Integer.class); + } + + @Test + public void testRegisterActivitiesImplementations() { + Object activity1 = new Object(); + Object activity2 = new Object(); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .registerActivitiesImplementations(activity1, activity2) + .build(); + + Worker mockWorker = mock(Worker.class); + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker); + + verify(mockWorker).registerActivitiesImplementations(activity1, activity2); + } + + @Test + public void testRegisterNexusServiceImplementation() { + Object nexusService = new Object(); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test").registerNexusServiceImplementation(nexusService).build(); + + Worker mockWorker = mock(Worker.class); + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker); + + verify(mockWorker).registerNexusServiceImplementation(nexusService); + } + + @Test + public void testRegisterMultipleNexusServiceImplementations() { + Object nexusService1 = new Object(); + Object nexusService2 = new Object(); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .registerNexusServiceImplementation(nexusService1) + .registerNexusServiceImplementation(nexusService2) + .build(); + + Worker mockWorker = mock(Worker.class); + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker); + + verify(mockWorker).registerNexusServiceImplementation(nexusService1); + verify(mockWorker).registerNexusServiceImplementation(nexusService2); + } + + @Test(expected = NullPointerException.class) + public void testNullNexusServiceImplementation() { + SimplePlugin.newBuilder("test").registerNexusServiceImplementation(null); + } + + @Test + public void testRegistrationsWithCustomInitializer() { + AtomicBoolean customInitializerCalled = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .registerWorkflowImplementationTypes(String.class) + .registerActivitiesImplementations(new Object()) + .initializeWorker((taskQueue, worker) -> customInitializerCalled.set(true)) + .build(); + + Worker mockWorker = mock(Worker.class); + ((io.temporal.worker.WorkerPlugin) plugin).initializeWorker("test-queue", mockWorker); + + // Verify registrations happen before custom initializer + verify(mockWorker).registerWorkflowImplementationTypes(String.class); + verify(mockWorker).registerActivitiesImplementations(any()); + assertTrue( + "Custom initializer should be called after registrations", customInitializerCalled.get()); + } + + // ==================== Replay Tests ==================== + + @Test + public void testCustomizeReplayWorker() { + AtomicBoolean customized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeReplayWorker( + builder -> { + customized.set(true); + builder.setMaxConcurrentActivityExecutionSize(25); + }) + .build(); + + WorkerOptions.Builder builder = WorkerOptions.newBuilder(); + ((WorkerPlugin) plugin).configureReplayWorker("test-queue", builder); + + assertTrue("Replay customizer should have been called", customized.get()); + assertEquals(25, builder.build().getMaxConcurrentActivityExecutionSize()); + } + + @Test + public void testCustomizeReplayWorkerDelegatesToConfigureWorkerWhenEmpty() { + AtomicBoolean workerCustomized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeWorker(builder -> workerCustomized.set(true)) + // No replay customizer set + .build(); + + WorkerOptions.Builder builder = WorkerOptions.newBuilder(); + ((WorkerPlugin) plugin).configureReplayWorker("test-queue", builder); + + assertTrue( + "Should delegate to configureWorker when no replay customizers", workerCustomized.get()); + } + + @Test + public void testCustomizeReplayWorkerDoesNotDelegateWhenSet() { + AtomicBoolean workerCustomized = new AtomicBoolean(false); + AtomicBoolean replayCustomized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .customizeWorker(builder -> workerCustomized.set(true)) + .customizeReplayWorker(builder -> replayCustomized.set(true)) + .build(); + + WorkerOptions.Builder builder = WorkerOptions.newBuilder(); + ((WorkerPlugin) plugin).configureReplayWorker("test-queue", builder); + + assertFalse( + "Should NOT delegate to configureWorker when replay customizer is set", + workerCustomized.get()); + assertTrue("Replay customizer should be called", replayCustomized.get()); + } + + @Test + public void testInitializeReplayWorker() { + AtomicBoolean initialized = new AtomicBoolean(false); + AtomicReference capturedTaskQueue = new AtomicReference<>(); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .initializeReplayWorker( + (taskQueue, worker) -> { + initialized.set(true); + capturedTaskQueue.set(taskQueue); + }) + .build(); + + ((WorkerPlugin) plugin).initializeReplayWorker("replay-queue", null); + + assertTrue("Replay initializer should have been called", initialized.get()); + assertEquals("replay-queue", capturedTaskQueue.get()); + } + + @Test + public void testInitializeReplayWorkerDelegatesToInitializeWorkerWhenEmpty() { + AtomicBoolean workerInitialized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .initializeWorker((taskQueue, worker) -> workerInitialized.set(true)) + // No replay initializer set + .build(); + + ((WorkerPlugin) plugin).initializeReplayWorker("test-queue", null); + + assertTrue( + "Should delegate to initializeWorker when no replay initializers", workerInitialized.get()); + } + + @Test + public void testInitializeReplayWorkerDoesNotDelegateWhenSet() { + AtomicBoolean workerInitialized = new AtomicBoolean(false); + AtomicBoolean replayInitialized = new AtomicBoolean(false); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .initializeWorker((taskQueue, worker) -> workerInitialized.set(true)) + .initializeReplayWorker((taskQueue, worker) -> replayInitialized.set(true)) + .build(); + + ((WorkerPlugin) plugin).initializeReplayWorker("test-queue", null); + + assertFalse( + "Should NOT delegate to initializeWorker when replay initializer is set", + workerInitialized.get()); + assertTrue("Replay initializer should be called", replayInitialized.get()); + } + + @Test + public void testOnReplayWorkflowExecution() throws Exception { + AtomicBoolean callbackCalled = new AtomicBoolean(false); + AtomicReference capturedWorker = new AtomicReference<>(); + AtomicReference capturedHistory = new AtomicReference<>(); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onReplayWorkflowExecution( + (worker, history) -> { + callbackCalled.set(true); + capturedWorker.set(worker); + capturedHistory.set(history); + }) + .build(); + + Worker mockWorker = mock(Worker.class); + WorkflowExecutionHistory mockHistory = mock(WorkflowExecutionHistory.class); + AtomicBoolean nextCalled = new AtomicBoolean(false); + + ((WorkerPlugin) plugin) + .replayWorkflowExecution(mockWorker, mockHistory, () -> nextCalled.set(true)); + + assertTrue("next should be called", nextCalled.get()); + assertTrue("Callback should have been called", callbackCalled.get()); + assertSame(mockWorker, capturedWorker.get()); + assertSame(mockHistory, capturedHistory.get()); + } + + @Test + public void testMultipleOnReplayWorkflowExecutionCallbacks() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + SimplePlugin plugin = + SimplePlugin.newBuilder("test") + .onReplayWorkflowExecution((worker, history) -> callCount.incrementAndGet()) + .onReplayWorkflowExecution((worker, history) -> callCount.incrementAndGet()) + .onReplayWorkflowExecution((worker, history) -> callCount.incrementAndGet()) + .build(); + + Worker mockWorker = mock(Worker.class); + WorkflowExecutionHistory mockHistory = mock(WorkflowExecutionHistory.class); + + ((WorkerPlugin) plugin).replayWorkflowExecution(mockWorker, mockHistory, () -> {}); + + assertEquals("All callbacks should be called", 3, callCount.get()); + } + + @Test(expected = NullPointerException.class) + public void testNullCustomizeReplayWorker() { + SimplePlugin.newBuilder("test").customizeReplayWorker(null); + } + + @Test(expected = NullPointerException.class) + public void testNullInitializeReplayWorker() { + SimplePlugin.newBuilder("test").initializeReplayWorker(null); + } + + @Test(expected = NullPointerException.class) + public void testNullOnReplayWorkflowExecution() { + SimplePlugin.newBuilder("test").onReplayWorkflowExecution(null); + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubs.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubs.java index 2ce76512a8..807adbd1a9 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubs.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubs.java @@ -6,6 +6,7 @@ import io.temporal.internal.WorkflowThreadMarker; import io.temporal.internal.testservice.InProcessGRPCServer; import java.time.Duration; +import java.util.function.Supplier; import javax.annotation.Nullable; /** Initializes and holds gRPC blocking and future stubs. */ @@ -32,6 +33,17 @@ static WorkflowServiceStubs newLocalServiceStubs() { * WorkflowServiceStubs#connect(Duration)} after creation or use {@link * #newConnectedServiceStubs(WorkflowServiceStubsOptions, Duration)} instead of this method. * + *

If the options contain plugins (via {@link + * WorkflowServiceStubsOptions.Builder#setPlugins(WorkflowServiceStubsPlugin...)}), this method + * applies them in two phases: + * + *

    + *
  1. Configuration phase: Each plugin's {@code configureServiceStubs} method is called + * in forward (registration) order to modify the options builder + *
  2. Connection phase: Each plugin's {@code connectServiceClient} method is called in + * reverse order to wrap the connection (first plugin wraps all others) + *
+ * *

Migration Note: This method doesn't respect {@link * WorkflowServiceStubsOptions.Builder#setDisableHealthCheck(boolean)}, {@link * WorkflowServiceStubsOptions.Builder#setHealthCheckAttemptTimeout(Duration)} (boolean)} and @@ -43,8 +55,34 @@ static WorkflowServiceStubs newLocalServiceStubs() { */ static WorkflowServiceStubs newServiceStubs(WorkflowServiceStubsOptions options) { enforceNonWorkflowThread(); - return WorkflowThreadMarker.protectFromWorkflowThread( - new WorkflowServiceStubsImpl(null, options), WorkflowServiceStubs.class); + + WorkflowServiceStubsPlugin[] plugins = options.getPlugins(); + if (plugins == null || plugins.length == 0) { + // No plugins - create stubs directly + return WorkflowThreadMarker.protectFromWorkflowThread( + new WorkflowServiceStubsImpl(null, options), WorkflowServiceStubs.class); + } + + // Apply plugin configuration phase (forward order) + WorkflowServiceStubsOptions.Builder builder = WorkflowServiceStubsOptions.newBuilder(options); + for (WorkflowServiceStubsPlugin plugin : plugins) { + plugin.configureServiceStubs(builder); + } + WorkflowServiceStubsOptions finalOptions = builder.validateAndBuildWithDefaults(); + + // Build connection chain (reverse order for proper nesting) + Supplier connectionChain = + () -> + WorkflowThreadMarker.protectFromWorkflowThread( + new WorkflowServiceStubsImpl(null, finalOptions), WorkflowServiceStubs.class); + + for (int i = plugins.length - 1; i >= 0; i--) { + final Supplier next = connectionChain; + final WorkflowServiceStubsPlugin plugin = plugins[i]; + connectionChain = () -> plugin.connectServiceClient(finalOptions, next); + } + + return connectionChain.get(); } /** diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java index 8663b97b2f..f22a2ac67e 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsOptions.java @@ -37,6 +37,12 @@ public final class WorkflowServiceStubsOptions extends ServiceStubsOptions { /** Retry options for outgoing RPC calls */ private final RpcRetryOptions rpcRetryOptions; + /** Plugins for customizing service stubs configuration and connection */ + private final WorkflowServiceStubsPlugin[] plugins; + + private static final WorkflowServiceStubsPlugin[] EMPTY_PLUGINS = + new WorkflowServiceStubsPlugin[0]; + public static Builder newBuilder() { return new Builder(); } @@ -54,12 +60,14 @@ private WorkflowServiceStubsOptions( boolean disableHealthCheck, Duration rpcLongPollTimeout, Duration rpcQueryTimeout, - RpcRetryOptions rpcRetryOptions) { + RpcRetryOptions rpcRetryOptions, + WorkflowServiceStubsPlugin[] plugins) { super(serviceStubsOptions); this.disableHealthCheck = disableHealthCheck; this.rpcLongPollTimeout = rpcLongPollTimeout; this.rpcQueryTimeout = rpcQueryTimeout; this.rpcRetryOptions = rpcRetryOptions; + this.plugins = plugins; } /** @@ -97,6 +105,15 @@ public RpcRetryOptions getRpcRetryOptions() { return rpcRetryOptions; } + /** + * Returns the service stubs plugins configured for this options. + * + * @return the array of service stubs plugins, never null + */ + public WorkflowServiceStubsPlugin[] getPlugins() { + return plugins; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -105,12 +122,16 @@ public boolean equals(Object o) { return disableHealthCheck == that.disableHealthCheck && Objects.equals(rpcLongPollTimeout, that.rpcLongPollTimeout) && Objects.equals(rpcQueryTimeout, that.rpcQueryTimeout) - && Objects.equals(rpcRetryOptions, that.rpcRetryOptions); + && Objects.equals(rpcRetryOptions, that.rpcRetryOptions) + && Arrays.equals(plugins, that.plugins); } @Override public int hashCode() { - return Objects.hash(disableHealthCheck, rpcLongPollTimeout, rpcQueryTimeout, rpcRetryOptions); + int result = + Objects.hash(disableHealthCheck, rpcLongPollTimeout, rpcQueryTimeout, rpcRetryOptions); + result = 31 * result + Arrays.hashCode(plugins); + return result; } @Override @@ -124,6 +145,8 @@ public String toString() { + rpcQueryTimeout + ", rpcRetryOptions=" + rpcRetryOptions + + ", plugins=" + + Arrays.toString(plugins) + '}'; } @@ -133,6 +156,7 @@ public static class Builder extends ServiceStubsOptions.Builder { private Duration rpcLongPollTimeout = DEFAULT_POLL_RPC_TIMEOUT; private Duration rpcQueryTimeout = DEFAULT_QUERY_RPC_TIMEOUT; private RpcRetryOptions rpcRetryOptions = DefaultStubServiceOperationRpcRetryOptions.INSTANCE; + private WorkflowServiceStubsPlugin[] plugins; private Builder() {} @@ -143,6 +167,7 @@ private Builder(ServiceStubsOptions options) { this.rpcLongPollTimeout = castedOptions.rpcLongPollTimeout; this.rpcQueryTimeout = castedOptions.rpcQueryTimeout; this.rpcRetryOptions = castedOptions.rpcRetryOptions; + this.plugins = castedOptions.plugins; } } @@ -240,6 +265,20 @@ public Builder setRpcRetryOptions(RpcRetryOptions rpcRetryOptions) { return this; } + /** + * Sets the workflow service stubs plugins to use for customizing configuration and connection. + * + *

Plugins that implement both {@code WorkflowServiceStubsPlugin} and {@code + * WorkflowClientPlugin} will be automatically propagated to the workflow client. + * + * @param plugins the plugins to use + * @return this builder + */ + public Builder setPlugins(WorkflowServiceStubsPlugin... plugins) { + this.plugins = Objects.requireNonNull(plugins); + return this; + } + /** * Sets the rpc timeout value for query calls. Default is 10 seconds. * @@ -262,9 +301,21 @@ public WorkflowServiceStubsOptions build() { this.disableHealthCheck, this.rpcLongPollTimeout, this.rpcQueryTimeout, - this.rpcRetryOptions); + this.rpcRetryOptions, + this.plugins); } + /** + * Validates options and builds with defaults applied. + * + *

Note: If plugins are configured via {@link #setPlugins(WorkflowServiceStubsPlugin...)}, + * they will have an opportunity to modify options after this method is called, when the options + * are passed to {@link WorkflowServiceStubs#newServiceStubs(WorkflowServiceStubsOptions)}. This + * means validation performed here occurs before plugin modifications. In most cases, users + * should simply call {@link #build()} and let the service stubs creation handle validation. + * + * @return validated options with defaults applied + */ public WorkflowServiceStubsOptions validateAndBuildWithDefaults() { ServiceStubsOptions serviceStubsOptions = super.validateAndBuildWithDefaults(); RpcRetryOptions retryOptions = @@ -274,7 +325,8 @@ public WorkflowServiceStubsOptions validateAndBuildWithDefaults() { this.disableHealthCheck, this.rpcLongPollTimeout, this.rpcQueryTimeout, - retryOptions); + retryOptions, + this.plugins == null ? EMPTY_PLUGINS : this.plugins); } } } diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsPlugin.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsPlugin.java new file mode 100644 index 0000000000..ea1ce31294 --- /dev/null +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/WorkflowServiceStubsPlugin.java @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.serviceclient; + +import java.util.function.Supplier; +import javax.annotation.Nonnull; + +/** + * Plugin interface for customizing Temporal workflow service stubs configuration and connection. + * + *

This is a low-level plugin interface for configuring the gRPC connection to the Temporal + * server. For most use cases, use {@code WorkflowClientPlugin} or {@code WorkerPlugin} in the + * temporal-sdk module instead. + * + *

Plugins that implement both {@code WorkflowServiceStubsPlugin} and {@code + * WorkflowClientPlugin} are automatically propagated from the service stubs to the workflow client. + */ +public interface WorkflowServiceStubsPlugin { + + /** + * Returns a unique name for this plugin. Used for logging and duplicate detection. Recommended + * format: "organization.plugin-name" (e.g., "io.temporal.tracing") + * + * @return fully qualified plugin name + */ + @Nonnull + String getName(); + + /** + * Allows the plugin to modify service stubs options before the service stubs are created. + * + * @param builder the options builder to modify + */ + void configureServiceStubs(@Nonnull WorkflowServiceStubsOptions.Builder builder); + + /** + * Allows the plugin to wrap service client connection. + * + * @param options the final options being used for connection + * @param next supplier that creates the service stubs (calls next plugin or actual connection) + * @return the service stubs (possibly wrapped or decorated) + */ + @Nonnull + WorkflowServiceStubs connectServiceClient( + @Nonnull WorkflowServiceStubsOptions options, @Nonnull Supplier next); +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java index b2dadcf059..29163195e2 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java @@ -101,6 +101,26 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) { */ Worker newWorker(String taskQueue, WorkerOptions options); + /** + * Creates a new Worker instance specifically for replay operations. This method should be used + * when replaying workflow histories to ensure plugins receive the replay-specific configuration + * callbacks. + * + *

Unlike {@link #newWorker(String, WorkerOptions)}, this method: + * + *

    + *
  • Calls {@link io.temporal.worker.WorkerPlugin#configureReplayWorker} instead of {@link + * io.temporal.worker.WorkerPlugin#configureWorker} + *
  • Calls {@link io.temporal.worker.WorkerPlugin#initializeReplayWorker} instead of {@link + * io.temporal.worker.WorkerPlugin#initializeWorker} + *
+ * + * @param taskQueue task queue for the replay worker + * @param options Options for configuring the replay worker + * @return Worker configured for replay + */ + Worker newReplayWorker(String taskQueue, WorkerOptions options); + /** Creates a WorkflowClient that is connected to the in-memory test Temporal service. */ WorkflowClient getWorkflowClient(); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index f11525f15b..2780cc8c40 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -130,6 +130,11 @@ public Worker newWorker(String taskQueue, WorkerOptions options) { return workerFactory.newWorker(taskQueue, options); } + @Override + public Worker newReplayWorker(String taskQueue, WorkerOptions options) { + return workerFactory.newReplayWorker(taskQueue, options); + } + @Override public WorkflowClient getWorkflowClient() { WorkflowClientOptions options; diff --git a/temporal-testing/src/main/java/io/temporal/testing/WorkflowReplayer.java b/temporal-testing/src/main/java/io/temporal/testing/WorkflowReplayer.java index 31d461fd59..d1d4572181 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/WorkflowReplayer.java +++ b/temporal-testing/src/main/java/io/temporal/testing/WorkflowReplayer.java @@ -5,6 +5,7 @@ import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.common.WorkflowExecutionHistory; import io.temporal.worker.Worker; +import io.temporal.worker.WorkerOptions; import java.io.File; /** Replays a workflow given its history. Useful for backwards compatibility testing. */ @@ -114,7 +115,32 @@ public static void replayWorkflowExecution( Class workflowClass, Class... moreWorkflowClasses) throws Exception { - TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance(); + replayWorkflowExecution( + history, (TestEnvironmentOptions) null, workflowClass, moreWorkflowClasses); + } + + /** + * Replays workflow from a {@link WorkflowExecutionHistory}. RunId must match the one used + * to generate the serialized history. + * + * @param history object that contains the workflow ids and the events. + * @param testEnvironmentOptions options for the test environment, including any plugins to apply. + * If null, default options are used. + * @param workflowClass workflow implementation class to replay + * @param moreWorkflowClasses optional additional workflow implementation classes + * @throws Exception if replay failed for any reason. + */ + @SuppressWarnings("deprecation") + public static void replayWorkflowExecution( + io.temporal.internal.common.WorkflowExecutionHistory history, + TestEnvironmentOptions testEnvironmentOptions, + Class workflowClass, + Class... moreWorkflowClasses) + throws Exception { + TestWorkflowEnvironment testEnv = + testEnvironmentOptions != null + ? TestWorkflowEnvironment.newInstance(testEnvironmentOptions) + : TestWorkflowEnvironment.newInstance(); try { replayWorkflowExecution(history, testEnv, workflowClass, moreWorkflowClasses); } finally { @@ -138,7 +164,9 @@ public static void replayWorkflowExecution( Class workflowClass, Class... moreWorkflowClasses) throws Exception { - Worker worker = testWorkflowEnvironment.newWorker(getQueueName((history))); + Worker worker = + testWorkflowEnvironment.newReplayWorker( + getQueueName((history)), WorkerOptions.newBuilder().build()); worker.registerWorkflowImplementationTypes( ObjectArrays.concat(moreWorkflowClasses, workflowClass)); replayWorkflowExecution(history, worker); @@ -174,8 +202,36 @@ public static ReplayResults replayWorkflowExecutions( boolean failFast, Class... workflowClasses) throws Exception { - try (TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance()) { - Worker worker = testEnv.newWorker("replay-task-queue-name"); + return replayWorkflowExecutions( + histories, failFast, (TestEnvironmentOptions) null, workflowClasses); + } + + /** + * Replays workflows provided by an iterable. + * + * @param histories The histories to be replayed + * @param failFast If true, throws upon the first error encountered (if any) during replay. If + * false, all histories will be replayed and the returned object contains information about + * any failures. + * @param testEnvironmentOptions options for the test environment, including any plugins to apply. + * If null, default options are used. + * @param workflowClasses workflow implementation classes to register + * @return If `failFast` is false, contains any replay failures encountered. + * @throws Exception If replay failed and `failFast` is true. + */ + @SuppressWarnings("deprecation") + public static ReplayResults replayWorkflowExecutions( + Iterable histories, + boolean failFast, + TestEnvironmentOptions testEnvironmentOptions, + Class... workflowClasses) + throws Exception { + try (TestWorkflowEnvironment testEnv = + testEnvironmentOptions != null + ? TestWorkflowEnvironment.newInstance(testEnvironmentOptions) + : TestWorkflowEnvironment.newInstance()) { + Worker worker = + testEnv.newReplayWorker("replay-task-queue-name", WorkerOptions.newBuilder().build()); worker.registerWorkflowImplementationTypes(workflowClasses); return replayWorkflowExecutions(histories, failFast, worker); }