Skip to content

Commit

Permalink
builder: Enable Command Substitution for RetryPolicy configuration (#747
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yohamta authored Dec 25, 2024
1 parent 3093ae2 commit 6ca7f2a
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 14 deletions.
21 changes: 21 additions & 0 deletions internal/cmdutil/cmdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"reflect"
"regexp"
"strconv"
"strings"
"unicode"

Expand Down Expand Up @@ -180,6 +181,26 @@ func SplitCommand(cmd string) (string, []string, error) {
return command[0], command[1:], nil
}

// SubstituteWithEnvExpand substitutes environment variables and commands in the input string
func SubstituteWithEnvExpand(input string) (string, error) {
expanded := os.ExpandEnv(input)
return SubstituteCommands(expanded)
}

// SubstituteWithEnvExpandInt substitutes environment variables and commands in the input string
func SubstituteWithEnvExpandInt(input string) (int, error) {
expanded := os.ExpandEnv(input)
expanded, err := SubstituteCommands(expanded)
if err != nil {
return 0, err
}
v, err := strconv.Atoi(expanded)
if err != nil {
return 0, fmt.Errorf("failed to convert %q to int: %w", expanded, err)
}
return v, nil
}

// tickerMatcher matches the command in the value string.
// Example: "`date`"
var tickerMatcher = regexp.MustCompile("`[^`]+`")
Expand Down
19 changes: 16 additions & 3 deletions internal/digraph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,22 @@ func buildContinueOn(_ BuildContext, def stepDef, step *Step) error {

func buildRetryPolicy(_ BuildContext, def stepDef, step *Step) error {
if def.RetryPolicy != nil {
step.RetryPolicy = &RetryPolicy{
Limit: def.RetryPolicy.Limit,
Interval: time.Second * time.Duration(def.RetryPolicy.IntervalSec),
switch v := def.RetryPolicy.Limit.(type) {
case int:
step.RetryPolicy.Limit = v
case string:
step.RetryPolicy.LimitStr = v
default:
return fmt.Errorf("invalid type for retryPolicy.Limit: %T", v)
}

switch v := def.RetryPolicy.IntervalSec.(type) {
case int:
step.RetryPolicy.Interval = time.Second * time.Duration(v)
case string:
step.RetryPolicy.IntervalSecStr = v
default:
return fmt.Errorf("invalid type for retryPolicy.IntervalSec: %T", v)
}
}
return nil
Expand Down
39 changes: 39 additions & 0 deletions internal/digraph/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Node struct {
outputReader *os.File
scriptFile *os.File
done bool
retryPolicy retryPolicy
}

type NodeData struct {
Expand Down Expand Up @@ -342,6 +343,10 @@ func (n *Node) setup(logDir string, requestID string) error {
if err := n.setupStderr(); err != nil {
return err
}
if err := n.setupRetryPolicy(); err != nil {
return err
}

return n.setupScript()
}

Expand Down Expand Up @@ -491,3 +496,37 @@ func (n *Node) init() {
n.data.Step.Preconditions = []digraph.Condition{}
}
}

type retryPolicy struct {
Limit int
Interval time.Duration
}

func (n *Node) setupRetryPolicy() error {
var retryPolicy retryPolicy

if n.data.Step.RetryPolicy.Limit > 0 {
retryPolicy.Limit = n.data.Step.RetryPolicy.Limit
}
if n.data.Step.RetryPolicy.Interval > 0 {
retryPolicy.Interval = n.data.Step.RetryPolicy.Interval
}
// Evaluate the the configuration if it's configured as a string
// e.g. environment variable or command substitution
if n.data.Step.RetryPolicy.LimitStr != "" {
v, err := cmdutil.SubstituteWithEnvExpandInt(n.data.Step.RetryPolicy.LimitStr)
if err != nil {
return fmt.Errorf("failed to substitute retry limit %q: %w", n.data.Step.RetryPolicy.LimitStr, err)
}
retryPolicy.Limit = v
}
if n.data.Step.RetryPolicy.IntervalSecStr != "" {
v, err := cmdutil.SubstituteWithEnvExpandInt(n.data.Step.RetryPolicy.IntervalSecStr)
if err != nil {
return fmt.Errorf("failed to substitute retry interval %q: %w", n.data.Step.RetryPolicy.IntervalSecStr, err)
}
retryPolicy.Interval = time.Duration(v) * time.Second
}
n.retryPolicy = retryPolicy
return nil
}
4 changes: 2 additions & 2 deletions internal/digraph/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ func (sc *Scheduler) Schedule(ctx context.Context, graph *ExecutionGraph, done c
case sc.isCanceled():
sc.setLastError(execErr)

case node.data.Step.RetryPolicy != nil && node.data.Step.RetryPolicy.Limit > node.getRetryCount():
case node.retryPolicy.Limit > node.getRetryCount():
// retry
node.incRetryCount()
logger.Info(ctx, "Step execution failed. Retrying...", "step", node.data.Step.Name, "error", execErr, "retry", node.getRetryCount())
time.Sleep(node.data.Step.RetryPolicy.Interval)
time.Sleep(node.retryPolicy.Interval)
node.setRetriedAt(time.Now())
node.setStatus(NodeStatusNone)

Expand Down
6 changes: 3 additions & 3 deletions internal/digraph/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,14 @@ func TestSchedulerRetryFail(t *testing.T) {
Name: "1",
Command: cmd,
ContinueOn: digraph.ContinueOn{Failure: true},
RetryPolicy: &digraph.RetryPolicy{Limit: 1},
RetryPolicy: digraph.RetryPolicy{Limit: 1},
},
digraph.Step{
Name: "2",
Command: cmd,
Args: []string{"flag"},
ContinueOn: digraph.ContinueOn{Failure: true},
RetryPolicy: &digraph.RetryPolicy{Limit: 1},
RetryPolicy: digraph.RetryPolicy{Limit: 1},
Depends: []string{"1"},
},
digraph.Step{
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestSchedulerRetrySuccess(t *testing.T) {
Command: cmd,
Args: []string{tmpFile},
Depends: []string{"1"},
RetryPolicy: &digraph.RetryPolicy{
RetryPolicy: digraph.RetryPolicy{
Limit: 10,
Interval: time.Millisecond * 800,
},
Expand Down
4 changes: 2 additions & 2 deletions internal/digraph/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type repeatPolicyDef struct {
}

type retryPolicyDef struct {
Limit int
IntervalSec int
Limit any
IntervalSec any
}

type smtpConfigDef struct {
Expand Down
12 changes: 9 additions & 3 deletions internal/digraph/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Step struct {
// ContinueOn contains the conditions to continue on failure or skipped.
ContinueOn ContinueOn `json:"ContinueOn,omitempty"`
// RetryPolicy contains the retry policy for the step.
RetryPolicy *RetryPolicy `json:"RetryPolicy,omitempty"`
RetryPolicy RetryPolicy `json:"RetryPolicy,omitempty"`
// RepeatPolicy contains the repeat policy for the step.
RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"`
// MailOnError is the flag to send mail on error.
Expand Down Expand Up @@ -102,8 +102,14 @@ type ExecutorConfig struct {

// RetryPolicy contains the retry policy for a step.
type RetryPolicy struct {
Limit int // Limit is the number of retries allowed.
Interval time.Duration // Interval is the time to wait between retries.
// Limit is the number of retries allowed.
Limit int `json:"Limit,omitempty"`
// Interval is the time to wait between retries.
Interval time.Duration `json:"Interval,omitempty"`
// LimitStr is the string representation of the limit.
LimitStr string `json:"LimitStr,omitempty"`
// IntervalSecStr is the string representation of the interval.
IntervalSecStr string `json:"IntervalSecStr,omitempty"`
}

// RepeatPolicy contains the repeat policy for a step.
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/model/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestStatusSerialization(t *testing.T) {
Name: "1", Description: "", Variables: []string{},
Dir: "dir", Command: "echo 1", Args: []string{},
Depends: []string{}, ContinueOn: digraph.ContinueOn{},
RetryPolicy: &digraph.RetryPolicy{}, MailOnError: false,
RetryPolicy: digraph.RetryPolicy{}, MailOnError: false,
RepeatPolicy: digraph.RepeatPolicy{}, Preconditions: []digraph.Condition{},
},
},
Expand Down

0 comments on commit 6ca7f2a

Please sign in to comment.