diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java index 8fda2f69e..23f6eec44 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java @@ -67,6 +67,7 @@ public static WorkflowOptions merge( .setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class)) .setMemo(o.getMemo()) .setSearchAttributes(o.getSearchAttributes()) + .setHeaders(o.getHeaders()) .setContextPropagators(o.getContextPropagators()) .validateBuildWithDefaults(); } @@ -93,6 +94,8 @@ public static final class Builder { private Map searchAttributes; + private Map headers; + private List contextPropagators; private Builder() {} @@ -111,6 +114,7 @@ private Builder(WorkflowOptions options) { this.cronSchedule = options.cronSchedule; this.memo = options.memo; this.searchAttributes = options.searchAttributes; + this.headers = options.headers; this.contextPropagators = options.contextPropagators; } @@ -217,6 +221,11 @@ public Builder setContextPropagators(List contextPropagators) return this; } + public Builder setHeaders(Map headers) { + this.headers = headers; + return this; + } + public WorkflowOptions build() { return new WorkflowOptions( workflowId, @@ -229,6 +238,7 @@ public WorkflowOptions build() { cronSchedule, memo, searchAttributes, + headers, contextPropagators); } @@ -247,6 +257,7 @@ public WorkflowOptions validateBuildWithDefaults() { cronSchedule, memo, searchAttributes, + headers, contextPropagators); } } @@ -271,6 +282,8 @@ public WorkflowOptions validateBuildWithDefaults() { private final Map searchAttributes; + private final Map headers; + private final List contextPropagators; private WorkflowOptions( @@ -284,6 +297,7 @@ private WorkflowOptions( String cronSchedule, Map memo, Map searchAttributes, + Map headers, List contextPropagators) { this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; @@ -295,6 +309,7 @@ private WorkflowOptions( this.cronSchedule = cronSchedule; this.memo = memo; this.searchAttributes = searchAttributes; + this.headers = headers; this.contextPropagators = contextPropagators; } @@ -338,6 +353,10 @@ public Map getSearchAttributes() { return searchAttributes; } + public Map getHeaders() { + return headers; + } + public List getContextPropagators() { return contextPropagators; } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index 3bd203aa2..6b0dd8e21 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -160,4 +160,6 @@ CompletableFuture getResultAsync( void terminate(String reason, Object... details); Optional getOptions(); + + void setOptions(WorkflowOptions options); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java index 69128aeea..e088702be 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java @@ -82,7 +82,7 @@ class WorkflowStubImpl implements WorkflowStub { private final Optional workflowType; private final Scope metricsScope; private final AtomicReference execution = new AtomicReference<>(); - private final Optional options; + private Optional options; private final WorkflowClientOptions clientOptions; WorkflowStubImpl( @@ -235,7 +235,10 @@ private StartWorkflowExecutionRequest newStartWorkflowExecutionRequest( } if (o.getContextPropagators() != null && !o.getContextPropagators().isEmpty()) { Map context = extractContextsAndConvertToBytes(o.getContextPropagators()); - request.setHeader(Header.newBuilder().putAllFields(context)); + request.setHeader( + Header.newBuilder() + .putAllFields(context) + .putAllFields(convertMemoFromObjectToBytes(o.getHeaders()))); } return request.build(); } @@ -536,6 +539,11 @@ public Optional getOptions() { return options; } + @Override + public void setOptions(WorkflowOptions options) { + this.options = Optional.of(options); + } + private void checkStarted() { if (execution.get() == null || execution.get().getWorkflowId() == null) { throw new IllegalStateException("Null workflowId. Was workflow started?"); diff --git a/temporal-sdk/src/test/java/io/temporal/common/interceptors/SampleOpenTracingLikeInterceptor.java b/temporal-sdk/src/test/java/io/temporal/common/interceptors/SampleOpenTracingLikeInterceptor.java new file mode 100644 index 000000000..3b6a5baec --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/common/interceptors/SampleOpenTracingLikeInterceptor.java @@ -0,0 +1,36 @@ +package io.temporal.common.interceptors; + +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import java.util.HashMap; +import java.util.Map; + +//This way is bad also because the header is passed when the stub is created, not when the execution is triggered. +//It can be problematic, especially if our context is thread local and stubs are created and executed in different threads +public class SampleOpenTracingLikeInterceptor extends WorkflowClientInterceptorBase { + @Override + public WorkflowStub newUntypedWorkflowStub( + String workflowType, WorkflowOptions options, WorkflowStub next) { + Map originalHeaders = options != null ? options.getHeaders() : null; + Map newHeaders; + + if (originalHeaders == null) { + newHeaders = new HashMap<>(); + } else { + // we want to copy it, because right now WorkflowOptions exposes the collection itself, so if + // we modify it - + // we will modify a collection inside WorkflowOptions that supposes to be immutable (because + // it has a builder) + newHeaders = new HashMap<>(originalHeaders); + } + newHeaders.put("opentracing", new Object()); + WorkflowOptions modifiedOptions = + WorkflowOptions.newBuilder(options).setHeaders(newHeaders).build(); + // it's either + // 1. setOption on stub, + // or + // 2. We hack supposedly immutable WorkflowOptions instance and directly modify the header collection. + next.setOptions(modifiedOptions); + return next; + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/common/interceptors/SampleOpenTracingLikeInterceptor2.java b/temporal-sdk/src/test/java/io/temporal/common/interceptors/SampleOpenTracingLikeInterceptor2.java new file mode 100644 index 000000000..fc1f07582 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/common/interceptors/SampleOpenTracingLikeInterceptor2.java @@ -0,0 +1,141 @@ +package io.temporal.common.interceptors; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * This is a "better" implementation, that will give right behavior. + * Requires getOptions setOptions on stub to hack it right before the call. + * Check out {@link SampleOpenTracingLikeInterceptor} before this class. + */ +public class SampleOpenTracingLikeInterceptor2 extends WorkflowClientInterceptorBase { + @Override + public WorkflowStub newUntypedWorkflowStub( + String workflowType, WorkflowOptions options, WorkflowStub next) { + return new WorkflowStub() { + private void hackHeaders() { + //We shouldn't use headers that are passed as a parameter to newUntypedWorkflowStub, because + //they can be outdated at a time of call because of the exposed setOptions + WorkflowOptions options = next.getOptions().orElse(null); + Map originalHeaders = options != null ? options.getHeaders() : null; + Map newHeaders; + + if (originalHeaders == null) { + newHeaders = new HashMap<>(); + } else { + newHeaders = new HashMap<>(originalHeaders); + } + newHeaders.put("opentracing", new Object()); + WorkflowOptions modifiedOptions = + WorkflowOptions.newBuilder(options).setHeaders(newHeaders).build(); + next.setOptions(modifiedOptions); + } + + @Override + public WorkflowExecution start(Object... args) { + hackHeaders(); + return next.start(args); + } + + @Override + public WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs) { + hackHeaders(); + return next.signalWithStart(signalName, signalArgs, startArgs); + } + + @Override + public void signal(String signalName, Object... args) { + next.signal(signalName, args); + } + + @Override + public Optional getWorkflowType() { + return next.getWorkflowType(); + } + + @Override + public WorkflowExecution getExecution() { + return next.getExecution(); + } + + @Override + public R getResult(Class resultClass, Type resultType) { + return next.getResult(resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync(Class resultClass, Type resultType) { + return next.getResultAsync(resultClass, resultType); + } + + @Override + public R getResult(Class resultClass) { + return next.getResult(resultClass); + } + + @Override + public CompletableFuture getResultAsync(Class resultClass) { + return next.getResultAsync(resultClass); + } + + @Override + public R getResult(long timeout, TimeUnit unit, Class resultClass, Type resultType) throws TimeoutException { + return next.getResult(timeout, unit, resultClass, resultType); + } + + @Override + public R getResult(long timeout, TimeUnit unit, Class resultClass) throws TimeoutException { + return next.getResult(timeout, unit, resultClass); + } + + @Override + public CompletableFuture getResultAsync(long timeout, TimeUnit unit, Class resultClass, Type resultType) { + return next.getResultAsync(timeout, unit, resultClass, resultType); + } + + @Override + public CompletableFuture getResultAsync(long timeout, TimeUnit unit, Class resultClass) { + return next.getResultAsync(timeout, unit, resultClass); + } + + @Override + public R query(String queryType, Class resultClass, Object... args) { + return next.query(queryType, resultClass, args); + } + + @Override + public R query(String queryType, Class resultClass, Type resultType, Object... args) { + return next.query(queryType, resultClass, resultType, args); + } + + @Override + public void cancel() { + next.cancel(); + } + + @Override + public void terminate(String reason, Object... details) { + next.terminate(reason, details); + } + + @Override + public Optional getOptions() { + return next.getOptions(); + } + + @Override + public void setOptions(WorkflowOptions options) { + next.setOptions(options); + } + }; + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index 58a2217b0..c81dc8d14 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -371,6 +371,11 @@ public Optional getOptions() { return next.getOptions(); } + @Override + public void setOptions(WorkflowOptions options) { + next.setOptions(options); + } + /** Unlocks time skipping before blocking calls and locks back after completion. */ private class TimeLockingFuture extends CompletableFuture {