From 6ca7f2adc13d3154910d3110bcb028c966258039 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Wed, 25 Dec 2024 17:52:14 +0900 Subject: [PATCH] builder: Enable Command Substitution for RetryPolicy configuration (#747) --- internal/cmdutil/cmdutil.go | 21 +++++++++++ internal/digraph/builder.go | 19 ++++++++-- internal/digraph/scheduler/node.go | 39 ++++++++++++++++++++ internal/digraph/scheduler/scheduler.go | 4 +- internal/digraph/scheduler/scheduler_test.go | 6 +-- internal/digraph/spec.go | 4 +- internal/digraph/step.go | 12 ++++-- internal/persistence/model/status_test.go | 2 +- 8 files changed, 93 insertions(+), 14 deletions(-) diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 9f43ada6b..2e9f20743 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -9,6 +9,7 @@ import ( "os/exec" "reflect" "regexp" + "strconv" "strings" "unicode" @@ -180,6 +181,26 @@ func SplitCommand(cmd string) (string, []string, error) { return command[0], command[1:], nil } +// SubstituteWithEnvExpand substitutes environment variables and commands in the input string +func SubstituteWithEnvExpand(input string) (string, error) { + expanded := os.ExpandEnv(input) + return SubstituteCommands(expanded) +} + +// SubstituteWithEnvExpandInt substitutes environment variables and commands in the input string +func SubstituteWithEnvExpandInt(input string) (int, error) { + expanded := os.ExpandEnv(input) + expanded, err := SubstituteCommands(expanded) + if err != nil { + return 0, err + } + v, err := strconv.Atoi(expanded) + if err != nil { + return 0, fmt.Errorf("failed to convert %q to int: %w", expanded, err) + } + return v, nil +} + // tickerMatcher matches the command in the value string. // Example: "`date`" var tickerMatcher = regexp.MustCompile("`[^`]+`") diff --git a/internal/digraph/builder.go b/internal/digraph/builder.go index f8ada0c68..5de44643a 100644 --- a/internal/digraph/builder.go +++ b/internal/digraph/builder.go @@ -449,9 +449,22 @@ func buildContinueOn(_ BuildContext, def stepDef, step *Step) error { func buildRetryPolicy(_ BuildContext, def stepDef, step *Step) error { if def.RetryPolicy != nil { - step.RetryPolicy = &RetryPolicy{ - Limit: def.RetryPolicy.Limit, - Interval: time.Second * time.Duration(def.RetryPolicy.IntervalSec), + switch v := def.RetryPolicy.Limit.(type) { + case int: + step.RetryPolicy.Limit = v + case string: + step.RetryPolicy.LimitStr = v + default: + return fmt.Errorf("invalid type for retryPolicy.Limit: %T", v) + } + + switch v := def.RetryPolicy.IntervalSec.(type) { + case int: + step.RetryPolicy.Interval = time.Second * time.Duration(v) + case string: + step.RetryPolicy.IntervalSecStr = v + default: + return fmt.Errorf("invalid type for retryPolicy.IntervalSec: %T", v) } } return nil diff --git a/internal/digraph/scheduler/node.go b/internal/digraph/scheduler/node.go index 5528ca757..38405db2a 100644 --- a/internal/digraph/scheduler/node.go +++ b/internal/digraph/scheduler/node.go @@ -45,6 +45,7 @@ type Node struct { outputReader *os.File scriptFile *os.File done bool + retryPolicy retryPolicy } type NodeData struct { @@ -342,6 +343,10 @@ func (n *Node) setup(logDir string, requestID string) error { if err := n.setupStderr(); err != nil { return err } + if err := n.setupRetryPolicy(); err != nil { + return err + } + return n.setupScript() } @@ -491,3 +496,37 @@ func (n *Node) init() { n.data.Step.Preconditions = []digraph.Condition{} } } + +type retryPolicy struct { + Limit int + Interval time.Duration +} + +func (n *Node) setupRetryPolicy() error { + var retryPolicy retryPolicy + + if n.data.Step.RetryPolicy.Limit > 0 { + retryPolicy.Limit = n.data.Step.RetryPolicy.Limit + } + if n.data.Step.RetryPolicy.Interval > 0 { + retryPolicy.Interval = n.data.Step.RetryPolicy.Interval + } + // Evaluate the the configuration if it's configured as a string + // e.g. environment variable or command substitution + if n.data.Step.RetryPolicy.LimitStr != "" { + v, err := cmdutil.SubstituteWithEnvExpandInt(n.data.Step.RetryPolicy.LimitStr) + if err != nil { + return fmt.Errorf("failed to substitute retry limit %q: %w", n.data.Step.RetryPolicy.LimitStr, err) + } + retryPolicy.Limit = v + } + if n.data.Step.RetryPolicy.IntervalSecStr != "" { + v, err := cmdutil.SubstituteWithEnvExpandInt(n.data.Step.RetryPolicy.IntervalSecStr) + if err != nil { + return fmt.Errorf("failed to substitute retry interval %q: %w", n.data.Step.RetryPolicy.IntervalSecStr, err) + } + retryPolicy.Interval = time.Duration(v) * time.Second + } + n.retryPolicy = retryPolicy + return nil +} diff --git a/internal/digraph/scheduler/scheduler.go b/internal/digraph/scheduler/scheduler.go index 261b72151..4047f1c19 100644 --- a/internal/digraph/scheduler/scheduler.go +++ b/internal/digraph/scheduler/scheduler.go @@ -183,11 +183,11 @@ func (sc *Scheduler) Schedule(ctx context.Context, graph *ExecutionGraph, done c case sc.isCanceled(): sc.setLastError(execErr) - case node.data.Step.RetryPolicy != nil && node.data.Step.RetryPolicy.Limit > node.getRetryCount(): + case node.retryPolicy.Limit > node.getRetryCount(): // retry node.incRetryCount() logger.Info(ctx, "Step execution failed. Retrying...", "step", node.data.Step.Name, "error", execErr, "retry", node.getRetryCount()) - time.Sleep(node.data.Step.RetryPolicy.Interval) + time.Sleep(node.retryPolicy.Interval) node.setRetriedAt(time.Now()) node.setStatus(NodeStatusNone) diff --git a/internal/digraph/scheduler/scheduler_test.go b/internal/digraph/scheduler/scheduler_test.go index 645e3e7a9..c09268a75 100644 --- a/internal/digraph/scheduler/scheduler_test.go +++ b/internal/digraph/scheduler/scheduler_test.go @@ -210,14 +210,14 @@ func TestSchedulerRetryFail(t *testing.T) { Name: "1", Command: cmd, ContinueOn: digraph.ContinueOn{Failure: true}, - RetryPolicy: &digraph.RetryPolicy{Limit: 1}, + RetryPolicy: digraph.RetryPolicy{Limit: 1}, }, digraph.Step{ Name: "2", Command: cmd, Args: []string{"flag"}, ContinueOn: digraph.ContinueOn{Failure: true}, - RetryPolicy: &digraph.RetryPolicy{Limit: 1}, + RetryPolicy: digraph.RetryPolicy{Limit: 1}, Depends: []string{"1"}, }, digraph.Step{ @@ -256,7 +256,7 @@ func TestSchedulerRetrySuccess(t *testing.T) { Command: cmd, Args: []string{tmpFile}, Depends: []string{"1"}, - RetryPolicy: &digraph.RetryPolicy{ + RetryPolicy: digraph.RetryPolicy{ Limit: 10, Interval: time.Millisecond * 800, }, diff --git a/internal/digraph/spec.go b/internal/digraph/spec.go index be1bc9127..aa1e92c33 100644 --- a/internal/digraph/spec.go +++ b/internal/digraph/spec.go @@ -90,8 +90,8 @@ type repeatPolicyDef struct { } type retryPolicyDef struct { - Limit int - IntervalSec int + Limit any + IntervalSec any } type smtpConfigDef struct { diff --git a/internal/digraph/step.go b/internal/digraph/step.go index dc20e635d..0e235ef56 100644 --- a/internal/digraph/step.go +++ b/internal/digraph/step.go @@ -47,7 +47,7 @@ type Step struct { // ContinueOn contains the conditions to continue on failure or skipped. ContinueOn ContinueOn `json:"ContinueOn,omitempty"` // RetryPolicy contains the retry policy for the step. - RetryPolicy *RetryPolicy `json:"RetryPolicy,omitempty"` + RetryPolicy RetryPolicy `json:"RetryPolicy,omitempty"` // RepeatPolicy contains the repeat policy for the step. RepeatPolicy RepeatPolicy `json:"RepeatPolicy,omitempty"` // MailOnError is the flag to send mail on error. @@ -102,8 +102,14 @@ type ExecutorConfig struct { // RetryPolicy contains the retry policy for a step. type RetryPolicy struct { - Limit int // Limit is the number of retries allowed. - Interval time.Duration // Interval is the time to wait between retries. + // Limit is the number of retries allowed. + Limit int `json:"Limit,omitempty"` + // Interval is the time to wait between retries. + Interval time.Duration `json:"Interval,omitempty"` + // LimitStr is the string representation of the limit. + LimitStr string `json:"LimitStr,omitempty"` + // IntervalSecStr is the string representation of the interval. + IntervalSecStr string `json:"IntervalSecStr,omitempty"` } // RepeatPolicy contains the repeat policy for a step. diff --git a/internal/persistence/model/status_test.go b/internal/persistence/model/status_test.go index e63ce6d2f..71532594c 100644 --- a/internal/persistence/model/status_test.go +++ b/internal/persistence/model/status_test.go @@ -34,7 +34,7 @@ func TestStatusSerialization(t *testing.T) { Name: "1", Description: "", Variables: []string{}, Dir: "dir", Command: "echo 1", Args: []string{}, Depends: []string{}, ContinueOn: digraph.ContinueOn{}, - RetryPolicy: &digraph.RetryPolicy{}, MailOnError: false, + RetryPolicy: digraph.RetryPolicy{}, MailOnError: false, RepeatPolicy: digraph.RepeatPolicy{}, Preconditions: []digraph.Condition{}, }, },