Skip to content

Commit

Permalink
workflow: Add auto retry of paused records (#93)
Browse files Browse the repository at this point in the history
* workflow: Add auto retry of paused records

* fix typo

* pass in polling frequency config to sleeping

* update process name

* clean up comments on parameters
  • Loading branch information
andrewwormald authored Feb 21, 2025
1 parent 8294b24 commit 7548142
Show file tree
Hide file tree
Showing 13 changed files with 495 additions and 79 deletions.
92 changes: 91 additions & 1 deletion autopause.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package workflow

import (
"context"
"time"

"k8s.io/utils/clock"

"github.com/luno/workflow/internal/errorcounter"
"github.com/luno/workflow/internal/metrics"
)

// maybePause will either return a nil error if it has failed to pause the record and should be retried. A non-nil
Expand All @@ -24,7 +28,7 @@ func maybePause[Type any, Status StatusType](
}

count := counter.Add(originalErr, processName, run.RunID)
if count <= pauseAfterErrCount {
if count < pauseAfterErrCount {
return false, nil
}

Expand All @@ -43,3 +47,89 @@ func maybePause[Type any, Status StatusType](
counter.Clear(originalErr, processName, run.RunID)
return true, nil
}

// autoPauseRetryConfig configures the background process that automatically
// resumes paused workflow records after a cool-off period.
type autoPauseRetryConfig struct {
	// enabled toggles the automatic retry of paused records.
	enabled bool
	// limit determines the maximum number of records fetched in one lookup cycle.
	limit int
	// pollingFrequency is the frequency of the lookup cycle that looks up paused records that have met
	// or exceeded the resumeAfter duration.
	pollingFrequency time.Duration
	// resumeAfter is the duration that the record should remain paused for.
	resumeAfter time.Duration
}

// defaultAutoPauseRetryConfig returns the out-of-the-box settings for the
// paused-record auto retry process: enabled, scanning up to 10 records per
// cycle, polling every minute, and resuming records paused for an hour.
func defaultAutoPauseRetryConfig() autoPauseRetryConfig {
	cfg := autoPauseRetryConfig{
		enabled: true,
		limit:   10,
	}
	cfg.pollingFrequency = time.Minute
	cfg.resumeAfter = time.Hour
	return cfg
}

// autoRetryPausedRecordsForever registers and runs the background process that
// periodically sweeps paused records for this workflow and resumes those that
// have been paused for at least the configured resumeAfter duration.
func autoRetryPausedRecordsForever[Type any, Status StatusType](w *Workflow[Type, Status]) {
	role := makeRole(w.Name(), "paused-records-retry")
	processName := role

	w.run(role, processName, func(ctx context.Context) error {
		for {
			if err := retryPausedRecords(
				ctx,
				w.Name(),
				w.recordStore.List,
				w.recordStore.Store,
				w.clock,
				processName,
				w.autoPauseRetryConfig.limit,
				w.autoPauseRetryConfig.resumeAfter,
			); err != nil {
				return err
			}

			// Slow and constant drip feed of paused records back into running state.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-w.clock.After(w.autoPauseRetryConfig.pollingFrequency):
			}
		}
	}, w.defaultOpts.errBackOff)
}

type listFunc func(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error)

// retryPausedRecords lists up to limit paused records for the given workflow
// and resumes each record whose last update is at least retryInterval old.
// It returns on the first listing or resume failure, and records the sweep
// duration in the process latency metric on success.
func retryPausedRecords(
	ctx context.Context,
	workflowName string,
	list listFunc,
	store storeFunc,
	clock clock.Clock,
	processName string,
	limit int,
	retryInterval time.Duration,
) error {
	start := clock.Now()

	records, err := list(ctx, workflowName, 0, limit, OrderTypeAscending, FilterByRunState(RunStatePaused))
	if err != nil {
		return err
	}

	cutoff := clock.Now().Add(-retryInterval)
	for i := range records {
		record := records[i]
		// Skip records that have not yet been paused for the full retry interval.
		if record.UpdatedAt.After(cutoff) {
			continue
		}

		if err := NewRunStateController(store, &record).Resume(ctx); err != nil {
			return err
		}
	}

	metrics.ProcessLatency.WithLabelValues(workflowName, processName).Observe(clock.Since(start).Seconds())
	return nil
}
165 changes: 138 additions & 27 deletions autopause_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
clock_testing "k8s.io/utils/clock/testing"

"github.com/luno/workflow/internal/errorcounter"
)
Expand All @@ -18,42 +20,42 @@ func Test_maybeAutoPause(t *testing.T) {
processName := "process"

testCases := []struct {
name string
countBeforePause int
pausesRecord bool
errCount int
expectedErr error
pauseFn func(ctx context.Context) error
name string
pauseAfterErrCount int
pausesRecord bool
errCount int
expectedErr error
pauseFn func(ctx context.Context) error
}{
{
name: "Default - not configured, no previous errors - does not pause workflow run",
countBeforePause: 0,
pausesRecord: false,
errCount: 0,
name: "Default - not configured, no previous errors - does not pause workflow run",
pauseAfterErrCount: 0,
pausesRecord: false,
errCount: 0,
},
{
name: "Default - not configured, has previous errors - does not pause workflow run",
countBeforePause: 0,
pausesRecord: false,
errCount: 100,
name: "Default - not configured, has previous errors - does not pause workflow run",
pauseAfterErrCount: 0,
pausesRecord: false,
errCount: 100,
},
{
name: "Pause record that has max retry of 1",
countBeforePause: 1,
pausesRecord: true,
errCount: 1,
name: "Pause record that has max retry of 1",
pauseAfterErrCount: 1,
pausesRecord: true,
errCount: 1,
},
{
name: "Do not pause record that has not passed max retry",
countBeforePause: 2,
pausesRecord: false,
errCount: 1,
name: "Do not pause record that has not passed max retry",
pauseAfterErrCount: 3,
pausesRecord: false,
errCount: 1,
},
{
name: "Return error when pause fails",
countBeforePause: 1,
pausesRecord: false,
errCount: 1,
name: "Return error when pause fails",
pauseAfterErrCount: 1,
pausesRecord: false,
errCount: 1,
pauseFn: func(ctx context.Context) error {
return pauseErr
},
Expand All @@ -74,7 +76,7 @@ func Test_maybeAutoPause(t *testing.T) {

paused, err := maybePause(
ctx,
tc.countBeforePause,
tc.pauseAfterErrCount,
counter,
testErr,
processName,
Expand All @@ -86,3 +88,112 @@ func Test_maybeAutoPause(t *testing.T) {
})
}
}

// Test_retryPausedRecords covers the paused-record auto retry sweep: the
// golden-path resume selection, and error propagation from both the list and
// store dependencies.
func Test_retryPausedRecords(t *testing.T) {
	t.Run("Golden path", func(t *testing.T) {
		ctx := context.Background()
		nw := time.Now()
		clock := clock_testing.NewFakeClock(nw)
		retryInterval := 20 * time.Minute
		// A and B are older than retryInterval, D sits exactly on the boundary
		// (still eligible for resume), while C and E are too recent.
		recordList := []Record{
			{
				RunID:     "A",
				RunState:  RunStatePaused,
				UpdatedAt: clock.Now().Add(-time.Hour),
			},
			{
				RunID:     "B",
				RunState:  RunStatePaused,
				UpdatedAt: clock.Now().Add(-time.Hour * 48),
			},
			{
				RunID:     "C",
				RunState:  RunStatePaused,
				UpdatedAt: clock.Now().Add(-time.Second),
			},
			{
				RunID:     "D",
				RunState:  RunStatePaused,
				UpdatedAt: clock.Now().Add(-retryInterval),
			},
			{
				RunID:     "E",
				RunState:  RunStatePaused,
				UpdatedAt: clock.Now().Add(-time.Minute * 10),
			},
		}
		updateCounter := make(map[string]int)
		err := retryPausedRecords(
			ctx,
			"example workflow",
			func(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error) {
				filter := MakeFilter(filters...)
				require.True(t, filter.byRunState.Enabled)
				// "3" is the serialised filter value for RunStatePaused.
				require.Equal(t, "3", filter.byRunState.Value)
				require.Equal(t, int64(0), offsetID)
				require.Equal(t, 100, limit)
				require.Equal(t, OrderTypeAscending, order)
				return recordList, nil
			},
			func(ctx context.Context, record *Record) error {
				// Require that the record is updated to RunStateRunning
				require.Equal(t, RunStateRunning, record.RunState)
				updateCounter[record.RunID] += 1
				return nil
			},
			clock,
			"process-name",
			100,
			retryInterval,
		)
		require.NoError(t, err)

		// Only records paused for at least retryInterval are resumed, once each.
		require.Equal(t, map[string]int{
			"A": 1,
			"B": 1,
			"D": 1,
		}, updateCounter)
	})

	t.Run("Returns error from listing records failure", func(t *testing.T) {
		testErr := errors.New("test error")
		err := retryPausedRecords(
			context.Background(),
			"example workflow",
			func(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error) {
				return nil, testErr
			},
			nil,
			&clock_testing.FakeClock{},
			"process-name",
			100,
			time.Minute,
		)
		// ErrorIs asserts the returned error wraps testErr. The previous
		// require.Error(t, err, testErr) only checked err != nil, because
		// testify treats trailing arguments as a log message, not an expected
		// error value.
		require.ErrorIs(t, err, testErr)
	})

	t.Run("Returns error when updating record to Running fails", func(t *testing.T) {
		testErr := errors.New("test error")
		clock := clock_testing.NewFakeClock(time.Now())
		err := retryPausedRecords(
			context.Background(),
			"example workflow",
			func(ctx context.Context, workflowName string, offsetID int64, limit int, order OrderType, filters ...RecordFilter) ([]Record, error) {
				return []Record{
					{
						RunState:  RunStatePaused,
						UpdatedAt: clock.Now().Add(-time.Hour),
					},
				}, nil
			},
			func(ctx context.Context, record *Record) error {
				return testErr
			},
			clock,
			"process-name",
			100,
			time.Minute,
		)
		require.ErrorIs(t, err, testErr)
	})
}
Loading

0 comments on commit 7548142

Please sign in to comment.