Skip to content

Commit

Permalink
digraph: fix issues in command and args handling (#780)
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta authored Jan 9, 2025
1 parent 0614bd5 commit 5c6abd4
Show file tree
Hide file tree
Showing 17 changed files with 237 additions and 119 deletions.
2 changes: 2 additions & 0 deletions cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,7 @@ func runDry(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to execute DAG %s (requestID: %s): %w", dag.Name, requestID, err)
}

agt.PrintSummary(ctx)

return nil
}
3 changes: 3 additions & 0 deletions cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ func executeRetry(ctx context.Context, dag *digraph.DAG, setup *setup, originalS
agt.PrintSummary(ctx)
return fmt.Errorf("failed to execute DAG %s (requestID: %s): %w", dag.Name, newRequestID, err)
}
}

if !quiet {
agt.PrintSummary(ctx)
}

return nil
Expand Down
4 changes: 4 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func executeDag(ctx context.Context, setup *setup, specPath string, loadOpts []d
}
}

if !quiet {
agt.PrintSummary(ctx)
}

return nil
}

Expand Down
19 changes: 0 additions & 19 deletions docs/source/yaml_format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -542,25 +542,6 @@ Execute steps periodically:
repeat: true
intervalSec: 60
User Defined Functions
~~~~~~~~~~~~~~~~~~~
Create reusable task templates:

.. code-block:: yaml
functions:
- name: my_function
params: param1 param2
command: python main.py $param1 $param2
steps:
- name: use function
call:
function: my_function
args:
param1: 1
param2: 2
Field Reference
-------------

Expand Down
2 changes: 1 addition & 1 deletion internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestAgent_Retry(t *testing.T) {
// Modify the DAG to make it successful
status := dagAgent.Status()
for i := range status.Nodes {
status.Nodes[i].Step.CmdWithArgs = "true"
status.Nodes[i].Step.CmdArgsSys = "true"
}

// Retry the DAG and check if it is successful
Expand Down
19 changes: 19 additions & 0 deletions internal/cmdutil/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ import (
"unicode"
)

// ArgsDelimiter is the delimiter used to separate command arguments
const ArgsDelimiter = "∯ᓰ♨"

// JoinCommandArgs joins a command and its arguments into a single string
// separated by ArgsDelimiter
func JoinCommandArgs(cmd string, args []string) string {
return fmt.Sprintf("%s %s", cmd, strings.Join(args, ArgsDelimiter))
}

// SplitCommandArgs splits a command and its arguments into a command and a slice of arguments
func SplitCommandArgs(cmdWithArgs string) (string, []string) {
parts := strings.SplitN(cmdWithArgs, " ", 2)
if len(parts) == 1 {
return parts[0], nil
}
command, args := parts[0], parts[1]
return command, strings.Split(args, ArgsDelimiter)
}

// GetShellCommand returns the shell to use for command execution
func GetShellCommand(configuredShell string) string {
if configuredShell != "" {
Expand Down
6 changes: 6 additions & 0 deletions internal/cmdutil/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func TestSplitCommand(t *testing.T) {
wantCmd: "echo",
wantArgs: []string{`"\"hello world\""`},
},
{
name: "command with JSON",
input: `echo "{\n\t\"key\": \"value\"\n}"`,
wantCmd: "echo",
wantArgs: []string{`"{\n\t\"key\": \"value\"\n}"`},
},
}

for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion internal/digraph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type builderEntry struct {
}

var stepBuilderRegistry = []stepBuilderEntry{
{name: "executor", fn: buildExecutor},
{name: "command", fn: buildCommand},
{name: "depends", fn: buildDepends},
{name: "executor", fn: buildExecutor},
{name: "subworkflow", fn: buildSubWorkflow},
{name: "continueOn", fn: buildContinueOn},
{name: "retryPolicy", fn: buildRetryPolicy},
Expand Down
9 changes: 7 additions & 2 deletions internal/digraph/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/dagu-org/dagu/internal/cmdutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -288,15 +289,19 @@ func TestBuildStep(t *testing.T) {
t.Run("ValidCommandInArray", func(t *testing.T) {
th := loadTestYAML(t, "valid_command_in_array.yaml")
assert.Len(t, th.Steps, 1)
assert.Equal(t, `echo "1"`, th.Steps[0].CmdWithArgs)
assert.Equal(t,
cmdutil.JoinCommandArgs("echo", []string{"1"}),
th.Steps[0].CmdArgsSys)
assert.Equal(t, "echo", th.Steps[0].Command)
assert.Equal(t, []string{"1"}, th.Steps[0].Args)
assert.Equal(t, "step 1", th.Steps[0].Name)
})
t.Run("ValidCommandInList", func(t *testing.T) {
th := loadTestYAML(t, "valid_command_in_list.yaml")
assert.Len(t, th.Steps, 1)
assert.Equal(t, `echo "1"`, th.Steps[0].CmdWithArgs)
assert.Equal(t,
cmdutil.JoinCommandArgs("echo", []string{"1"}),
th.Steps[0].CmdArgsSys)
assert.Equal(t, "echo", th.Steps[0].Command)
assert.Equal(t, []string{"1"}, th.Steps[0].Args)
assert.Equal(t, "step 1", th.Steps[0].Name)
Expand Down
13 changes: 10 additions & 3 deletions internal/digraph/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ func buildCommand(_ BuildContext, def stepDef, step *Step) error {

case []any:
// Case 3: command is an array

var command string
var args []string
for _, v := range val {
val, ok := v.(string)
if !ok {
// If the value is not a string, convert it to a string.
// This is useful when the value is an integer for example.
val = fmt.Sprintf("%v", v)
}
if step.Command == "" {
step.Command = val
if command == "" {
command = val
continue
}
step.Args = append(step.Args, val)
args = append(args, val)
}

// Setup CmdWithArgs (this will be actually used in the command execution)
Expand All @@ -77,7 +80,11 @@ func buildCommand(_ BuildContext, def stepDef, step *Step) error {
}
sb.WriteString(fmt.Sprintf("%q", arg))
}

step.Command = command
step.Args = args
step.CmdWithArgs = fmt.Sprintf("%s %s", step.Command, sb.String())
step.CmdArgsSys = cmdutil.JoinCommandArgs(step.Command, step.Args)

default:
// Unknown type for command field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

type StepContext struct {
Context

outputVariables *SyncMap
step Step
envs map[string]string
Expand Down
24 changes: 6 additions & 18 deletions internal/digraph/executor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/dagu-org/dagu/internal/cmdutil"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/logger"
)

var _ Executor = (*commandExecutor)(nil)
Expand Down Expand Up @@ -102,22 +101,12 @@ func newCommand(ctx context.Context, step digraph.Step) (Executor, error) {
}

func createCommand(ctx context.Context, step digraph.Step) (*exec.Cmd, error) {
stepContext := digraph.GetStepContext(ctx)
var args []string
for _, arg := range step.Args {
ret, err := stepContext.EvalString(arg, cmdutil.OnlyReplaceVars())
if err != nil {
logger.Error(ctx, "Failed to evaluate string", "arg", arg, "err", err)
return nil, err
}
args = append(args, ret)
}

shellCommand := cmdutil.GetShellCommand(step.Shell)
if shellCommand == "" {
return createDirectCommand(ctx, step, args), nil
shellCmdArgs := step.ShellCmdArgs
if shellCommand == "" || shellCmdArgs == "" {
return createDirectCommand(ctx, step, step.Args), nil
}
return createShellCommand(ctx, shellCommand, step, args), nil
return createShellCommand(ctx, shellCommand, shellCmdArgs), nil
}

// createDirectCommand creates a command that runs directly without a shell
Expand All @@ -127,7 +116,6 @@ func createDirectCommand(ctx context.Context, step digraph.Step, args []string)
}

// createShellCommand creates a command that runs through a shell
func createShellCommand(ctx context.Context, shell string, step digraph.Step, args []string) *exec.Cmd {
command := cmdutil.BuildCommandEscapedString(step.Command, args)
return exec.CommandContext(ctx, shell, "-c", command)
func createShellCommand(ctx context.Context, shell, shellCmd string) *exec.Cmd {
return exec.CommandContext(ctx, shell, "-c", shellCmd)
}
121 changes: 101 additions & 20 deletions internal/digraph/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Node struct {
scriptFile *os.File
done bool
retryPolicy retryPolicy
cmdEvaluated bool
}

type NodeData struct {
Expand Down Expand Up @@ -228,7 +229,7 @@ func (n *Node) State() NodeState {

// Execute runs the command synchronously and returns error if any.
func (n *Node) Execute(ctx context.Context) error {
cmd, err := n.SetupExec(ctx)
cmd, err := n.setupExec(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -309,7 +310,7 @@ func (n *Node) Finish() {
n.data.State.FinishedAt = time.Now()
}

func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) {
func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) {
n.mu.Lock()
defer n.mu.Unlock()

Expand All @@ -324,24 +325,9 @@ func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) {
n.data.State.Error = nil
n.data.State.ExitCode = 0

if n.data.Step.CmdWithArgs != "" {
// Expand envs
stepContext := digraph.GetStepContext(ctx)
cmdWithArgs, err := stepContext.EvalString(n.data.Step.CmdWithArgs, cmdutil.WithoutExpandEnv())
if err != nil {
return nil, err
}
cmd, args, err := cmdutil.SplitCommandWithSub(cmdWithArgs)
if err != nil {
return nil, fmt.Errorf("failed to split command: %w", err)
}
n.data.Step.Command = cmd
n.data.Step.Args = args
}

if n.data.Step.Command == "" {
// If the command is empty, use the default shell as the command
n.data.Step.Command = cmdutil.GetShellCommand(n.data.Step.Shell)
// Evaluate the command and args if not already evaluated
if err := n.evaluateCommandArgs(ctx); err != nil {
return nil, err
}

if n.scriptFile != nil {
Expand Down Expand Up @@ -385,6 +371,101 @@ func (n *Node) SetupExec(ctx context.Context) (executor.Executor, error) {
return cmd, nil
}

func (n *Node) evaluateCommandArgs(ctx context.Context) error {
if n.cmdEvaluated {
return nil
}

stepContext := digraph.GetStepContext(ctx)
switch {
case n.data.Step.CmdArgsSys != "":
// In case of the command and args are defined as a list. In this case,
// CmdArgsSys is a string with the command and args separated by special markers.
cmd, args := cmdutil.SplitCommandArgs(n.data.Step.CmdArgsSys)
for i, arg := range args {
value, err := stepContext.EvalString(arg, cmdutil.WithoutExpandEnv())
if err != nil {
return fmt.Errorf("failed to eval command with args: %w", err)
}
args[i] = value
}
n.data.Step.Command = cmd
n.data.Step.Args = args

if n.data.Step.ExecutorConfig.IsCommand() {
n.data.Step.ShellCmdArgs = cmdutil.BuildCommandEscapedString(cmd, args)
}

case n.data.Step.CmdWithArgs != "":
// In case of the command and args are defined as a string.
stepContext := digraph.GetStepContext(ctx)
cmdWithArgs, err := stepContext.EvalString(n.data.Step.CmdWithArgs, cmdutil.WithoutExpandEnv())
if err != nil {
return err
}

// Use user defined command as the shell command args that should be already a valid command.
if n.data.Step.ExecutorConfig.IsCommand() {
n.data.Step.ShellCmdArgs = cmdWithArgs
}

// Split the command and args in case shell is not available in the system.
// In this case, the command and args need to be split to run the command directly.
cmd, args, err := cmdutil.SplitCommand(cmdWithArgs)
if err != nil {
return fmt.Errorf("failed to split command with args: %w", err)
}

n.data.Step.Command = cmd
n.data.Step.Args = args

case n.data.Step.Command == "":
// If the command is empty, use the default shell as the command
n.data.Step.Command = cmdutil.GetShellCommand(n.data.Step.Shell)

case n.data.Step.Command != "" && len(n.data.Step.Args) == 0:
// Shouldn't reach here except for testing.

cmd, args, err := cmdutil.SplitCommand(n.data.Step.Command)
if err != nil {
return fmt.Errorf("failed to split command: %w", err)
}
for i, arg := range args {
value, err := stepContext.EvalString(arg, cmdutil.WithoutExpandEnv())
if err != nil {
return fmt.Errorf("failed to eval command args: %w", err)
}
args[i] = value
}

n.data.Step.CmdWithArgs = n.data.Step.Command
n.data.Step.Command = cmd
n.data.Step.Args = args

default:
// Shouldn't reach here except for testing.

if n.data.Step.Command != "" {
value, err := stepContext.EvalString(n.data.Step.Command, cmdutil.WithoutExpandEnv())
if err != nil {
return fmt.Errorf("failed to eval command: %w", err)
}
n.data.Step.Command = value
}

for i, arg := range n.data.Step.Args {
value, err := stepContext.EvalString(arg, cmdutil.WithoutExpandEnv())
if err != nil {
return fmt.Errorf("failed to eval command args: %w", err)
}
n.data.Step.Args[i] = value
}
}

n.cmdEvaluated = true
return nil
}

func (n *Node) GetRetryCount() int {
n.mu.RLock()
defer n.mu.RUnlock()
Expand Down
Loading

0 comments on commit 5c6abd4

Please sign in to comment.