package workflow

import (
	"context"
	"time"

	"k8s.io/utils/clock"

	"github.com/luno/workflow/internal/errorcounter"
	"github.com/luno/workflow/internal/metrics"
)

// maybePause returns a non-nil error when it failed to pause the record, in which case the attempt should be
// retried. When no fault occurs the error is nil, and the returned bool is true if the Run was paused and
// false if it was not.
func maybePause[Type any, Status StatusType](
	ctx context.Context,
	pauseAfterErrCount int,
	counter errorcounter.ErrorCounter,
	originalErr error,
	processName string,
	run *Run[Type, Status],
	logger *logger,
) (paused bool, err error) {
	// Only keep track of errors if we need to.
	if pauseAfterErrCount == 0 {
		return false, nil
	}

	count := counter.Add(originalErr, processName, run.RunID)
	if count < pauseAfterErrCount {
		return false, nil
	}

	_, err = run.Pause(ctx)
	if err != nil {
		return false, err
	}

	logger.maybeDebug(ctx, "paused record after exceeding allowed error count", map[string]string{
		"workflow_name": run.WorkflowName,
		"foreign_id":    run.ForeignID,
		"run_id":        run.RunID,
	})

	// Run paused - now clear the error counter.
	counter.Clear(originalErr, processName, run.RunID)

	return true, nil
}
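
// Illustrative sketch (not part of this package's API): a consumer error path could call maybePause before
// falling back to its normal retry behaviour. The surrounding names (consume, pauseAfterErrCount, counter,
// processName, run, logger) are placeholders assumed for illustration only.
//
//	if err := consume(ctx, run); err != nil {
//		paused, pauseErr := maybePause(ctx, pauseAfterErrCount, counter, err, processName, run, logger)
//		if pauseErr != nil {
//			return pauseErr // Pausing failed; surface the error so the attempt is retried.
//		}
//		if paused {
//			return nil // The Run is paused; stop retrying until it is resumed.
//		}
//		return err // Not paused yet; propagate the original error for a normal retry.
//	}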

type autoPauseRetryConfig struct {
	enabled bool
	// limit is the maximum number of paused records fetched in one lookup cycle.
	limit int
	// pollingFrequency is how often the lookup cycle runs to find paused records that have met
	// or exceeded the resumeAfter duration.
	pollingFrequency time.Duration
	// resumeAfter is the duration that a record should remain paused before it is resumed.
	resumeAfter time.Duration
}

func defaultAutoPauseRetryConfig() autoPauseRetryConfig {
	return autoPauseRetryConfig{
		enabled:          true,
		limit:            10,
		pollingFrequency: time.Minute,
		resumeAfter:      time.Hour,
	}
}
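
// For illustration only: code inside this package could construct a custom configuration as a plain struct
// literal, as sketched below. How such a config is wired into a Workflow via public options is defined
// elsewhere and is not shown here; the values are examples, not recommendations.
//
//	cfg := autoPauseRetryConfig{
//		enabled:          true,
//		limit:            50,
//		pollingFrequency: 30 * time.Second,
//		resumeAfter:      15 * time.Minute,
//	}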

func autoRetryPausedRecordsForever[Type any, Status StatusType](w *Workflow[Type, Status]) {
	role := makeRole(w.Name(), "paused-records-retry")
	processName := role

	w.run(role, processName, func(ctx context.Context) error {
		for {
			err := retryPausedRecords(
				ctx,
				w.Name(),
				w.recordStore.List,
				w.recordStore.Store,
				w.clock,
				processName,
				w.autoPauseRetryConfig.limit,
				w.autoPauseRetryConfig.resumeAfter,
			)
			if err != nil {
				return err
			}

			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-w.clock.After(w.autoPauseRetryConfig.pollingFrequency): // Slow and constant drip feed of paused records back into running state.
				continue
			}
		}
	}, w.defaultOpts.errBackOff)
}
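
// Rough throughput bound with the defaults above (assuming no list or store errors): each cycle resumes at
// most limit (10) records and a cycle fires every pollingFrequency (1 minute), so at most roughly
// 10 * 60 = 600 paused records are drip-fed back into the running state per hour.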

type listFunc func(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)

func retryPausedRecords(
	ctx context.Context,
	workflowName string,
	list listFunc,
	store storeFunc,
	clock clock.Clock,
	processName string,
	limit int,
	retryInterval time.Duration,
) error {
	t0 := clock.Now()

	rs, err := list(ctx, workflowName, 0, limit, OrderTypeAscending, FilterByRunState(RunStatePaused))
	if err != nil {
		return err
	}

	threshold := clock.Now().Add(-retryInterval)
	for _, r := range rs {
		if r.UpdatedAt.After(threshold) {
			continue
		}

		controller := NewRunStateController(store, &r)
		err := controller.Resume(ctx)
		if err != nil {
			return err
		}
	}

	metrics.ProcessLatency.WithLabelValues(workflowName, processName).Observe(clock.Since(t0).Seconds())

	return nil
}
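
// Worked example of the resume threshold (illustrative, using the default config above): a record is only
// resumed once r.UpdatedAt <= clock.Now().Add(-retryInterval). With resumeAfter of one hour, a record last
// updated at 10:00 is skipped by every cycle before 11:00 and resumed by the first cycle at or after 11:00,
// subject to the per-cycle limit.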