Skip to content

Commit 266d282

Browse files
committed
Adds dapr workflow
``` Workflow management commands. Use -k to target a Kubernetes Dapr cluster. Usage: dapr workflow [command] Aliases: workflow, work Available Commands: history Get the history of a workflow instance. list List workflows for the given app ID. purge Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. raise-event Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'. rerun ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided. resume Resume a workflow that is suspended. run Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name. suspend Suspend a workflow in progress. terminate Terminate a workflow in progress. Flags: -a, --app-id string The app ID owner of the workflow instance -h, --help help for workflow -k, --kubernetes Target a Kubernetes dapr installation -n, --namespace string Namespace to perform workflow operation on (default "default") Global Flags: --log-as-json Log output in JSON format --runtime-path string The path to the dapr runtime installation directory Get the history of a workflow instance. Usage: dapr workflow history [flags] Flags: -h, --help help for history -o, --output string Output format. One of short, wide, yaml, json (default "short") Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory List workflows for the given app ID. Usage: dapr workflow list [flags] Aliases: list, ls Flags: -c, --connection-string string The connection string used to connect and authenticate to the actor state store -m, --filter-max-age string Filter only the workflows started within the given duration or timestamp. Examples: 300ms, 1.5h or 2h45m, 2023-01-02T15:04:05 or 2023-01-02 -w, --filter-name string Filter only the workflows with the given name -s, --filter-status string Filter only the workflows with the given runtime status. One of RUNNING, COMPLETED, CONTINUED_AS_NEW, FAILED, CANCELED, TERMINATED, PENDING, SUSPENDED -h, --help help for list -o, --output string Output format. One of short, wide, yaml, json (default "short") -t, --table-name string The name of the table or collection which is used as the actor state store Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Usage: dapr workflow purge [flags] Flags: --all Purge all workflow instances in a terminal state. Use with caution. --all-older-than string Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'. -c, --connection-string string The connection string used to connect and authenticate to the actor state store -h, --help help for purge -t, --table-name string The name of the table or collection which is used as the actor state store Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'. Usage: dapr workflow raise-event [flags] Flags: -h, --help help for raise-event -x, --input string Optional input data for the new workflow instance. Accepts a JSON string. Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided. Usage: dapr workflow rerun [instance ID] [flags] Flags: -e, --event-id uint32 The event ID from which to re-run the workflow. If not provided, the workflow will re-run from the beginning. -h, --help help for rerun -x, --input string Optional input data for the new workflow instance. Accepts a JSON string. --new-instance-id string Optional new ID for the re-run workflow instance. If not provided, a new ID will be generated. Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Resume a workflow that is suspended. Usage: dapr workflow resume [flags] Flags: -h, --help help for resume -r, --reason string Reason for resuming the workflow Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name. Usage: dapr workflow run [flags] Flags: -h, --help help for run -x, --input string Optional input data for the new workflow instance. Accepts a JSON string. -i, --instance-id string The target workflow instance ID. -s, --start-time string Optional start time for the workflow in RFC3339 or Go duration string format. If not provided, the workflow starts immediately. A duration of '0s', or any start time, will cause the command to not wait for the command to start Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Suspend a workflow in progress. Usage: dapr workflow suspend [flags] Flags: -h, --help help for suspend -r, --reason string Reason for resuming the workflow Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Terminate a workflow in progress. Usage: dapr workflow terminate [flags] Flags: -h, --help help for terminate -o, --output string Optional output data for the workflow in JSON string format. Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory ``` Signed-off-by: joshvanl <[email protected]>
1 parent de38192 commit 266d282

File tree

35 files changed

+3988
-50
lines changed

35 files changed

+3988
-50
lines changed

cmd/dapr.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/spf13/viper"
2323

2424
"github.com/dapr/cli/cmd/scheduler"
25+
"github.com/dapr/cli/cmd/workflow"
2526
"github.com/dapr/cli/pkg/api"
2627
"github.com/dapr/cli/pkg/print"
2728
"github.com/dapr/cli/pkg/standalone"
@@ -111,4 +112,5 @@ func init() {
111112
RootCmd.PersistentFlags().BoolVarP(&logAsJSON, "log-as-json", "", false, "Log output in JSON format")
112113

113114
RootCmd.AddCommand(scheduler.SchedulerCmd)
115+
RootCmd.AddCommand(workflow.WorkflowCmd)
114116
}

cmd/workflow/history.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"os"
18+
19+
"github.com/gocarina/gocsv"
20+
"github.com/spf13/cobra"
21+
22+
"github.com/dapr/cli/pkg/workflow"
23+
"github.com/dapr/cli/utils"
24+
"github.com/dapr/kit/signals"
25+
)
26+
27+
var (
28+
historyOutputFormat *string
29+
)
30+
31+
var HistoryCmd = &cobra.Command{
32+
Use: "history",
33+
Short: "Get the history of a workflow instance.",
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
ctx := signals.Context()
37+
38+
appID, err := getWorkflowAppID(cmd)
39+
if err != nil {
40+
return err
41+
}
42+
43+
opts := workflow.HistoryOptions{
44+
KubernetesMode: flagKubernetesMode,
45+
Namespace: flagDaprNamespace,
46+
AppID: appID,
47+
InstanceID: args[0],
48+
}
49+
50+
var list any
51+
if *historyOutputFormat == outputFormatShort {
52+
list, err = workflow.HistoryShort(ctx, opts)
53+
} else {
54+
list, err = workflow.HistoryWide(ctx, opts)
55+
}
56+
if err != nil {
57+
return err
58+
}
59+
60+
switch *historyOutputFormat {
61+
case outputFormatYAML:
62+
err = utils.PrintDetail(os.Stdout, "yaml", list)
63+
case outputFormatJSON:
64+
err = utils.PrintDetail(os.Stdout, "json", list)
65+
default:
66+
table, err := gocsv.MarshalString(list)
67+
if err != nil {
68+
break
69+
}
70+
71+
utils.PrintTable(table)
72+
}
73+
74+
return nil
75+
},
76+
}
77+
78+
func init() {
79+
historyOutputFormat = outputFunc(HistoryCmd)
80+
WorkflowCmd.AddCommand(HistoryCmd)
81+
}

cmd/workflow/list.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"os"
18+
19+
"github.com/gocarina/gocsv"
20+
"github.com/spf13/cobra"
21+
22+
"github.com/dapr/cli/pkg/print"
23+
"github.com/dapr/cli/pkg/workflow"
24+
"github.com/dapr/cli/utils"
25+
"github.com/dapr/kit/signals"
26+
)
27+
28+
var (
29+
listFilter *workflow.Filter
30+
listOutputFormat *string
31+
32+
listConn *connFlag
33+
)
34+
35+
var ListCmd = &cobra.Command{
36+
Use: "list",
37+
Aliases: []string{"ls"},
38+
Short: "List workflows for the given app ID.",
39+
Args: cobra.NoArgs,
40+
RunE: func(cmd *cobra.Command, args []string) error {
41+
ctx := signals.Context()
42+
43+
appID, err := getWorkflowAppID(cmd)
44+
if err != nil {
45+
return err
46+
}
47+
48+
opts := workflow.ListOptions{
49+
KubernetesMode: flagKubernetesMode,
50+
Namespace: flagDaprNamespace,
51+
AppID: appID,
52+
ConnectionString: listConn.connectionString,
53+
TableName: listConn.tableName,
54+
Filter: *listFilter,
55+
}
56+
57+
var list any
58+
var empty bool
59+
60+
switch *listOutputFormat {
61+
case outputFormatShort:
62+
ll, err := workflow.ListShort(ctx, opts)
63+
if err != nil {
64+
return err
65+
}
66+
empty = len(ll) == 0
67+
list = ll
68+
69+
default:
70+
ll, err := workflow.ListWide(ctx, opts)
71+
if err != nil {
72+
return err
73+
}
74+
empty = len(ll) == 0
75+
list = ll
76+
}
77+
78+
if empty {
79+
print.FailureStatusEvent(os.Stderr, "No workflow found in namespace %q for app ID %q", flagDaprNamespace, appID)
80+
return nil
81+
}
82+
83+
switch *listOutputFormat {
84+
case outputFormatYAML:
85+
err = utils.PrintDetail(os.Stdout, "yaml", list)
86+
case outputFormatJSON:
87+
err = utils.PrintDetail(os.Stdout, "json", list)
88+
default:
89+
table, err := gocsv.MarshalString(list)
90+
if err != nil {
91+
break
92+
}
93+
94+
utils.PrintTable(table)
95+
}
96+
97+
return nil
98+
},
99+
}
100+
101+
func init() {
102+
listFilter = filterCmd(ListCmd)
103+
listOutputFormat = outputFunc(ListCmd)
104+
listConn = connectionCmd(ListCmd)
105+
WorkflowCmd.AddCommand(ListCmd)
106+
}

cmd/workflow/purge.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"errors"
18+
19+
"github.com/dapr/cli/pkg/workflow"
20+
"github.com/dapr/kit/signals"
21+
"github.com/spf13/cobra"
22+
)
23+
24+
var (
25+
flagPurgeOlderThan string
26+
flagPurgeAll bool
27+
flagPurgeConn *connFlag
28+
)
29+
30+
var PurgeCmd = &cobra.Command{
31+
Use: "purge",
32+
Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances.",
33+
Args: func(cmd *cobra.Command, args []string) error {
34+
switch {
35+
case cmd.Flags().Changed("all-older-than"),
36+
cmd.Flags().Changed("all"):
37+
if len(args) > 0 {
38+
return errors.New("no arguments are accepted when using purge all flags")
39+
}
40+
default:
41+
if len(args) == 0 {
42+
return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
43+
}
44+
}
45+
46+
return nil
47+
},
48+
RunE: func(cmd *cobra.Command, args []string) error {
49+
ctx := signals.Context()
50+
51+
appID, err := getWorkflowAppID(cmd)
52+
if err != nil {
53+
return err
54+
}
55+
56+
opts := workflow.PurgeOptions{
57+
KubernetesMode: flagKubernetesMode,
58+
Namespace: flagDaprNamespace,
59+
AppID: appID,
60+
InstanceIDs: args,
61+
All: flagPurgeAll,
62+
ConnectionString: flagPurgeConn.connectionString,
63+
TableName: flagPurgeConn.tableName,
64+
}
65+
66+
if cmd.Flags().Changed("all-older-than") {
67+
opts.AllOlderThan, err = parseWorkflowDurationTimestamp(flagPurgeOlderThan, true)
68+
if err != nil {
69+
return err
70+
}
71+
}
72+
73+
return workflow.Purge(ctx, opts)
74+
},
75+
}
76+
77+
func init() {
78+
PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.")
79+
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
80+
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
81+
82+
flagPurgeConn = connectionCmd(PurgeCmd)
83+
84+
WorkflowCmd.AddCommand(PurgeCmd)
85+
}

cmd/workflow/raiseevent.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"errors"
18+
"os"
19+
"strings"
20+
21+
"github.com/dapr/cli/pkg/print"
22+
"github.com/dapr/cli/pkg/workflow"
23+
"github.com/dapr/kit/signals"
24+
"github.com/spf13/cobra"
25+
)
26+
27+
var (
28+
flagRaiseEventInput *inputFlag
29+
)
30+
31+
var RaiseEventCmd = &cobra.Command{
32+
Use: "raise-event",
33+
Short: "Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'.",
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
ctx := signals.Context()
37+
38+
split := strings.Split(args[0], "/")
39+
if len(split) != 2 {
40+
return errors.New("the argument must be in the format '<instance-id>/<event-name>'")
41+
}
42+
instanceID := split[0]
43+
eventName := split[1]
44+
45+
appID, err := getWorkflowAppID(cmd)
46+
if err != nil {
47+
return err
48+
}
49+
50+
opts := workflow.RaiseEventOptions{
51+
KubernetesMode: flagKubernetesMode,
52+
Namespace: flagDaprNamespace,
53+
AppID: appID,
54+
InstanceID: instanceID,
55+
Name: eventName,
56+
Input: flagRaiseEventInput.input,
57+
}
58+
59+
if err = workflow.RaiseEvent(ctx, opts); err != nil {
60+
print.FailureStatusEvent(os.Stdout, err.Error())
61+
os.Exit(1)
62+
}
63+
64+
print.InfoStatusEvent(os.Stdout, "Workflow '%s' raised event '%s' successfully", instanceID, eventName)
65+
66+
return nil
67+
},
68+
}
69+
70+
func init() {
71+
flagRaiseEventInput = inputCmd(RaiseEventCmd)
72+
73+
WorkflowCmd.AddCommand(RaiseEventCmd)
74+
}

0 commit comments

Comments
 (0)