Skip to content

Commit 637a6b0

Browse files
committed
Implements passing Headers in WorkflowOptions & WorkflowClientInterceptor.
Drafted for discussion for PR #382
1 parent b3fad1d commit 637a6b0

File tree

6 files changed

+213
-2
lines changed

6 files changed

+213
-2
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

+19
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public static WorkflowOptions merge(
6767
.setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class))
6868
.setMemo(o.getMemo())
6969
.setSearchAttributes(o.getSearchAttributes())
70+
.setHeaders(o.getHeaders())
7071
.setContextPropagators(o.getContextPropagators())
7172
.validateBuildWithDefaults();
7273
}
@@ -93,6 +94,8 @@ public static final class Builder {
9394

9495
private Map<String, Object> searchAttributes;
9596

97+
private Map<String, Object> headers;
98+
9699
private List<ContextPropagator> contextPropagators;
97100

98101
private Builder() {}
@@ -111,6 +114,7 @@ private Builder(WorkflowOptions options) {
111114
this.cronSchedule = options.cronSchedule;
112115
this.memo = options.memo;
113116
this.searchAttributes = options.searchAttributes;
117+
this.headers = options.headers;
114118
this.contextPropagators = options.contextPropagators;
115119
}
116120

@@ -217,6 +221,11 @@ public Builder setContextPropagators(List<ContextPropagator> contextPropagators)
217221
return this;
218222
}
219223

224+
public Builder setHeaders(Map<String, Object> headers) {
225+
this.headers = headers;
226+
return this;
227+
}
228+
220229
public WorkflowOptions build() {
221230
return new WorkflowOptions(
222231
workflowId,
@@ -229,6 +238,7 @@ public WorkflowOptions build() {
229238
cronSchedule,
230239
memo,
231240
searchAttributes,
241+
headers,
232242
contextPropagators);
233243
}
234244

@@ -247,6 +257,7 @@ public WorkflowOptions validateBuildWithDefaults() {
247257
cronSchedule,
248258
memo,
249259
searchAttributes,
260+
headers,
250261
contextPropagators);
251262
}
252263
}
@@ -271,6 +282,8 @@ public WorkflowOptions validateBuildWithDefaults() {
271282

272283
private final Map<String, Object> searchAttributes;
273284

285+
private final Map<String, Object> headers;
286+
274287
private final List<ContextPropagator> contextPropagators;
275288

276289
private WorkflowOptions(
@@ -284,6 +297,7 @@ private WorkflowOptions(
284297
String cronSchedule,
285298
Map<String, Object> memo,
286299
Map<String, Object> searchAttributes,
300+
Map<String, Object> headers,
287301
List<ContextPropagator> contextPropagators) {
288302
this.workflowId = workflowId;
289303
this.workflowIdReusePolicy = workflowIdReusePolicy;
@@ -295,6 +309,7 @@ private WorkflowOptions(
295309
this.cronSchedule = cronSchedule;
296310
this.memo = memo;
297311
this.searchAttributes = searchAttributes;
312+
this.headers = headers;
298313
this.contextPropagators = contextPropagators;
299314
}
300315

@@ -338,6 +353,10 @@ public Map<String, Object> getSearchAttributes() {
338353
return searchAttributes;
339354
}
340355

356+
public Map<String, Object> getHeaders() {
357+
return headers;
358+
}
359+
341360
public List<ContextPropagator> getContextPropagators() {
342361
return contextPropagators;
343362
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java

+2
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,6 @@ <R> CompletableFuture<R> getResultAsync(
160160
void terminate(String reason, Object... details);
161161

162162
Optional<WorkflowOptions> getOptions();
163+
164+
void setOptions(WorkflowOptions options);
163165
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class WorkflowStubImpl implements WorkflowStub {
8282
private final Optional<String> workflowType;
8383
private final Scope metricsScope;
8484
private final AtomicReference<WorkflowExecution> execution = new AtomicReference<>();
85-
private final Optional<WorkflowOptions> options;
85+
private Optional<WorkflowOptions> options;
8686
private final WorkflowClientOptions clientOptions;
8787

8888
WorkflowStubImpl(
@@ -235,7 +235,10 @@ private StartWorkflowExecutionRequest newStartWorkflowExecutionRequest(
235235
}
236236
if (o.getContextPropagators() != null && !o.getContextPropagators().isEmpty()) {
237237
Map<String, Payload> context = extractContextsAndConvertToBytes(o.getContextPropagators());
238-
request.setHeader(Header.newBuilder().putAllFields(context));
238+
request.setHeader(
239+
Header.newBuilder()
240+
.putAllFields(context)
241+
.putAllFields(convertMemoFromObjectToBytes(o.getHeaders())));
239242
}
240243
return request.build();
241244
}
@@ -536,6 +539,11 @@ public Optional<WorkflowOptions> getOptions() {
536539
return options;
537540
}
538541

542+
@Override
543+
public void setOptions(WorkflowOptions options) {
544+
this.options = Optional.of(options);
545+
}
546+
539547
private void checkStarted() {
540548
if (execution.get() == null || execution.get().getWorkflowId() == null) {
541549
throw new IllegalStateException("Null workflowId. Was workflow started?");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.temporal.common.interceptors;
2+
3+
import io.temporal.client.WorkflowOptions;
4+
import io.temporal.client.WorkflowStub;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
//This way is bad also because the header is passed when the stub is created, not when the execution is triggered.
9+
//It can be problematic, especially if our context is thread local and stubs are created and executed in different threads
10+
public class SampleOpenTracingLikeInterceptor extends WorkflowClientInterceptorBase {
11+
@Override
12+
public WorkflowStub newUntypedWorkflowStub(
13+
String workflowType, WorkflowOptions options, WorkflowStub next) {
14+
Map<String, Object> originalHeaders = options != null ? options.getHeaders() : null;
15+
Map<String, Object> newHeaders;
16+
17+
if (originalHeaders == null) {
18+
newHeaders = new HashMap<>();
19+
} else {
20+
// we want to copy it, because right now WorkflowOptions exposes the collection itself, so if
21+
// we modify it -
22+
// we will modify a collection inside WorkflowOptions that supposes to be immutable (because
23+
// it has a builder)
24+
newHeaders = new HashMap<>(originalHeaders);
25+
}
26+
newHeaders.put("opentracing", new Object());
27+
WorkflowOptions modifiedOptions =
28+
WorkflowOptions.newBuilder(options).setHeaders(newHeaders).build();
29+
// it's either
30+
// 1. setOption on stub,
31+
// or
32+
// 2. We hack supposedly immutable WorkflowOptions instance and directly modify the header collection.
33+
next.setOptions(modifiedOptions);
34+
return next;
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package io.temporal.common.interceptors;
2+
3+
import io.temporal.api.common.v1.WorkflowExecution;
4+
import io.temporal.client.WorkflowOptions;
5+
import io.temporal.client.WorkflowStub;
6+
7+
import java.lang.reflect.Type;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.TimeoutException;
14+
15+
/**
16+
* This is a "better" implementation, that will give right behavior.
17+
* Requires getOptions setOptions on stub to hack it right before the call.
18+
* Check out {@link SampleOpenTracingLikeInterceptor} before this class.
19+
*/
20+
public class SampleOpenTracingLikeInterceptor2 extends WorkflowClientInterceptorBase {
21+
@Override
22+
public WorkflowStub newUntypedWorkflowStub(
23+
String workflowType, WorkflowOptions options, WorkflowStub next) {
24+
return new WorkflowStub() {
25+
private void hackHeaders() {
26+
//We shouldn't use headers that are passed as a parameter to newUntypedWorkflowStub, because
27+
//they can be outdated at a time of call because of the exposed setOptions
28+
WorkflowOptions options = next.getOptions().orElse(null);
29+
Map<String, Object> originalHeaders = options != null ? options.getHeaders() : null;
30+
Map<String, Object> newHeaders;
31+
32+
if (originalHeaders == null) {
33+
newHeaders = new HashMap<>();
34+
} else {
35+
newHeaders = new HashMap<>(originalHeaders);
36+
}
37+
newHeaders.put("opentracing", new Object());
38+
WorkflowOptions modifiedOptions =
39+
WorkflowOptions.newBuilder(options).setHeaders(newHeaders).build();
40+
next.setOptions(modifiedOptions);
41+
}
42+
43+
@Override
44+
public WorkflowExecution start(Object... args) {
45+
hackHeaders();
46+
return next.start(args);
47+
}
48+
49+
@Override
50+
public WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs) {
51+
hackHeaders();
52+
return next.signalWithStart(signalName, signalArgs, startArgs);
53+
}
54+
55+
@Override
56+
public void signal(String signalName, Object... args) {
57+
next.signal(signalName, args);
58+
}
59+
60+
@Override
61+
public Optional<String> getWorkflowType() {
62+
return next.getWorkflowType();
63+
}
64+
65+
@Override
66+
public WorkflowExecution getExecution() {
67+
return next.getExecution();
68+
}
69+
70+
@Override
71+
public <R> R getResult(Class<R> resultClass, Type resultType) {
72+
return next.getResult(resultClass, resultType);
73+
}
74+
75+
@Override
76+
public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, Type resultType) {
77+
return next.getResultAsync(resultClass, resultType);
78+
}
79+
80+
@Override
81+
public <R> R getResult(Class<R> resultClass) {
82+
return next.getResult(resultClass);
83+
}
84+
85+
@Override
86+
public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass) {
87+
return next.getResultAsync(resultClass);
88+
}
89+
90+
@Override
91+
public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) throws TimeoutException {
92+
return next.getResult(timeout, unit, resultClass, resultType);
93+
}
94+
95+
@Override
96+
public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass) throws TimeoutException {
97+
return next.getResult(timeout, unit, resultClass);
98+
}
99+
100+
@Override
101+
public <R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) {
102+
return next.getResultAsync(timeout, unit, resultClass, resultType);
103+
}
104+
105+
@Override
106+
public <R> CompletableFuture<R> getResultAsync(long timeout, TimeUnit unit, Class<R> resultClass) {
107+
return next.getResultAsync(timeout, unit, resultClass);
108+
}
109+
110+
@Override
111+
public <R> R query(String queryType, Class<R> resultClass, Object... args) {
112+
return next.query(queryType, resultClass, args);
113+
}
114+
115+
@Override
116+
public <R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args) {
117+
return next.query(queryType, resultClass, resultType, args);
118+
}
119+
120+
@Override
121+
public void cancel() {
122+
next.cancel();
123+
}
124+
125+
@Override
126+
public void terminate(String reason, Object... details) {
127+
next.terminate(reason, details);
128+
}
129+
130+
@Override
131+
public Optional<WorkflowOptions> getOptions() {
132+
return next.getOptions();
133+
}
134+
135+
@Override
136+
public void setOptions(WorkflowOptions options) {
137+
next.setOptions(options);
138+
}
139+
};
140+
}
141+
}

temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java

+5
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@ public Optional<WorkflowOptions> getOptions() {
371371
return next.getOptions();
372372
}
373373

374+
@Override
375+
public void setOptions(WorkflowOptions options) {
376+
next.setOptions(options);
377+
}
378+
374379
/** Unlocks time skipping before blocking calls and locks back after completion. */
375380
private class TimeLockingFuture<R> extends CompletableFuture<R> {
376381

0 commit comments

Comments
 (0)