Skip to content

Commit

Permalink
improve timing of substitutes
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Dec 25, 2024
1 parent 4320210 commit f70edb9
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 38 deletions.
11 changes: 9 additions & 2 deletions cmd/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"time"

"github.com/dagu-org/dagu/internal/cmdutil"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/fileutil"
"github.com/dagu-org/dagu/internal/logger"
Expand Down Expand Up @@ -52,17 +53,23 @@ type logFileSettings struct {
// openLogFile creates and opens a log file based on the provided settings.
// It creates the necessary directory structure and returns the file handle.
func openLogFile(config logFileSettings) (*os.File, error) {
logDir, err := cmdutil.SubstituteCommands(os.ExpandEnv(config.LogDir))
if err != nil {
return nil, fmt.Errorf("failed to expand log directory: %w", err)
}
config.LogDir = logDir

if err := validateSettings(config); err != nil {
return nil, fmt.Errorf("invalid log settings: %w", err)
}

logDir, err := setupLogDirectory(config)
outputDir, err := setupLogDirectory(config)
if err != nil {
return nil, fmt.Errorf("failed to setup log directory: %w", err)
}

filename := buildLogFilename(config)
return createLogFile(filepath.Join(logDir, filename))
return createLogFile(filepath.Join(outputDir, filename))
}

// validateSettings ensures all required fields are properly set
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/firefart/nonamedreturns v1.0.5 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/ghostiam/protogetter v0.3.8 // indirect
github.com/go-critic/go-critic v0.11.5 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand All @@ -106,6 +107,9 @@ require (
github.com/go-openapi/inflect v0.21.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.23.0 // indirect
github.com/go-toolsmith/astcast v1.1.0 // indirect
github.com/go-toolsmith/astcopy v1.1.0 // indirect
github.com/go-toolsmith/astequal v1.2.0 // indirect
Expand Down Expand Up @@ -158,6 +162,7 @@ require (
github.com/lasiar/canonicalheader v1.1.2 // indirect
github.com/ldez/gomoddirectives v0.2.4 // indirect
github.com/ldez/tagliatelle v0.5.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/leonklingele/grouper v1.1.2 // indirect
github.com/macabu/inamedparam v0.1.3 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo=
github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/ghostiam/protogetter v0.3.8 h1:LYcXbYvybUyTIxN2Mj9h6rHrDZBDwZloPoKctWrFyJY=
github.com/ghostiam/protogetter v0.3.8/go.mod h1:WZ0nw9pfzsgxuRsPOFQomgDVSWtDLJRfQJEhsGbmQMA=
github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0=
Expand Down Expand Up @@ -233,6 +235,12 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58=
github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o=
github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI=
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
Expand Down Expand Up @@ -455,6 +463,8 @@ github.com/ldez/gomoddirectives v0.2.4 h1:j3YjBIjEBbqZ0NKtBNzr8rtMHTOrLPeiwTkfUJ
github.com/ldez/gomoddirectives v0.2.4/go.mod h1:oWu9i62VcQDYp9EQ0ONTfqLNh+mDLWWDO+SO0qSQw5g=
github.com/ldez/tagliatelle v0.5.0 h1:epgfuYt9v0CG3fms0pEgIMNPuFf/LpPIfjk4kyqSioo=
github.com/ldez/tagliatelle v0.5.0/go.mod h1:rj1HmWiL1MiKQuOONhd09iySTEkUuE/8+5jtPYz9xa4=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/leonklingele/grouper v1.1.2 h1:o1ARBDLOmmasUaNDesWqWCIFH3u7hoFlM84YrjT3mIY=
github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA=
github.com/macabu/inamedparam v0.1.3 h1:2tk/phHkMlEL/1GNe/Yf6kkR/hkcUdAEY3L0hjYV1Mk=
Expand Down
23 changes: 14 additions & 9 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/dagu-org/dagu/internal/client"
"github.com/dagu-org/dagu/internal/cmdutil"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/digraph/scheduler"
"github.com/dagu-org/dagu/internal/logger"
Expand Down Expand Up @@ -308,19 +309,23 @@ func (a *Agent) HandleHTTP(ctx context.Context) sock.HTTPHandlerFunc {

// setup the agent instance for DAG execution.
func (a *Agent) setup(ctx context.Context) error {
// Lock to prevent race condition.
a.lock.Lock()
defer a.lock.Unlock()

a.scheduler = a.newScheduler()
a.reporter = newReporter(
mailer.New(&mailer.NewMailerArgs{
Host: a.dag.SMTP.Host,
Port: a.dag.SMTP.Port,
Username: a.dag.SMTP.Username,
Password: a.dag.SMTP.Password,
}),
)

mailerConfig, err := cmdutil.SubstituteStringFields(mailer.Config{
Host: a.dag.SMTP.Host,
Port: a.dag.SMTP.Port,
Username: a.dag.SMTP.Username,
Password: a.dag.SMTP.Password,
})
if err != nil {
return fmt.Errorf("failed to substitute mailer options: %w", err)
}

mailer := mailer.New(mailerConfig)
a.reporter = newReporter(mailer)

return a.setupGraph(ctx)
}
Expand Down
2 changes: 0 additions & 2 deletions internal/cmdutil/cmdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ func GetShellCommand(configuredShell string) string {
// modified struct value.
func SubstituteStringFields[T any](obj T) (T, error) {
v := reflect.ValueOf(obj)
k := v.Kind()
println(k)
if v.Kind() != reflect.Struct {
return obj, fmt.Errorf("input must be a struct, got %T", obj)
}
Expand Down
10 changes: 2 additions & 8 deletions internal/digraph/builder_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"os"
"strings"
"time"

"github.com/dagu-org/dagu/internal/cmdutil"
)

var builderRegistry = []builderEntry{
Expand Down Expand Up @@ -109,12 +107,8 @@ func buildSteps(ctx BuildContext, spec *definition, dag *DAG) error {

// buildLogDir builds the log directory for the DAG.
func buildLogDir(_ BuildContext, spec *definition, dag *DAG) (err error) {
logDir, err := cmdutil.SubstituteCommands(os.ExpandEnv(spec.LogDir))
if err != nil {
return err
}
dag.LogDir = logDir
return err
dag.LogDir = spec.LogDir
return nil
}

// buildHandlers builds the handlers for the DAG.
Expand Down
3 changes: 0 additions & 3 deletions internal/digraph/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ type DAG struct {
// replaced with '_').
LogDir string `json:"LogDir"`

// LogFormat is the format of the log (text or json).
LogFormat string `json:"LogFormat"`

// Parameters configuration.
// The DAG definition contains only DefaultParams. Params are automatically
// set by the DAG loader.
Expand Down
38 changes: 27 additions & 11 deletions internal/digraph/executors/mail.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ import (
"io"
"os"

"github.com/dagu-org/dagu/internal/cmdutil"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/mailer"
govalidator "github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
)

var validate = govalidator.New()

type mail struct {
stdout io.Writer
stderr io.Writer
Expand All @@ -22,35 +26,47 @@ type mail struct {
}

type mailConfig struct {
From string `mapstructure:"from"`
To string `mapstructure:"to"`
Subject string `mapstructure:"subject"`
From string `mapstructure:"from" validate:"required,email"`
To string `mapstructure:"to" validate:"required,email"`
Subject string `mapstructure:"subject" validate:"required"`
Message string `mapstructure:"message"`
}

func newMail(ctx context.Context, step digraph.Step) (Executor, error) {
var cfg mailConfig
if err := decodeMailConfig(step.ExecutorConfig.Config, &cfg); err != nil {
var mailConfig mailConfig
if err := decodeMailConfig(step.ExecutorConfig.Config, &mailConfig); err != nil {
return nil, err
}

cfg.From = os.ExpandEnv(cfg.From)
cfg.To = os.ExpandEnv(cfg.To)
cfg.Subject = os.ExpandEnv(cfg.Subject)
cfg.Message = os.ExpandEnv(cfg.Message)
mailConfig, err := cmdutil.SubstituteStringFields(mailConfig)
if err != nil {
return nil, fmt.Errorf("failed to substitute mail config: %w", err)
}

// Validate the configuration.
if err := validate.Struct(mailConfig); err != nil {
return nil, fmt.Errorf("invalid mail configuration: %w", err)
}

exec := &mail{cfg: &cfg}
exec := &mail{cfg: &mailConfig}

dagCtx, err := digraph.GetContext(ctx)
if err != nil {
return nil, err
}
m := mailer.New(&mailer.NewMailerArgs{

mailerConfig, err := cmdutil.SubstituteStringFields(mailer.Config{
Host: dagCtx.DAG.SMTP.Host,
Port: dagCtx.DAG.SMTP.Port,
Username: dagCtx.DAG.SMTP.Username,
Password: dagCtx.DAG.SMTP.Password,
})

if err != nil {
return nil, fmt.Errorf("failed to substitute mailer options: %w", err)
}

m := mailer.New(mailerConfig)
exec.mailer = m

return exec, nil
Expand Down
93 changes: 93 additions & 0 deletions internal/digraph/executors/mail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package executors

import (
"context"
"os"
"testing"

"github.com/dagu-org/dagu/internal/digraph"
"github.com/stretchr/testify/assert"
)

func TestMail(t *testing.T) {
t.Parallel()

os.Setenv("MAIL_SUBJECT", "Test Subject")
t.Cleanup(func() {
os.Unsetenv("MAIL_SUBJECT")
})

t.Run("NewMail", func(t *testing.T) {
tests := []struct {
name string
step digraph.Step
expectedError bool
}{
{
name: "ValidConfig",
step: digraph.Step{
ExecutorConfig: digraph.ExecutorConfig{
Config: map[string]any{
"from": "[email protected]",
"to": "[email protected]",
"subject": "Test Subject",
"message": "Test Message",
},
},
},
expectedError: false,
},
{
name: "ValidConfigWithEnv",
step: digraph.Step{
ExecutorConfig: digraph.ExecutorConfig{
Config: map[string]any{
"from": "[email protected]",
"to": "[email protected]",
"subject": "${MAIL_SUBJECT}",
"message": "Test Message",
},
},
},
expectedError: false,
},
{
name: "InvalidConfig",
step: digraph.Step{
ExecutorConfig: digraph.ExecutorConfig{
Config: map[string]any{
"from": "[email protected]",
},
},
},
expectedError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
ctx = digraph.NewContext(ctx, &digraph.DAG{
SMTP: &digraph.SMTPConfig{},
}, nil, nil, "", "")

exec, err := newMail(ctx, tt.step)

if tt.expectedError {
assert.Error(t, err)
assert.Nil(t, exec)
} else {
assert.NoError(t, err)
assert.NotNil(t, exec)

mailExec, ok := exec.(*mail)
assert.True(t, ok)
assert.Equal(t, "[email protected]", mailExec.cfg.From)
assert.Equal(t, "[email protected]", mailExec.cfg.To)
assert.Equal(t, "Test Subject", mailExec.cfg.Subject)
assert.Equal(t, "Test Message", mailExec.cfg.Message)
}
})
}
})
}
6 changes: 3 additions & 3 deletions internal/mailer/mailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type Mailer struct {
password string
}

// NewMailerArgs is a config for SMTP mailer.
type NewMailerArgs struct {
// Config is a config for SMTP mailer.
type Config struct {
Host string
Port string
Username string
Password string
}

func New(cfg *NewMailerArgs) *Mailer {
func New(cfg Config) *Mailer {
return &Mailer{
host: cfg.Host,
port: cfg.Port,
Expand Down

0 comments on commit f70edb9

Please sign in to comment.