From 96eb34e7b13216012fc219e1a56d12137686ef37 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 30 Sep 2025 09:35:12 -0300 Subject: [PATCH] Align Java API with other languages Signed-off-by: Matheus Cruz --- .../en/java-sdk-docs/java-client/_index.md | 12 +- .../java-workflow/java-workflow-howto.md | 14 +- .../workflows/chain/DemoChainClient.java | 8 +- .../DemoChildWorkerflowClient.java | 8 +- .../compensation/BookTripClient.java | 4 +- .../DemoContinueAsNewClient.java | 2 +- .../DemoExternalEventClient.java | 2 +- .../faninout/DemoFanInOutClient.java | 6 +- .../multiapp/MultiAppWorkflowClient.java | 6 +- .../DemoSuspendResumeClient.java | 8 +- .../workflows/DaprWorkflowsIIT.java | 201 +++++++++++++++++ ...rkflowsIT.java => OldDaprWorkflowsIT.java} | 8 +- .../WorkflowsMultiAppCallActivityIT.java | 43 +++- .../workflows/client/DaprWorkflowClient.java | 102 ++++++++- .../client/WorkflowInstanceStatus.java | 2 + .../dapr/workflows/client/WorkflowState.java | 142 ++++++++++++ .../DefaultWorkflowInstanceStatus.java | 2 + .../runtime/DefaultWorkflowState.java | 210 ++++++++++++++++++ .../client/DaprWorkflowClientTest.java | 14 +- .../workflows/client/WorkflowStateTest.java | 204 +++++++++++++++++ .../orchestrator/CustomersRestController.java | 7 +- .../wfp/WorkflowPatternsRestController.java | 30 +-- 22 files changed, 968 insertions(+), 67 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIIT.java rename sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/{DaprWorkflowsIT.java => OldDaprWorkflowsIT.java} (96%) create mode 100644 sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowState.java create mode 100644 sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowState.java create mode 100644 sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowStateTest.java diff --git a/daprdocs/content/en/java-sdk-docs/java-client/_index.md b/daprdocs/content/en/java-sdk-docs/java-client/_index.md index 8199824a26..4c9ddb305c 100644 --- a/daprdocs/content/en/java-sdk-docs/java-client/_index.md +++ b/daprdocs/content/en/java-sdk-docs/java-client/_index.md @@ -514,7 +514,7 @@ public class DemoWorkflowClient { System.out.println(separatorStr); System.out.println("**GetInstanceMetadata:Running Workflow**"); - WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true); + WorkflowInstanceStatus workflowMetadata = client.getWorkflowState(instanceId, true); System.out.printf("Result: %s%n", workflowMetadata); System.out.println(separatorStr); @@ -545,13 +545,13 @@ public class DemoWorkflowClient { System.out.println(separatorStr); - System.out.println("**WaitForInstanceCompletion**"); + System.out.println("**waitForWorkflowCompletion**"); try { - WorkflowInstanceStatus waitForInstanceCompletionResult = - client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); - System.out.printf("Result: %s%n", waitForInstanceCompletionResult); + WorkflowInstanceStatus waitForWorkflowCompletionResult = + client.waitForWorkflowCompletion(instanceId, Duration.ofSeconds(60), true); + System.out.printf("Result: %s%n", waitForWorkflowCompletionResult); } catch (TimeoutException ex) { - System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex); + System.out.printf("waitForWorkflowCompletion has an exception:%s%n", ex); } System.out.println(separatorStr); diff --git a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md index ccc365cf42..42710e40e0 100644 --- a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md +++ b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md @@ -104,7 +104,7 @@ public class DemoWorkflowClient { System.out.println(separatorStr); System.out.println("**GetInstanceMetadata:Running Workflow**"); - WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true); + WorkflowInstanceStatus workflowMetadata = client.getWorkflowState(instanceId, true); System.out.printf("Result: %s%n", workflowMetadata); System.out.println(separatorStr); @@ -135,13 +135,13 @@ public class DemoWorkflowClient { System.out.println(separatorStr); - System.out.println("**WaitForInstanceCompletion**"); + System.out.println("**waitForWorkflowCompletion**"); try { - WorkflowInstanceStatus waitForInstanceCompletionResult = - client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); - System.out.printf("Result: %s%n", waitForInstanceCompletionResult); + WorkflowInstanceStatus waitForWorkflowCompletionResult = + client.waitForWorkflowCompletion(instanceId, Duration.ofSeconds(60), true); + System.out.printf("Result: %s%n", waitForWorkflowCompletionResult); } catch (TimeoutException ex) { - System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex); + System.out.printf("waitForWorkflowCompletion has an exception:%s%n", ex); } System.out.println(separatorStr); @@ -213,7 +213,7 @@ Events raised for workflow with instanceId: 0b4cc0d5-413a-4c1c-816a-a71fa24740d4 ** Registering Event to be captured by anyOf(t1,t2,t3) ** Event raised for workflow with instanceId: 0b4cc0d5-413a-4c1c-816a-a71fa24740d4 ******* -**WaitForInstanceCompletion** +**WaitForWorkflowCompletion** Result: [Name: 'io.dapr.examples.workflows.DemoWorkflow', ID: '0b4cc0d5-413a-4c1c-816a-a71fa24740d4', RuntimeStatus: FAILED, CreatedAt: 2023-09-13T13:02:30.547Z, LastUpdatedAt: 2023-09-13T13:02:55.054Z, Input: '"input data"', Output: ''] ******* **purgeInstance** diff --git a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java index 334e40f8df..b59544f0ef 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainClient.java @@ -16,7 +16,7 @@ import io.dapr.examples.workflows.utils.PropertyUtils; import io.dapr.examples.workflows.utils.RetryUtils; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -34,10 +34,10 @@ public static void main(String[] args) { Duration.ofSeconds(60)); System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId); - WorkflowInstanceStatus workflowInstanceStatus = - client.waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowState = + client.waitForWorkflowCompletion(instanceId, null, true); - String result = workflowInstanceStatus.readOutputAs(String.class); + String result = workflowState.readOutputAs(String.class); System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result); } catch (TimeoutException | InterruptedException e) { throw new RuntimeException(e); diff --git a/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkerflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkerflowClient.java index 80f647c17c..b09df8ad73 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkerflowClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkerflowClient.java @@ -15,7 +15,7 @@ import io.dapr.examples.workflows.utils.PropertyUtils; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import java.util.concurrent.TimeoutException; @@ -30,10 +30,10 @@ public static void main(String[] args) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) { String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class); System.out.printf("Started a new child-workflow model workflow with instance ID: %s%n", instanceId); - WorkflowInstanceStatus workflowInstanceStatus = - client.waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowState = + client.waitForWorkflowCompletion(instanceId, null, true); - String result = workflowInstanceStatus.readOutputAs(String.class); + String result = workflowState.readOutputAs(String.class); System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result); } catch (TimeoutException | InterruptedException e) { diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java index b7c4760e52..d827c99e6f 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java @@ -16,7 +16,7 @@ import io.dapr.examples.workflows.utils.PropertyUtils; import io.dapr.examples.workflows.utils.RetryUtils; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -27,7 +27,7 @@ public static void main(String[] args) { String instanceId = RetryUtils.callWithRetry(() -> client.scheduleNewWorkflow(BookTripWorkflow.class), Duration.ofSeconds(60)); System.out.printf("Started a new trip booking workflow with instance ID: %s%n", instanceId); - WorkflowInstanceStatus status = client.waitForInstanceCompletion(instanceId, Duration.ofMinutes(30), true); + WorkflowState status = client.waitForWorkflowCompletion(instanceId, Duration.ofMinutes(30), true); System.out.printf("Workflow instance with ID: %s completed with status: %s%n", instanceId, status); System.out.printf("Workflow output: %s%n", status.getSerializedOutput()); } catch (TimeoutException | InterruptedException e) { diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java index 5827fa2c20..99b52fc869 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewClient.java @@ -30,7 +30,7 @@ public static void main(String[] args) { String instanceId = client.scheduleNewWorkflow(DemoContinueAsNewWorkflow.class); System.out.printf("Started a new continue-as-new model workflow with instance ID: %s%n", instanceId); - client.waitForInstanceCompletion(instanceId, null, true); + client.waitForWorkflowCompletion(instanceId, null, true); System.out.printf("workflow instance with ID: %s completed.", instanceId); } catch (TimeoutException | InterruptedException e) { diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java index f827f2f709..9d4bda4552 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventClient.java @@ -33,7 +33,7 @@ public static void main(String[] args) { client.raiseEvent(instanceId, "Approval", true); //client.raiseEvent(instanceId, "Approval", false); - client.waitForInstanceCompletion(instanceId, null, true); + client.waitForWorkflowCompletion(instanceId, null, true); System.out.printf("workflow instance with ID: %s completed.", instanceId); } catch (TimeoutException | InterruptedException e) { diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java index 871b15cfe4..8346b957cf 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutClient.java @@ -16,7 +16,7 @@ import io.dapr.examples.workflows.utils.PropertyUtils; import io.dapr.examples.workflows.utils.RetryUtils; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import java.time.Duration; import java.util.Arrays; @@ -48,12 +48,12 @@ public static void main(String[] args) throws InterruptedException { System.out.printf("Started a new fan out/fan in model workflow with instance ID: %s%n", instanceId); // Block until the orchestration completes. Then print the final status, which includes the output. - WorkflowInstanceStatus workflowInstanceStatus = client.waitForInstanceCompletion( + WorkflowState workflowState = client.waitForWorkflowCompletion( instanceId, Duration.ofSeconds(30), true); System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, - workflowInstanceStatus.readOutputAs(int.class)); + workflowState.readOutputAs(int.class)); } catch (TimeoutException e) { throw new RuntimeException(e); } diff --git a/examples/src/main/java/io/dapr/examples/workflows/multiapp/MultiAppWorkflowClient.java b/examples/src/main/java/io/dapr/examples/workflows/multiapp/MultiAppWorkflowClient.java index 63ec49ca2e..dfac32b719 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/multiapp/MultiAppWorkflowClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/multiapp/MultiAppWorkflowClient.java @@ -14,7 +14,7 @@ package io.dapr.examples.workflows.multiapp; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import java.util.concurrent.TimeoutException; @@ -48,8 +48,8 @@ public static void main(String[] args) { // Wait for the workflow to complete System.out.println("Waiting for workflow completion..."); - WorkflowInstanceStatus workflowInstanceStatus = - client.waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowInstanceStatus = + client.waitForWorkflowCompletion(instanceId, null, true); // Get the result String result = workflowInstanceStatus.readOutputAs(String.class); diff --git a/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java b/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java index 5b94b5fa5a..64019fa125 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java +++ b/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java @@ -17,7 +17,7 @@ import io.dapr.examples.workflows.utils.PropertyUtils; import io.dapr.examples.workflows.utils.RetryUtils; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -38,21 +38,21 @@ public static void main(String[] args) { System.out.printf("Suspending Workflow Instance: %s%n", instanceId ); client.suspendWorkflow(instanceId, "suspending workflow instance."); - WorkflowInstanceStatus instanceState = client.getInstanceState(instanceId, false); + WorkflowState instanceState = client.getWorkflowState(instanceId, false); assert instanceState != null; System.out.printf("Workflow Instance Status: %s%n", instanceState.getRuntimeStatus().name() ); System.out.printf("Let's resume the Workflow Instance before sending the external event: %s%n", instanceId ); client.resumeWorkflow(instanceId, "resuming workflow instance."); - instanceState = client.getInstanceState(instanceId, false); + instanceState = client.getWorkflowState(instanceId, false); assert instanceState != null; System.out.printf("Workflow Instance Status: %s%n", instanceState.getRuntimeStatus().name() ); System.out.printf("Now that the instance is RUNNING again, lets send the external event. %n"); client.raiseEvent(instanceId, "Approval", true); - client.waitForInstanceCompletion(instanceId, null, true); + client.waitForWorkflowCompletion(instanceId, null, true); System.out.printf("workflow instance with ID: %s completed.", instanceId); } catch (TimeoutException | InterruptedException e) { diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIIT.java new file mode 100644 index 0000000000..abe6c1f2e4 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIIT.java @@ -0,0 +1,201 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.workflows; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowRuntimeStatus; +import io.dapr.workflows.client.WorkflowState; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SpringBootTest( + webEnvironment = WebEnvironment.RANDOM_PORT, + classes = { + TestWorkflowsConfiguration.class, + TestWorkflowsApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class DaprWorkflowsIIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("workflow-dapr-app") + .withNetwork(DAPR_NETWORK) + .withComponent(new Component("kvstore", "state.in-memory", "v1", + Map.of("actorStateStore", "true"))) + .withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap())) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal"); + + /** + * Expose the Dapr ports to the host. + * + * @param registry the dynamic property registry + */ + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint); + } + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Autowired + private DaprWorkflowClient workflowClient; + + @Autowired + private WorkflowRuntimeBuilder workflowRuntimeBuilder; + + /** + * Initializes the test. + */ + @BeforeEach + public void init() { + WorkflowRuntime runtime = workflowRuntimeBuilder.build(); + System.out.println("Start workflow runtime"); + runtime.start(false); + } + + @Test + public void testWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload); + + workflowClient.waitForWorkflowCompletion(instanceId, Duration.ofSeconds(10), false); + workflowClient.raiseEvent(instanceId, "MoveForward", payload); + + Duration timeout = Duration.ofSeconds(10); + WorkflowState workflowStatus = workflowClient.waitForWorkflowCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(2, workflowOutput.getPayloads().size()); + assertEquals("First Activity", workflowOutput.getPayloads().get(0)); + assertEquals("Second Activity", workflowOutput.getPayloads().get(1)); + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + + @Test + public void testSuspendAndResumeWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload); + workflowClient.waitForWorkflowStart(instanceId, Duration.ofSeconds(10), false); + + workflowClient.suspendWorkflow(instanceId, "testing suspend."); + + + WorkflowState instanceState = workflowClient.getWorkflowState(instanceId, false); + assertNotNull(instanceState); + assertEquals(WorkflowRuntimeStatus.SUSPENDED, instanceState.getRuntimeStatus()); + + workflowClient.resumeWorkflow(instanceId, "testing resume"); + + instanceState = workflowClient.getWorkflowState(instanceId, false); + assertNotNull(instanceState); + assertEquals(WorkflowRuntimeStatus.RUNNING, instanceState.getRuntimeStatus()); + + workflowClient.raiseEvent(instanceId, "MoveForward", payload); + + Duration timeout = Duration.ofSeconds(10); + instanceState = workflowClient.waitForWorkflowCompletion(instanceId, timeout, true); + + assertNotNull(instanceState); + assertEquals(WorkflowRuntimeStatus.COMPLETED, instanceState.getRuntimeStatus()); + + } + + @Test + public void testNamedActivitiesWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestNamedActivitiesWorkflow.class, payload); + + workflowClient.waitForWorkflowStart(instanceId, Duration.ofSeconds(10), false); + + Duration timeout = Duration.ofSeconds(10); + WorkflowState workflowStatus = workflowClient.waitForWorkflowCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(5, workflowOutput.getPayloads().size()); + assertEquals("First Activity", workflowOutput.getPayloads().get(0)); + assertEquals("First Activity", workflowOutput.getPayloads().get(1)); + assertEquals("Second Activity", workflowOutput.getPayloads().get(2)); + assertEquals("Anonymous Activity", workflowOutput.getPayloads().get(3)); + assertEquals("Anonymous Activity 2", workflowOutput.getPayloads().get(4)); + + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + + @Test + public void testExecutionKeyWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload); + + workflowClient.waitForWorkflowStart(instanceId, Duration.ofSeconds(100), false); + + Duration timeout = Duration.ofSeconds(1000); + WorkflowState workflowStatus = workflowClient.waitForWorkflowCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(1, workflowOutput.getPayloads().size()); + assertEquals("Execution key found", workflowOutput.getPayloads().get(0)); + + assertTrue(KeyStore.getInstance().size() == 1); + + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { + return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/OldDaprWorkflowsIT.java similarity index 96% rename from sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java rename to sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/OldDaprWorkflowsIT.java index db531d5146..4764d2aad2 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/OldDaprWorkflowsIT.java @@ -46,6 +46,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +/** + * This class tests the old API (not aligned with other languages). + *

+ * See https://github.com/dapr/java-sdk/issues/1554. + */ @SpringBootTest( webEnvironment = WebEnvironment.RANDOM_PORT, classes = { @@ -55,7 +60,8 @@ ) @Testcontainers @Tag("testcontainers") -public class DaprWorkflowsIT { +@Deprecated(forRemoval = true) +public class OldDaprWorkflowsIT { private static final Network DAPR_NETWORK = Network.newNetwork(); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppCallActivityIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppCallActivityIT.java index dfa591abfd..6b6c929d7f 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppCallActivityIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppCallActivityIT.java @@ -22,7 +22,7 @@ import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowRuntimeStatus; import io.dapr.config.Properties; -import net.bytebuddy.utility.dispatcher.JavaDispatcher; +import io.dapr.workflows.client.WorkflowState; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Network; @@ -175,7 +175,46 @@ public void testMultiAppWorkflow() throws Exception { try { String instanceId = workflowClient.scheduleNewWorkflow(MultiAppWorkflow.class, input); assertNotNull(instanceId, "Workflow instance ID should not be null"); - workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(30), false); + workflowClient.waitForWorkflowStart(instanceId, Duration.ofSeconds(30), false); + + WorkflowState workflowStatus = workflowClient.waitForWorkflowCompletion(instanceId, null, true); + assertNotNull(workflowStatus, "Workflow status should not be null"); + assertEquals(WorkflowRuntimeStatus.COMPLETED, workflowStatus.getRuntimeStatus(), + "Workflow should complete successfully"); + String workflowOutput = workflowStatus.readOutputAs(String.class); + assertEquals(expectedOutput, workflowOutput, "Workflow output should match expected result"); + } finally { + workflowClient.close(); + } + } + + /** + * It duplicates the {@link #testMultiAppWorkflow()} due to deprecated APIs. + * It must be deleted after {@link WorkflowInstanceStatus} be removed. + */ + @Test + @Deprecated(forRemoval = true) + public void testMultiAppWorkflowOldApi() throws Exception { + // TestContainers wait strategies ensure all containers are ready before this test runs + + String input = "Hello World"; + String expectedOutput = "HELLO WORLD [TRANSFORMED BY APP2] [FINALIZED BY APP3]"; + + // Create workflow client connected to the main workflow orchestrator + // Use the same endpoint configuration that the workers use + // The workers use host.testcontainers.internal:50001 + Map propertyOverrides = Map.of( + "dapr.grpc.endpoint", MAIN_WORKFLOW_SIDECAR.getGrpcEndpoint(), + "dapr.http.endpoint", MAIN_WORKFLOW_SIDECAR.getHttpEndpoint() + ); + + Properties clientProperties = new Properties(propertyOverrides); + DaprWorkflowClient workflowClient = new DaprWorkflowClient(clientProperties); + + try { + String instanceId = workflowClient.scheduleNewWorkflow(MultiAppWorkflow.class, input); + assertNotNull(instanceId, "Workflow instance ID should not be null"); + workflowClient.waitForWorkflowStart(instanceId, Duration.ofSeconds(30), false); WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, null, true); assertNotNull(workflowStatus, "Workflow status should not be null"); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index d8b94edbeb..935ca772db 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -23,6 +23,7 @@ import io.dapr.workflows.Workflow; import io.dapr.workflows.internal.ApiTokenClientInterceptor; import io.dapr.workflows.runtime.DefaultWorkflowInstanceStatus; +import io.dapr.workflows.runtime.DefaultWorkflowState; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; @@ -116,8 +117,8 @@ public String scheduleNewWorkflow(Class clazz, Object in /** * Schedules a new workflow with a specified set of options for execution. * - * @param any Workflow type - * @param clazz Class extending Workflow to start an instance of. + * @param any Workflow type + * @param clazz Class extending Workflow to start an instance of. * @param options the options for the new workflow, including input, instance ID, etc. * @return the instanceId parameter value. */ @@ -165,14 +166,31 @@ public void terminateWorkflow(String workflowInstanceId, @Nullable Object output * @param getInputsAndOutputs true to fetch the workflow instance's * inputs, outputs, and custom status, or false to omit them * @return a metadata record that describes the workflow instance and it execution status, or a default instance + * @deprecated Use {@link #getWorkflowState(String, boolean)} instead. */ @Nullable + @Deprecated(forRemoval = true) public WorkflowInstanceStatus getInstanceState(String instanceId, boolean getInputsAndOutputs) { OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs); return metadata == null ? null : new DefaultWorkflowInstanceStatus(metadata); } + /** + * Fetches workflow instance metadata from the configured durable store. + * + * @param instanceId the unique ID of the workflow instance to fetch + * @param getInputsAndOutputs true to fetch the workflow instance's + * inputs, outputs, and custom status, or false to omit them + * @return a metadata record that describes the workflow instance and it execution status, or a default instance + */ + @Nullable + public WorkflowState getWorkflowState(String instanceId, boolean getInputsAndOutputs) { + OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs); + + return metadata == null ? null : new DefaultWorkflowState(metadata); + } + /** * Waits for an workflow to start running and returns an * {@link WorkflowInstanceStatus} object that contains metadata about the started @@ -189,7 +207,9 @@ public WorkflowInstanceStatus getInstanceState(String instanceId, boolean getInp * inputs, outputs, and custom status, or false to omit them * @return the workflow instance metadata or null if no such instance is found * @throws TimeoutException when the workflow instance is not started within the specified amount of time + * @deprecated Use {@link #waitForWorkflowStart(String, Duration, boolean)} instead. */ + @Deprecated(forRemoval = true) @Nullable public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration timeout, boolean getInputsAndOutputs) throws TimeoutException { @@ -199,6 +219,33 @@ public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration t return metadata == null ? null : new DefaultWorkflowInstanceStatus(metadata); } + + /** + * Waits for a workflow to start running and returns an + * {@link WorkflowState} object that contains metadata about the started + * instance and optionally its input, output, and custom status payloads. + * + *

A "started" workflow instance is any instance not in the Pending state. + * + *

If an workflow instance is already running when this method is called, + * the method will return immediately. + * + * @param instanceId the unique ID of the workflow instance to wait for + * @param timeout the amount of time to wait for the workflow instance to start + * @param getInputsAndOutputs true to fetch the workflow instance's + * inputs, outputs, and custom status, or false to omit them + * @return the workflow instance metadata or null if no such instance is found + * @throws TimeoutException when the workflow instance is not started within the specified amount of time + */ + @Nullable + public DefaultWorkflowState waitForWorkflowStart(String instanceId, Duration timeout, boolean getInputsAndOutputs) + throws TimeoutException { + + OrchestrationMetadata metadata = this.innerClient.waitForInstanceStart(instanceId, timeout, getInputsAndOutputs); + + return metadata == null ? null : new DefaultWorkflowState(metadata); + } + /** * Waits for an workflow to complete and returns an {@link WorkflowInstanceStatus} object that contains * metadata about the completed instance. @@ -217,16 +264,47 @@ public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration t * status, or false to omit them * @return the workflow instance metadata or null if no such instance is found * @throws TimeoutException when the workflow instance is not completed within the specified amount of time + * @deprecated Use {@link #waitForWorkflowCompletion(String, Duration, boolean)} instead. */ @Nullable + @Deprecated(forRemoval = true) public WorkflowInstanceStatus waitForInstanceCompletion(String instanceId, Duration timeout, - boolean getInputsAndOutputs) throws TimeoutException { + boolean getInputsAndOutputs) throws TimeoutException { OrchestrationMetadata metadata = this.innerClient.waitForInstanceCompletion(instanceId, timeout, getInputsAndOutputs); return metadata == null ? null : new DefaultWorkflowInstanceStatus(metadata); } + + /** + * Waits for an workflow to complete and returns an {@link WorkflowState} object that contains + * metadata about the completed instance. + * + *

A "completed" workflow instance is any instance in one of the terminal states. For example, the + * Completed, Failed, or Terminated states. + * + *

Workflows are long-running and could take hours, days, or months before completing. + * Workflows can also be eternal, in which case they'll never complete unless terminated. + * In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are used. + * If an workflow instance is already complete when this method is called, the method will return immediately. + * + * @param instanceId the unique ID of the workflow instance to wait for + * @param timeout the amount of time to wait for the workflow instance to complete + * @param getInputsAndOutputs true to fetch the workflow instance's inputs, outputs, and custom + * status, or false to omit them + * @return the workflow instance metadata or null if no such instance is found + * @throws TimeoutException when the workflow instance is not completed within the specified amount of time + */ + @Nullable + public WorkflowState waitForWorkflowCompletion(String instanceId, Duration timeout, + boolean getInputsAndOutputs) throws TimeoutException { + + OrchestrationMetadata metadata = this.innerClient.waitForInstanceCompletion(instanceId, timeout, + getInputsAndOutputs); + return metadata == null ? null : new DefaultWorkflowState(metadata); + } + /** * Sends an event notification message to awaiting workflow instance. * @@ -243,7 +321,9 @@ public void raiseEvent(String workflowInstanceId, String eventName, Object event * * @param workflowInstanceId The unique ID of the workflow instance to purge. * @return Return true if the workflow state was found and purged successfully otherwise false. + * @deprecated Use {@link #purgeWorkflow(String)} instead. */ + @Deprecated(forRemoval = true) public boolean purgeInstance(String workflowInstanceId) { PurgeResult result = this.innerClient.purgeInstance(workflowInstanceId); @@ -254,6 +334,22 @@ public boolean purgeInstance(String workflowInstanceId) { return false; } + /** + * Purges workflow instance state from the workflow state store. + * + * @param workflowInstanceId The unique ID of the workflow instance to purge. + * @return Return true if the workflow state was found and purged successfully otherwise false. + */ + public boolean purgeWorkflow(String workflowInstanceId) { + PurgeResult result = this.innerClient.purgeInstance(workflowInstanceId); + + if (result != null) { + return result.getDeletedInstanceCount() > 0; + } + + return false; + } + /** * Closes the inner DurableTask client and shutdown the GRPC channel. */ diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java index de4d3bdd3f..bdcd0087f5 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowInstanceStatus.java @@ -20,7 +20,9 @@ /** * Represents a snapshot of a workflow instance's current state, including * metadata. + * @deprecated Use {@link WorkflowState} instead. */ +@Deprecated(forRemoval = true) public interface WorkflowInstanceStatus { /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowState.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowState.java new file mode 100644 index 0000000000..282d1d73ee --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowState.java @@ -0,0 +1,142 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import javax.annotation.Nullable; + +import java.time.Instant; + +/** + * Represents a snapshot of a workflow instance's current state, including + * metadata. + */ +public interface WorkflowState { + + /** + * Gets the name of the workflow. + * + * @return the name of the workflow + */ + String getName(); + + /** + * Gets the unique ID of the workflow instance. + * + * @return the unique ID of the workflow instance + */ + String getWorkflowId(); + + /** + * Gets the current runtime status of the workflow instance at the time this + * object was fetched. + * + * @return the current runtime status of the workflow instance at the time this object was fetched + */ + WorkflowRuntimeStatus getRuntimeStatus(); + + /** + * Gets the workflow instance's creation time in UTC. + * + * @return the workflow instance's creation time in UTC + */ + Instant getCreatedAt(); + + /** + * Gets the workflow instance's last updated time in UTC. + * + * @return the workflow instance's last updated time in UTC + */ + Instant getLastUpdatedAt(); + + /** + * Gets the workflow instance's serialized input, if any, as a string value. + * + * @return the workflow instance's serialized input or {@code null} + */ + String getSerializedInput(); + + /** + * Gets the workflow instance's serialized output, if any, as a string value. + * + * @return the workflow instance's serialized output or {@code null} + */ + String getSerializedOutput(); + + /** + * Gets the failure details, if any, for the failed workflow instance. + * + *

This method returns data only if the workflow is in the + * {@link WorkflowFailureDetails} failureDetails, + * and only if this instance metadata was fetched with the option to include + * output data. + * + * @return the failure details of the failed workflow instance or {@code null} + */ + @Nullable + WorkflowFailureDetails getFailureDetails(); + + /** + * Gets a value indicating whether the workflow instance was running at the time + * this object was fetched. + * + * @return {@code true} if the workflow existed and was in a running state otherwise {@code false} + */ + boolean isRunning(); + + /** + * Gets a value indicating whether the workflow instance was completed at the + * time this object was fetched. + * + *

A workflow instance is considered completed when its runtime status value is + * {@link WorkflowRuntimeStatus#COMPLETED}, + * {@link WorkflowRuntimeStatus#FAILED}, or + * {@link WorkflowRuntimeStatus#TERMINATED}. + * + * @return {@code true} if the workflow was in a terminal state; otherwise {@code false} + */ + boolean isCompleted(); + + /** + * Deserializes the workflow's input into an object of the specified type. + * + *

Deserialization is performed using the DataConverter that was + * configured on the DurableTaskClient object that created this workflow + * metadata object. + * + * @param type the class associated with the type to deserialize the input data + * into + * @param the type to deserialize the input data into + * @return the deserialized input value + * @throws IllegalStateException if the metadata was fetched without the option + * to read inputs and outputs + */ + T readInputAs(Class type); + + /** + * Deserializes the workflow's output into an object of the specified type. + * + *

Deserialization is performed using the DataConverter that was + * configured on the DurableTaskClient + * object that created this workflow metadata object. + * + * @param type the class associated with the type to deserialize the output data + * into + * @param the type to deserialize the output data into + * @return the deserialized input value + * @throws IllegalStateException if the metadata was fetched without the option + * to read inputs and outputs + */ + T readOutputAs(Class type); + +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java index 392357bc32..2c63dc9451 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java @@ -27,7 +27,9 @@ /** * Represents a snapshot of a workflow instance's current state, including * metadata. + * @deprecated Use {@link DefaultWorkflowState} instead. */ +@Deprecated(forRemoval = true) public class DefaultWorkflowInstanceStatus implements WorkflowInstanceStatus { private final OrchestrationMetadata orchestrationMetadata; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowState.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowState.java new file mode 100644 index 0000000000..78420d4c81 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowState.java @@ -0,0 +1,210 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import io.dapr.durabletask.FailureDetails; +import io.dapr.durabletask.OrchestrationMetadata; +import io.dapr.durabletask.OrchestrationRuntimeStatus; +import io.dapr.workflows.client.WorkflowFailureDetails; +import io.dapr.workflows.client.WorkflowRuntimeStatus; +import io.dapr.workflows.client.WorkflowState; + +import javax.annotation.Nullable; + +import java.time.Instant; + +/** + * Represents a snapshot of a workflow instance's current state, including + * metadata. + */ +public class DefaultWorkflowState implements WorkflowState { + + private final OrchestrationMetadata orchestrationMetadata; + + @Nullable + private final WorkflowFailureDetails failureDetails; + + /** + * Class constructor. + * + * @param orchestrationMetadata Durable task orchestration metadata + */ + public DefaultWorkflowState(OrchestrationMetadata orchestrationMetadata) { + if (orchestrationMetadata == null) { + throw new IllegalArgumentException("OrchestrationMetadata cannot be null"); + } + this.orchestrationMetadata = orchestrationMetadata; + + FailureDetails details = orchestrationMetadata.getFailureDetails(); + + if (details != null) { + this.failureDetails = new DefaultWorkflowFailureDetails(details); + } else { + this.failureDetails = null; + } + } + + /** + * Gets the name of the workflow. + * + * @return the name of the workflow + */ + public String getName() { + return orchestrationMetadata.getName(); + } + + /** + * Gets the unique ID of the workflow instance. + * + * @return the unique ID of the workflow instance + */ + public String getWorkflowId() { + return orchestrationMetadata.getInstanceId(); + } + + /** + * Gets the current runtime status of the workflow instance at the time this + * object was fetched. + * + * @return the current runtime status of the workflow instance at the time this object was fetched + */ + public WorkflowRuntimeStatus getRuntimeStatus() { + OrchestrationRuntimeStatus status = orchestrationMetadata.getRuntimeStatus(); + + return WorkflowRuntimeStatusConverter.fromOrchestrationRuntimeStatus(status); + } + + /** + * Gets the workflow instance's creation time in UTC. + * + * @return the workflow instance's creation time in UTC + */ + public Instant getCreatedAt() { + return orchestrationMetadata.getCreatedAt(); + } + + /** + * Gets the workflow instance's last updated time in UTC. + * + * @return the workflow instance's last updated time in UTC + */ + public Instant getLastUpdatedAt() { + return orchestrationMetadata.getLastUpdatedAt(); + } + + /** + * Gets the workflow instance's serialized input, if any, as a string value. + * + * @return the workflow instance's serialized input or {@code null} + */ + public String getSerializedInput() { + return orchestrationMetadata.getSerializedInput(); + } + + /** + * Gets the workflow instance's serialized output, if any, as a string value. + * + * @return the workflow instance's serialized output or {@code null} + */ + public String getSerializedOutput() { + return orchestrationMetadata.getSerializedOutput(); + } + + /** + * Gets the failure details, if any, for the failed workflow instance. + * + *

This method returns data only if the workflow is in the + * {@link OrchestrationRuntimeStatus#FAILED} state, + * and only if this instance metadata was fetched with the option to include + * output data. + * + * @return the failure details of the failed workflow instance or {@code null} + */ + @Nullable + public WorkflowFailureDetails getFailureDetails() { + return this.failureDetails; + } + + /** + * Gets a value indicating whether the workflow instance was running at the time + * this object was fetched. + * + * @return {@code true} if the workflow existed and was in a running state otherwise {@code false} + */ + public boolean isRunning() { + return orchestrationMetadata.isRunning(); + } + + /** + * Gets a value indicating whether the workflow instance was completed at the + * time this object was fetched. + * + *

A workflow instance is considered completed when its runtime status value is + * {@link WorkflowRuntimeStatus#COMPLETED}, + * {@link WorkflowRuntimeStatus#FAILED}, or + * {@link WorkflowRuntimeStatus#TERMINATED}. + * + * @return {@code true} if the workflow was in a terminal state; otherwise {@code false} + */ + public boolean isCompleted() { + return orchestrationMetadata.isCompleted(); + } + + /** + * Deserializes the workflow's input into an object of the specified type. + * + *

Deserialization is performed using the DataConverter that was + * configured on the DurableTaskClient object that created this workflow + * metadata object. + * + * @param type the class associated with the type to deserialize the input data + * into + * @param the type to deserialize the input data into + * @return the deserialized input value + * @throws IllegalStateException if the metadata was fetched without the option + * to read inputs and outputs + */ + public T readInputAs(Class type) { + return orchestrationMetadata.readInputAs(type); + } + + /** + * Deserializes the workflow's output into an object of the specified type. + * + *

Deserialization is performed using the DataConverter that was + * configured on the DurableTaskClient + * object that created this workflow metadata object. + * + * @param type the class associated with the type to deserialize the output data + * into + * @param the type to deserialize the output data into + * @return the deserialized input value + * @throws IllegalStateException if the metadata was fetched without the option + * to read inputs and outputs + */ + public T readOutputAs(Class type) { + return orchestrationMetadata.readOutputAs(type); + } + + /** + * Generates a user-friendly string representation of the current metadata + * object. + * + * @return a user-friendly string representation of the current metadata object + */ + public String toString() { + return orchestrationMetadata.toString(); + } + +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java index 55f7c9fddc..71fa93f0d2 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java @@ -156,12 +156,12 @@ public void getInstanceMetadata() { when(mockInnerClient.getInstanceMetadata(instanceId, true)).thenReturn(expectedMetadata); // Act - WorkflowInstanceStatus metadata = client.getInstanceState(instanceId, true); + WorkflowState metadata = client.getWorkflowState(instanceId, true); // Assert verify(mockInnerClient, times(1)).getInstanceMetadata(instanceId, true); assertNotEquals(metadata, null); - assertEquals(metadata.getInstanceId(), expectedMetadata.getInstanceId()); + assertEquals(metadata.getWorkflowId(), expectedMetadata.getInstanceId()); assertEquals(metadata.getName(), expectedMetadata.getName()); assertEquals(metadata.isRunning(), expectedMetadata.isRunning()); assertEquals(metadata.isCompleted(), expectedMetadata.isCompleted()); @@ -179,12 +179,12 @@ public void waitForInstanceStart() throws TimeoutException { when(mockInnerClient.waitForInstanceStart(instanceId, timeout, true)).thenReturn(expectedMetadata); // Act - WorkflowInstanceStatus result = client.waitForInstanceStart(instanceId, timeout, true); + WorkflowState result = client.waitForWorkflowStart(instanceId, timeout, true); // Assert verify(mockInnerClient, times(1)).waitForInstanceStart(instanceId, timeout, true); assertNotEquals(result, null); - assertEquals(result.getInstanceId(), expectedMetadata.getInstanceId()); + assertEquals(result.getWorkflowId(), expectedMetadata.getInstanceId()); } @Test @@ -199,12 +199,12 @@ public void waitForInstanceCompletion() throws TimeoutException { when(mockInnerClient.waitForInstanceCompletion(instanceId, timeout, true)).thenReturn(expectedMetadata); // Act - WorkflowInstanceStatus result = client.waitForInstanceCompletion(instanceId, timeout, true); + WorkflowState result = client.waitForWorkflowCompletion(instanceId, timeout, true); // Assert verify(mockInnerClient, times(1)).waitForInstanceCompletion(instanceId, timeout, true); assertNotEquals(result, null); - assertEquals(result.getInstanceId(), expectedMetadata.getInstanceId()); + assertEquals(result.getWorkflowId(), expectedMetadata.getInstanceId()); } @Test @@ -231,7 +231,7 @@ public void suspendResumeInstance() { @Test public void purgeInstance() { String expectedArgument = "TestWorkflowInstanceId"; - client.purgeInstance(expectedArgument); + client.purgeWorkflow(expectedArgument); verify(mockInnerClient, times(1)).purgeInstance(expectedArgument); } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowStateTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowStateTest.java new file mode 100644 index 0000000000..6224e2291a --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowStateTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.client; + +import io.dapr.durabletask.FailureDetails; +import io.dapr.durabletask.OrchestrationMetadata; +import io.dapr.durabletask.OrchestrationRuntimeStatus; +import io.dapr.workflows.runtime.DefaultWorkflowState; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class WorkflowStateTest { + + private OrchestrationMetadata mockOrchestrationMetadata; + private WorkflowState workflowMetadata; + + @BeforeEach + public void setUp() { + mockOrchestrationMetadata = mock(OrchestrationMetadata.class); + workflowMetadata = new DefaultWorkflowState(mockOrchestrationMetadata); + } + + @Test + public void getInstanceId() { + String expected = "instanceId"; + + when(mockOrchestrationMetadata.getInstanceId()).thenReturn(expected); + + String result = workflowMetadata.getWorkflowId(); + + verify(mockOrchestrationMetadata, times(1)).getInstanceId(); + assertEquals(expected, result); + } + + @Test + public void getName() { + String expected = "WorkflowName"; + + when(mockOrchestrationMetadata.getName()).thenReturn(expected); + + String result = workflowMetadata.getName(); + + verify(mockOrchestrationMetadata, times(1)).getName(); + assertEquals(expected, result); + } + + @Test + public void getCreatedAt() { + Instant expected = Instant.now(); + when(mockOrchestrationMetadata.getCreatedAt()).thenReturn(expected); + + Instant result = workflowMetadata.getCreatedAt(); + + verify(mockOrchestrationMetadata, times(1)).getCreatedAt(); + assertEquals(expected, result); + } + + @Test + public void getLastUpdatedAt() { + Instant expected = Instant.now(); + + when(mockOrchestrationMetadata.getLastUpdatedAt()).thenReturn(expected); + + Instant result = workflowMetadata.getLastUpdatedAt(); + + verify(mockOrchestrationMetadata, times(1)).getLastUpdatedAt(); + assertEquals(expected, result); + } + + @Test + public void getFailureDetails() { + FailureDetails mockFailureDetails = mock(FailureDetails.class); + + when(mockFailureDetails.getErrorType()).thenReturn("errorType"); + when(mockFailureDetails.getErrorMessage()).thenReturn("errorMessage"); + when(mockFailureDetails.getStackTrace()).thenReturn("stackTrace"); + + OrchestrationMetadata orchestrationMetadata = mock(OrchestrationMetadata.class); + when(orchestrationMetadata.getFailureDetails()).thenReturn(mockFailureDetails); + + WorkflowState metadata = new DefaultWorkflowState(orchestrationMetadata); + WorkflowFailureDetails result = metadata.getFailureDetails(); + + verify(orchestrationMetadata, times(1)).getFailureDetails(); + assertEquals(mockFailureDetails.getErrorType(), result.getErrorType()); + assertEquals(mockFailureDetails.getErrorMessage(), result.getErrorMessage()); + assertEquals(mockFailureDetails.getStackTrace(), result.getStackTrace()); + } + + @Test + public void getRuntimeStatus() { + WorkflowRuntimeStatus expected = WorkflowRuntimeStatus.RUNNING; + + when(mockOrchestrationMetadata.getRuntimeStatus()).thenReturn(OrchestrationRuntimeStatus.RUNNING); + + WorkflowRuntimeStatus result = workflowMetadata.getRuntimeStatus(); + + verify(mockOrchestrationMetadata, times(1)).getRuntimeStatus(); + assertEquals(expected, result); + } + + @Test + public void isRunning() { + boolean expected = true; + + when(mockOrchestrationMetadata.isRunning()).thenReturn(expected); + + boolean result = workflowMetadata.isRunning(); + + verify(mockOrchestrationMetadata, times(1)).isRunning(); + assertEquals(expected, result); + } + + @Test + public void isCompleted() { + boolean expected = true; + + when(mockOrchestrationMetadata.isCompleted()).thenReturn(expected); + + boolean result = workflowMetadata.isCompleted(); + + verify(mockOrchestrationMetadata, times(1)).isCompleted(); + assertEquals(expected, result); + } + + @Test + public void getSerializedInput() { + String expected = "{input: \"test\"}"; + + when(mockOrchestrationMetadata.getSerializedInput()).thenReturn(expected); + + String result = workflowMetadata.getSerializedInput(); + + verify(mockOrchestrationMetadata, times(1)).getSerializedInput(); + assertEquals(expected, result); + } + + @Test + public void getSerializedOutput() { + String expected = "{output: \"test\"}"; + + when(mockOrchestrationMetadata.getSerializedOutput()).thenReturn(expected); + + String result = workflowMetadata.getSerializedOutput(); + + verify(mockOrchestrationMetadata, times(1)).getSerializedOutput(); + assertEquals(expected, result); + } + + @Test + public void readInputAs() { + String expected = "[{property: \"test input\"}}]"; + + when(mockOrchestrationMetadata.readInputAs(String.class)).thenReturn(expected); + + String result = workflowMetadata.readInputAs(String.class); + + verify(mockOrchestrationMetadata, times(1)).readInputAs(String.class); + assertEquals(expected, result); + } + + @Test + public void readOutputAs() { + String expected = "[{property: \"test output\"}}]"; + + when(mockOrchestrationMetadata.readOutputAs(String.class)).thenReturn(expected); + + String result = workflowMetadata.readOutputAs(String.class); + + verify(mockOrchestrationMetadata, times(1)).readOutputAs(String.class); + assertEquals(expected, result); + } + + @Test + public void testToString() { + String expected = "string value"; + + when(mockOrchestrationMetadata.toString()).thenReturn(expected); + + String result = workflowMetadata.toString(); + + assertEquals(expected, result); + } + +} diff --git a/spring-boot-examples/workflows/multi-app/orchestrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomersRestController.java b/spring-boot-examples/workflows/multi-app/orchestrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomersRestController.java index 46277ada97..817405d07e 100644 --- a/spring-boot-examples/workflows/multi-app/orchestrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomersRestController.java +++ b/spring-boot-examples/workflows/multi-app/orchestrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomersRestController.java @@ -13,9 +13,8 @@ package io.dapr.springboot.examples.orchestrator; -import io.dapr.spring.workflows.config.EnableDaprWorkflows; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -84,7 +83,7 @@ public String getCustomerStatus(@RequestBody Customer customer) { if (workflowIdForCustomer == null || workflowIdForCustomer.isEmpty()) { return "N/A"; } - WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(workflowIdForCustomer, true); + WorkflowState instanceState = daprWorkflowClient.getWorkflowState(workflowIdForCustomer, true); assert instanceState != null; return "Workflow for Customer: " + customer.getCustomerName() + " is " + instanceState.getRuntimeStatus().name(); } @@ -101,7 +100,7 @@ public Customer getCustomerOutput(@RequestBody Customer customer) { if (workflowIdForCustomer == null || workflowIdForCustomer.isEmpty()) { return null; } - WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(workflowIdForCustomer, true); + WorkflowState instanceState = daprWorkflowClient.getWorkflowState(workflowIdForCustomer, true); assert instanceState != null; return instanceState.readOutputAs(Customer.class); } diff --git a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java index f1f8856d1d..4bb1b0a241 100644 --- a/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java +++ b/spring-boot-examples/workflows/patterns/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java @@ -27,7 +27,7 @@ import io.dapr.springboot.examples.wfp.timer.DurationTimerWorkflow; import io.dapr.springboot.examples.wfp.timer.ZonedDateTimeTimerWorkflow; import io.dapr.workflows.client.DaprWorkflowClient; -import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -67,7 +67,7 @@ public String chain() throws TimeoutException { String instanceId = daprWorkflowClient.scheduleNewWorkflow(ChainWorkflow.class); logger.info("Workflow instance " + instanceId + " started"); return daprWorkflowClient - .waitForInstanceCompletion(instanceId, Duration.ofSeconds(10), true) + .waitForWorkflowCompletion(instanceId, Duration.ofSeconds(10), true) .readOutputAs(String.class); } @@ -81,7 +81,7 @@ public String child() throws TimeoutException { String instanceId = daprWorkflowClient.scheduleNewWorkflow(ParentWorkflow.class); logger.info("Workflow instance " + instanceId + " started"); return daprWorkflowClient - .waitForInstanceCompletion(instanceId, Duration.ofSeconds(10), true) + .waitForWorkflowCompletion(instanceId, Duration.ofSeconds(10), true) .readOutputAs(String.class); } @@ -97,13 +97,13 @@ public Result fanOutIn(@RequestBody List listOfStrings) throws TimeoutEx logger.info("Workflow instance " + instanceId + " started"); // Block until the orchestration completes. Then print the final status, which includes the output. - WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient.waitForInstanceCompletion( + WorkflowState workflowState = daprWorkflowClient.waitForWorkflowCompletion( instanceId, Duration.ofSeconds(30), true); logger.info("workflow instance with ID: %s completed with result: %s%n", instanceId, - workflowInstanceStatus.readOutputAs(Result.class)); - return workflowInstanceStatus.readOutputAs(Result.class); + workflowState.readOutputAs(Result.class)); + return workflowState.readOutputAs(Result.class); } /** @@ -124,8 +124,8 @@ public Decision externalEventContinue(@RequestParam("orderId") String orderId, @ String instanceId = ordersToApprove.get(orderId); logger.info("Workflow instance " + instanceId + " continue"); daprWorkflowClient.raiseEvent(instanceId, "Approval", decision); - WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient - .waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowInstanceStatus = daprWorkflowClient + .waitForWorkflowCompletion(instanceId, null, true); return workflowInstanceStatus.readOutputAs(Decision.class); } @@ -137,7 +137,7 @@ public CleanUpLog continueAsNew() String instanceId = daprWorkflowClient.scheduleNewWorkflow(ContinueAsNewWorkflow.class); logger.info("Workflow instance " + instanceId + " started"); - WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient.waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowInstanceStatus = daprWorkflowClient.waitForWorkflowCompletion(instanceId, null, true); System.out.printf("workflow instance with ID: %s completed.", instanceId); return workflowInstanceStatus.readOutputAs(CleanUpLog.class); } @@ -149,8 +149,8 @@ public Payload remoteEndpoint(@RequestBody Payload payload) String instanceId = daprWorkflowClient.scheduleNewWorkflow(RemoteEndpointWorkflow.class, payload); logger.info("Workflow instance " + instanceId + " started"); - WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient - .waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowInstanceStatus = daprWorkflowClient + .waitForWorkflowCompletion(instanceId, null, true); System.out.printf("workflow instance with ID: %s completed.", instanceId); return workflowInstanceStatus.readOutputAs(Payload.class); } @@ -167,7 +167,7 @@ public String suspendResume(@RequestParam("orderId") String orderId) { public String suspendResumeExecuteSuspend(@RequestParam("orderId") String orderId) { String instanceId = ordersToApprove.get(orderId); daprWorkflowClient.suspendWorkflow(instanceId, "testing suspend"); - WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false); + WorkflowState instanceState = daprWorkflowClient.getWorkflowState(instanceId, false); return instanceState.getRuntimeStatus().name(); } @@ -175,7 +175,7 @@ public String suspendResumeExecuteSuspend(@RequestParam("orderId") String orderI public String suspendResumeExecuteResume(@RequestParam("orderId") String orderId) { String instanceId = ordersToApprove.get(orderId); daprWorkflowClient.resumeWorkflow(instanceId, "testing resume"); - WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false); + WorkflowState instanceState = daprWorkflowClient.getWorkflowState(instanceId, false); return instanceState.getRuntimeStatus().name(); } @@ -186,8 +186,8 @@ public Decision suspendResumeContinue(@RequestParam("orderId") String orderId, @ String instanceId = ordersToApprove.get(orderId); logger.info("Workflow instance " + instanceId + " continue"); daprWorkflowClient.raiseEvent(instanceId, "Approval", decision); - WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient - .waitForInstanceCompletion(instanceId, null, true); + WorkflowState workflowInstanceStatus = daprWorkflowClient + .waitForWorkflowCompletion(instanceId, null, true); return workflowInstanceStatus.readOutputAs(Decision.class); }