Skip to content

Commit f799660

Browse files
committed
Also delete scheduler jobs during workflow purge
Signed-off-by: joshvanl <[email protected]>
1 parent 4786140 commit f799660

File tree

10 files changed

+74
-22
lines changed

10 files changed

+74
-22
lines changed

cmd/workflow/purge.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ var (
2525
flagPurgeOlderThan string
2626
flagPurgeAll bool
2727
flagPurgeConn *connFlag
28+
schedulerNamespace string
2829
)
2930

3031
var PurgeCmd = &cobra.Command{
3132
Use: "purge",
32-
Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances.",
33+
Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.",
3334
Args: func(cmd *cobra.Command, args []string) error {
3435
switch {
3536
case cmd.Flags().Changed("all-older-than"),
@@ -54,13 +55,14 @@ var PurgeCmd = &cobra.Command{
5455
}
5556

5657
opts := workflow.PurgeOptions{
57-
KubernetesMode: flagKubernetesMode,
58-
Namespace: flagDaprNamespace,
59-
AppID: appID,
60-
InstanceIDs: args,
61-
All: flagPurgeAll,
62-
ConnectionString: flagPurgeConn.connectionString,
63-
TableName: flagPurgeConn.tableName,
58+
KubernetesMode: flagKubernetesMode,
59+
Namespace: flagDaprNamespace,
60+
SchedulerNamespace: schedulerNamespace,
61+
AppID: appID,
62+
InstanceIDs: args,
63+
All: flagPurgeAll,
64+
ConnectionString: flagPurgeConn.connectionString,
65+
TableName: flagPurgeConn.tableName,
6466
}
6567

6668
if cmd.Flags().Changed("all-older-than") {
@@ -79,6 +81,8 @@ func init() {
7981
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
8082
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
8183

84+
PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set")
85+
8286
flagPurgeConn = connectionCmd(PurgeCmd)
8387

8488
WorkflowCmd.AddCommand(PurgeCmd)

cmd/workflow/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ var (
4444
var WorkflowCmd = &cobra.Command{
4545
Use: "workflow",
4646
Short: "Workflow management commands. Use -k to target a Kubernetes Dapr cluster.",
47-
Aliases: []string{"work"},
47+
Aliases: []string{"wf"},
4848
}
4949

5050
func init() {

pkg/scheduler/delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type DeleteOptions struct {
3030
}
3131

3232
func Delete(ctx context.Context, opts DeleteOptions, keys ...string) error {
33-
etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
33+
etcdClient, cancel, err := EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
3434
if err != nil {
3535
return err
3636
}

pkg/scheduler/deleteall.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
)
2626

2727
func DeleteAll(ctx context.Context, opts DeleteOptions, key string) error {
28-
etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
28+
etcdClient, cancel, err := EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
2929
if err != nil {
3030
return err
3131
}

pkg/scheduler/exportimport.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func Export(ctx context.Context, opts ExportImportOptions) error {
4646
return err
4747
}
4848

49-
client, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
49+
client, cancel, err := EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
5050
if err != nil {
5151
return err
5252
}
@@ -98,7 +98,7 @@ func Export(ctx context.Context, opts ExportImportOptions) error {
9898
}
9999

100100
func Import(ctx context.Context, opts ExportImportOptions) error {
101-
client, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
101+
client, cancel, err := EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
102102
if err != nil {
103103
return err
104104
}

pkg/scheduler/get.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func Get(ctx context.Context, opts GetOptions, keys ...string) ([]*ListOutput, e
3838
}
3939

4040
func GetWide(ctx context.Context, opts GetOptions, keys ...string) ([]*ListOutputWide, error) {
41-
etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
41+
etcdClient, cancel, err := EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
4242
if err != nil {
4343
return nil, err
4444
}

pkg/scheduler/list.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func ListWide(ctx context.Context, opts ListOptions) ([]*ListOutputWide, error)
102102
}
103103

104104
func ListJobs(ctx context.Context, opts ListOptions) ([]*JobCount, error) {
105-
etcdClient, cancel, err := etcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
105+
etcdClient, cancel, err := EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
106106
if err != nil {
107107
return nil, err
108108
}

pkg/scheduler/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func parseJobKey(key string) (*jobKey, error) {
192192
}
193193
}
194194

195-
func etcdClient(kubernetesMode bool, schedulerNamespace string) (*clientv3.Client, context.CancelFunc, error) {
195+
func EtcdClient(kubernetesMode bool, schedulerNamespace string) (*clientv3.Client, context.CancelFunc, error) {
196196
var etcdClient *clientv3.Client
197197
var err error
198198
if kubernetesMode {

pkg/workflow/purge.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@ import (
1919
"os"
2020
"time"
2121

22+
clientv3 "go.etcd.io/etcd/client/v3"
23+
2224
"github.com/dapr/cli/pkg/print"
25+
"github.com/dapr/cli/pkg/scheduler"
2326
"github.com/dapr/cli/pkg/workflow/dclient"
2427
"github.com/dapr/durabletask-go/workflow"
2528
)
2629

2730
type PurgeOptions struct {
28-
KubernetesMode bool
29-
Namespace string
30-
AppID string
31-
InstanceIDs []string
32-
AllOlderThan *time.Time
33-
All bool
31+
KubernetesMode bool
32+
Namespace string
33+
SchedulerNamespace string
34+
AppID string
35+
InstanceIDs []string
36+
AllOlderThan *time.Time
37+
All bool
3438

3539
ConnectionString *string
3640
TableName *string
@@ -80,12 +84,39 @@ func Purge(ctx context.Context, opts PurgeOptions) error {
8084

8185
wf := workflow.NewClient(cli.Dapr.GrpcClientConn())
8286

87+
etcdClient, cancel, err := scheduler.EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace)
88+
if err != nil {
89+
return err
90+
}
91+
defer cancel()
92+
8393
print.InfoStatusEvent(os.Stdout, "Purging %d workflow instance(s)", len(toPurge))
8494

8595
for _, id := range toPurge {
8696
if err = wf.PurgeWorkflowState(ctx, id); err != nil {
8797
return fmt.Errorf("%s: %w", id, err)
8898
}
99+
100+
paths := []string{
101+
fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.Namespace, opts.Namespace, opts.AppID, id),
102+
fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.Namespace, opts.Namespace, opts.AppID, id),
103+
fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.Namespace, opts.Namespace, opts.AppID, id),
104+
fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.Namespace, opts.Namespace, opts.AppID, id),
105+
}
106+
107+
oopts := make([]clientv3.Op, 0, len(paths))
108+
for _, path := range paths {
109+
oopts = append(oopts, clientv3.OpDelete(path,
110+
clientv3.WithPrefix(),
111+
clientv3.WithPrevKV(),
112+
clientv3.WithKeysOnly(),
113+
))
114+
}
115+
116+
if _, err = etcdClient.Txn(ctx).Then(oopts...).Commit(); err != nil {
117+
return err
118+
}
119+
89120
print.SuccessStatusEvent(os.Stdout, "Purged workflow instance %q", id)
90121
}
91122

tests/e2e/standalone/workflow_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,23 @@ func TestWorkflowPurge(t *testing.T) {
332332
require.NoError(t, err, output)
333333
assert.NotContains(t, output, "purge-older")
334334
})
335+
336+
t.Run("also purge scheduler", func(t *testing.T) {
337+
output, err := cmdWorkflowRun(appID, "EventWorkflow",
338+
"--instance-id=also-sched")
339+
require.NoError(t, err)
340+
341+
output, err := cmdSchedulerList()
342+
require.NoError(t, err)
343+
assert.Greater(c, len(strings.Split(output, "\n")), 2)
344+
345+
output, err = cmdWorkflowPurge(appID, "also-sched")
346+
require.NoError(t, err, output)
347+
348+
output, err = cmdSchedulerList()
349+
require.NoError(t, err)
350+
assert.Len(c, strings.Split(output, "\n"), 2)
351+
})
335352
}
336353

337354
func TestWorkflowFilters(t *testing.T) {

0 commit comments

Comments
 (0)