Skip to content

Commit 0809cab

Browse files
committed
Address PR comments
1 parent 8e505b8 commit 0809cab

File tree

6 files changed

+70
-46
lines changed

6 files changed

+70
-46
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public interface WorkflowClientCallsInterceptor {
4343
*/
4444
WorkflowSignalOutput signal(WorkflowSignalInput input);
4545

46-
WorkflowStartOutput signalWithStart(WorkflowStartWithSignalInput input);
46+
WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input);
4747

4848
/**
4949
* @see #getResultAsync if you implement this method, {@link #getResultAsync} most likely needs to
@@ -104,19 +104,32 @@ public WorkflowOptions getOptions() {
104104
}
105105
}
106106

107+
final class WorkflowStartOutput {
108+
private final WorkflowExecution workflowExecution;
109+
110+
public WorkflowStartOutput(WorkflowExecution workflowExecution) {
111+
this.workflowExecution = workflowExecution;
112+
}
113+
114+
public WorkflowExecution getWorkflowExecution() {
115+
return workflowExecution;
116+
}
117+
}
118+
107119
final class WorkflowSignalInput {
108-
private final String workflowId;
120+
private final WorkflowExecution workflowExecution;
109121
private final String signalName;
110122
private final Object[] arguments;
111123

112-
public WorkflowSignalInput(String workflowId, String signalName, Object[] signalArguments) {
113-
this.workflowId = workflowId;
124+
public WorkflowSignalInput(
125+
WorkflowExecution workflowExecution, String signalName, Object[] signalArguments) {
126+
this.workflowExecution = workflowExecution;
114127
this.signalName = signalName;
115128
this.arguments = signalArguments;
116129
}
117130

118-
public String getWorkflowId() {
119-
return workflowId;
131+
public WorkflowExecution getWorkflowExecution() {
132+
return workflowExecution;
120133
}
121134

122135
public String getSignalName() {
@@ -130,12 +143,12 @@ public Object[] getArguments() {
130143

131144
final class WorkflowSignalOutput {}
132145

133-
final class WorkflowStartWithSignalInput {
146+
final class WorkflowSignalWithStartInput {
134147
private final WorkflowStartInput workflowStartInput;
135148
private final String signalName;
136149
private final Object[] signalArguments;
137150

138-
public WorkflowStartWithSignalInput(
151+
public WorkflowSignalWithStartInput(
139152
WorkflowStartInput workflowStartInput, String signalName, Object[] signalArguments) {
140153
this.workflowStartInput = workflowStartInput;
141154
this.signalName = signalName;
@@ -155,15 +168,15 @@ public Object[] getSignalArguments() {
155168
}
156169
}
157170

158-
final class WorkflowStartOutput {
159-
private final WorkflowExecution workflowExecution;
171+
final class WorkflowSignalWithStartOutput {
172+
private final WorkflowStartOutput workflowStartOutput;
160173

161-
public WorkflowStartOutput(WorkflowExecution workflowExecution) {
162-
this.workflowExecution = workflowExecution;
174+
public WorkflowSignalWithStartOutput(WorkflowStartOutput workflowStartOutput) {
175+
this.workflowStartOutput = workflowStartOutput;
163176
}
164177

165-
public WorkflowExecution getWorkflowExecution() {
166-
return workflowExecution;
178+
public WorkflowStartOutput getWorkflowStartOutput() {
179+
return workflowStartOutput;
167180
}
168181
}
169182

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
4141
}
4242

4343
@Override
44-
public WorkflowStartOutput signalWithStart(WorkflowStartWithSignalInput input) {
44+
public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
4545
return next.signalWithStart(input);
4646
}
4747

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
6161
SignalWorkflowExecutionRequest.Builder request =
6262
SignalWorkflowExecutionRequest.newBuilder()
6363
.setSignalName(input.getSignalName())
64-
.setWorkflowExecution(
65-
WorkflowExecution.newBuilder().setWorkflowId(input.getWorkflowId()));
64+
.setWorkflowExecution(input.getWorkflowExecution());
6665

6766
if (clientOptions.getIdentity() != null) {
6867
request.setIdentity(clientOptions.getIdentity());
@@ -78,14 +77,15 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
7877
}
7978

8079
@Override
81-
public WorkflowStartOutput signalWithStart(WorkflowStartWithSignalInput input) {
80+
public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
8281
StartWorkflowExecutionRequest request =
8382
requestsHelper.newStartWorkflowExecutionRequest(input.getWorkflowStartInput());
8483
Optional<Payloads> signalInput =
8584
clientOptions.getDataConverter().toPayloads(input.getSignalArguments());
8685
SignalWithStartWorkflowExecutionParameters p =
8786
new SignalWithStartWorkflowExecutionParameters(request, input.getSignalName(), signalInput);
88-
return new WorkflowStartOutput(genericClient.signalWithStart(p));
87+
return new WorkflowSignalWithStartOutput(
88+
new WorkflowStartOutput(genericClient.signalWithStart(p)));
8989
}
9090

9191
@Override
@@ -153,14 +153,10 @@ public <R> QueryOutput<R> query(QueryInput<R> input) {
153153

154154
@Override
155155
public CancelOutput cancel(CancelInput input) {
156-
// RunId can change if workflow does ContinueAsNew. So we do not set it here and
157-
// let the server figure out the current run.
158156
RequestCancelWorkflowExecutionRequest.Builder request =
159157
RequestCancelWorkflowExecutionRequest.newBuilder()
160158
.setRequestId(UUID.randomUUID().toString())
161-
.setWorkflowExecution(
162-
WorkflowExecution.newBuilder()
163-
.setWorkflowId(input.getWorkflowExecution().getWorkflowId()))
159+
.setWorkflowExecution(input.getWorkflowExecution())
164160
.setNamespace(clientOptions.getNamespace())
165161
.setIdentity(clientOptions.getIdentity());
166162
genericClient.requestCancel(request.build());
@@ -169,14 +165,10 @@ public CancelOutput cancel(CancelInput input) {
169165

170166
@Override
171167
public TerminateOutput terminate(TerminateInput input) {
172-
// RunId can change if workflow does ContinueAsNew. So we do not set it here and
173-
// let the server figure out the current run.
174168
TerminateWorkflowExecutionRequest.Builder request =
175169
TerminateWorkflowExecutionRequest.newBuilder()
176170
.setNamespace(clientOptions.getNamespace())
177-
.setWorkflowExecution(
178-
WorkflowExecution.newBuilder()
179-
.setWorkflowId(input.getWorkflowExecution().getWorkflowId()))
171+
.setWorkflowExecution(input.getWorkflowExecution())
180172
.setReason(input.getReason());
181173
Optional<Payloads> payloads = clientOptions.getDataConverter().toPayloads(input.getDetails());
182174
payloads.ifPresent(request::setDetails);

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowStubImpl.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.TimeUnit;
5050
import java.util.concurrent.TimeoutException;
5151
import java.util.concurrent.atomic.AtomicReference;
52+
import org.apache.commons.lang3.StringUtils;
5253

5354
class WorkflowStubImpl implements WorkflowStub {
5455
private final WorkflowClientOptions clientOptions;
@@ -91,7 +92,7 @@ public void signal(String signalName, Object... args) {
9192
try {
9293
workflowClientInvoker.signal(
9394
new WorkflowClientCallsInterceptor.WorkflowSignalInput(
94-
execution.get().getWorkflowId(), signalName, args));
95+
currentExecutionWithoutRunId(), signalName, args));
9596
} catch (StatusRuntimeException e) {
9697
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
9798
throw new WorkflowNotFoundException(execution.get(), workflowType.orElse(null));
@@ -115,7 +116,7 @@ private WorkflowExecution startWithOptions(WorkflowOptions options, Object... ar
115116
execution.set(workflowExecution);
116117
return workflowExecution;
117118
} catch (StatusRuntimeException e) {
118-
throw wrapExecutionAlreadyStarted(workflowId, workflowType.orElse(null), e);
119+
throw wrapStartException(workflowId, workflowType.orElse(null), e);
119120
} catch (Exception e) {
120121
throw new WorkflowServiceException(execution.get(), workflowType.orElse(null), e);
121122
}
@@ -134,35 +135,38 @@ private WorkflowExecution signalWithStartWithOptions(
134135
checkExecutionIsNotStarted();
135136
String workflowId = getWorkflowIdForStart(options);
136137
try {
137-
WorkflowClientCallsInterceptor.WorkflowStartOutput workflowStartOutput =
138+
WorkflowClientCallsInterceptor.WorkflowSignalWithStartOutput workflowStartOutput =
138139
workflowClientInvoker.signalWithStart(
139-
new WorkflowClientCallsInterceptor.WorkflowStartWithSignalInput(
140+
new WorkflowClientCallsInterceptor.WorkflowSignalWithStartInput(
140141
new WorkflowClientCallsInterceptor.WorkflowStartInput(
141142
workflowId, workflowType.get(), Header.empty(), startArgs, options),
142143
signalName,
143144
signalArgs));
144-
WorkflowExecution workflowExecution = workflowStartOutput.getWorkflowExecution();
145+
WorkflowExecution workflowExecution =
146+
workflowStartOutput.getWorkflowStartOutput().getWorkflowExecution();
145147
execution.set(workflowExecution);
146148
return workflowExecution;
147149
} catch (StatusRuntimeException e) {
148-
throw wrapExecutionAlreadyStarted(workflowId, workflowType.orElse(null), e);
150+
throw wrapStartException(workflowId, workflowType.orElse(null), e);
149151
} catch (Exception e) {
150152
throw new WorkflowServiceException(execution.get(), workflowType.orElse(null), e);
151153
}
152154
}
153155

154-
private RuntimeException wrapExecutionAlreadyStarted(
156+
private RuntimeException wrapStartException(
155157
String workflowId, String workflowType, StatusRuntimeException e) {
158+
WorkflowExecution.Builder executionBuilder =
159+
WorkflowExecution.newBuilder().setWorkflowId(workflowId);
160+
156161
WorkflowExecutionAlreadyStartedFailure f =
157162
StatusUtils.getFailure(e, WorkflowExecutionAlreadyStartedFailure.class);
158163
if (f != null) {
159-
WorkflowExecution exe =
160-
WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(f.getRunId()).build();
164+
WorkflowExecution exe = executionBuilder.setRunId(f.getRunId()).build();
161165
execution.set(exe);
162166
return new WorkflowExecutionAlreadyStarted(exe, workflowType, e);
163167
} else {
164-
// TODO should it be wrapped into WorkflowExecutionAlreadyStarted?
165-
return e;
168+
WorkflowExecution exe = executionBuilder.build();
169+
return new WorkflowServiceException(exe, workflowType, e);
166170
}
167171
}
168172

@@ -347,14 +351,16 @@ public <R> R query(String queryType, Class<R> resultClass, Type resultType, Obje
347351
@Override
348352
public void cancel() {
349353
checkStarted();
350-
workflowClientInvoker.cancel(new WorkflowClientCallsInterceptor.CancelInput(execution.get()));
354+
workflowClientInvoker.cancel(
355+
new WorkflowClientCallsInterceptor.CancelInput(currentExecutionWithoutRunId()));
351356
}
352357

353358
@Override
354359
public void terminate(String reason, Object... details) {
355360
checkStarted();
356361
workflowClientInvoker.terminate(
357-
new WorkflowClientCallsInterceptor.TerminateInput(execution.get(), reason, details));
362+
new WorkflowClientCallsInterceptor.TerminateInput(
363+
currentExecutionWithoutRunId(), reason, details));
358364
}
359365

360366
@Override
@@ -376,4 +382,17 @@ private void checkExecutionIsNotStarted() {
376382
+ "change WorkflowIdReusePolicy from AllowDuplicate or use WorkflowStub.getResult");
377383
}
378384
}
385+
386+
/**
387+
* RunId can change e.g. workflow does ContinueAsNew.
388+
* Emptying runId in workflowExecution allows Temporal server figure out the current run id dynamically.
389+
*/
390+
private WorkflowExecution currentExecutionWithoutRunId() {
391+
WorkflowExecution workflowExecution = execution.get();
392+
if (StringUtils.isEmpty(workflowExecution.getRunId())) {
393+
return workflowExecution;
394+
} else {
395+
return WorkflowExecution.newBuilder(workflowExecution).setRunId("").build();
396+
}
397+
}
379398
}

temporal-sdk/src/test/java/io/temporal/workflow/ChildWorkflowRetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
7777
}
7878

7979
@Override
80-
public WorkflowStartOutput signalWithStart(
81-
WorkflowStartWithSignalInput input) {
80+
public WorkflowSignalWithStartOutput signalWithStart(
81+
WorkflowSignalWithStartInput input) {
8282
lastStartedWorkflowType.set(
8383
input.getWorkflowStartInput().getWorkflowType());
8484
return super.signalWithStart(input);

temporal-sdk/src/test/java/io/temporal/workflow/WorkflowTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
152152
}
153153

154154
@Override
155-
public WorkflowStartOutput signalWithStart(
156-
WorkflowStartWithSignalInput input) {
155+
public WorkflowSignalWithStartOutput signalWithStart(
156+
WorkflowSignalWithStartInput input) {
157157
lastStartedWorkflowType.set(
158158
input.getWorkflowStartInput().getWorkflowType());
159159
return super.signalWithStart(input);

0 commit comments

Comments
 (0)