Skip to content

Commit 3093ae2

Browse files
authored
builder: Enable Command Substitution for Email / SMTP Configuration (#746)
1 parent 2541c4e commit 3093ae2

30 files changed

+785
-554
lines changed

cmd/logger.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"path/filepath"
1010
"time"
1111

12+
"github.com/dagu-org/dagu/internal/cmdutil"
1213
"github.com/dagu-org/dagu/internal/config"
1314
"github.com/dagu-org/dagu/internal/fileutil"
1415
"github.com/dagu-org/dagu/internal/logger"
@@ -52,17 +53,23 @@ type logFileSettings struct {
5253
// openLogFile creates and opens a log file based on the provided settings.
5354
// It creates the necessary directory structure and returns the file handle.
5455
func openLogFile(config logFileSettings) (*os.File, error) {
56+
logDir, err := cmdutil.SubstituteCommands(os.ExpandEnv(config.LogDir))
57+
if err != nil {
58+
return nil, fmt.Errorf("failed to expand log directory: %w", err)
59+
}
60+
config.LogDir = logDir
61+
5562
if err := validateSettings(config); err != nil {
5663
return nil, fmt.Errorf("invalid log settings: %w", err)
5764
}
5865

59-
logDir, err := setupLogDirectory(config)
66+
outputDir, err := setupLogDirectory(config)
6067
if err != nil {
6168
return nil, fmt.Errorf("failed to setup log directory: %w", err)
6269
}
6370

6471
filename := buildLogFilename(config)
65-
return createLogFile(filepath.Join(logDir, filename))
72+
return createLogFile(filepath.Join(outputDir, filename))
6673
}
6774

6875
// validateSettings ensures all required fields are properly set

internal/agent/agent.go

+23-7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/http"
1111
"os"
1212
"regexp"
13+
"runtime/debug"
1314
"strings"
1415
"sync"
1516
"sync/atomic"
@@ -140,12 +141,12 @@ func (a *Agent) Run(ctx context.Context) error {
140141
return fmt.Errorf("failed to setup unix socket server: %w", err)
141142
}
142143
listenerErrCh := make(chan error)
143-
go func() {
144+
go execWithRecovery(ctx, func() {
144145
err := a.socketServer.Serve(ctx, listenerErrCh)
145146
if err != nil && !errors.Is(err, sock.ErrServerRequestedShutdown) {
146147
logger.Error(ctx, "Failed to start socket frontend", "err", err)
147148
}
148-
}()
149+
})
149150

150151
// Stop the socket server when finishing the DAG execution.
151152
defer func() {
@@ -164,7 +165,7 @@ func (a *Agent) Run(ctx context.Context) error {
164165
// example, when started, stopped, or cancelled, etc.
165166
done := make(chan *scheduler.Node)
166167
defer close(done)
167-
go func() {
168+
go execWithRecovery(ctx, func() {
168169
for node := range done {
169170
status := a.Status()
170171
if err := a.historyStore.Write(ctx, status); err != nil {
@@ -174,19 +175,19 @@ func (a *Agent) Run(ctx context.Context) error {
174175
logger.Error(ctx, "Failed to report step", "err", err)
175176
}
176177
}
177-
}()
178+
})
178179

179180
// Write the first status just after the start to store the running status.
180181
// If the DAG is already finished, skip it.
181-
go func() {
182+
go execWithRecovery(ctx, func() {
182183
time.Sleep(waitForRunning)
183184
if a.finished.Load() {
184185
return
185186
}
186187
if err := a.historyStore.Write(ctx, a.Status()); err != nil {
187188
logger.Error(ctx, "Status write failed", "err", err)
188189
}
189-
}()
190+
})
190191

191192
// Start the DAG execution.
192193
logger.Info(ctx, "DAG execution started", "reqId", a.requestID, "name", a.dag.Name, "params", a.dag.Params)
@@ -314,7 +315,7 @@ func (a *Agent) setup(ctx context.Context) error {
314315

315316
a.scheduler = a.newScheduler()
316317
a.reporter = newReporter(
317-
mailer.New(&mailer.NewMailerArgs{
318+
mailer.New(mailer.Config{
318319
Host: a.dag.SMTP.Host,
319320
Port: a.dag.SMTP.Port,
320321
Username: a.dag.SMTP.Username,
@@ -501,6 +502,21 @@ func (a *Agent) checkIsAlreadyRunning(ctx context.Context) error {
501502
return nil
502503
}
503504

505+
func execWithRecovery(ctx context.Context, fn func()) {
506+
defer func() {
507+
if panicObj := recover(); panicObj != nil {
508+
err, ok := panicObj.(error)
509+
if !ok {
510+
err = fmt.Errorf("panic: %v", panicObj)
511+
}
512+
st := string(debug.Stack())
513+
logger.Error(ctx, "Panic occurred", "err", err, "st", st)
514+
}
515+
}()
516+
517+
fn()
518+
}
519+
504520
type httpError struct {
505521
Code int
506522
Message string

internal/agent/reporter.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (r *reporter) reportStep(
5252
}
5353

5454
// report is a function that reports the status of the scheduler.
55-
func (r *reporter) getSummary(ctx context.Context, status *model.Status, err error) string {
55+
func (r *reporter) getSummary(_ context.Context, status *model.Status, err error) string {
5656
var buf bytes.Buffer
5757
_, _ = buf.Write([]byte("\n"))
5858
_, _ = buf.Write([]byte("Summary ->\n"))

internal/cmdutil/cmdutil.go

+48
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"os"
99
"os/exec"
10+
"reflect"
1011
"regexp"
1112
"strings"
1213
"unicode"
@@ -229,3 +230,50 @@ func GetShellCommand(configuredShell string) string {
229230

230231
return ""
231232
}
233+
234+
// SubstituteStringFields processes all string fields in a struct by expanding environment
235+
// variables and substituting command outputs. It takes a struct value and returns a new
236+
// modified struct value.
237+
func SubstituteStringFields[T any](obj T) (T, error) {
238+
v := reflect.ValueOf(obj)
239+
if v.Kind() != reflect.Struct {
240+
return obj, fmt.Errorf("input must be a struct, got %T", obj)
241+
}
242+
243+
modified := reflect.New(v.Type()).Elem()
244+
modified.Set(v)
245+
246+
if err := processStructFields(modified); err != nil {
247+
return obj, fmt.Errorf("failed to process fields: %w", err)
248+
}
249+
250+
return modified.Interface().(T), nil
251+
}
252+
253+
func processStructFields(v reflect.Value) error {
254+
t := v.Type()
255+
for i := 0; i < v.NumField(); i++ {
256+
field := v.Field(i)
257+
if !field.CanSet() {
258+
continue
259+
}
260+
261+
// nolint:exhaustive
262+
switch field.Kind() {
263+
case reflect.String:
264+
value := field.String()
265+
value = os.ExpandEnv(value)
266+
processed, err := SubstituteCommands(value)
267+
if err != nil {
268+
return fmt.Errorf("field %q: %w", t.Field(i).Name, err)
269+
}
270+
field.SetString(processed)
271+
272+
case reflect.Struct:
273+
if err := processStructFields(field); err != nil {
274+
return err
275+
}
276+
}
277+
}
278+
return nil
279+
}

internal/cmdutil/cmdutil_test.go

+155
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package cmdutil
55

66
import (
77
"os"
8+
"reflect"
89
"testing"
910

1011
"github.com/stretchr/testify/require"
@@ -168,3 +169,157 @@ func TestSplitCommandWithParse(t *testing.T) {
168169
require.Equal(t, "hello", args[0])
169170
})
170171
}
172+
173+
func TestSubstituteStringFields(t *testing.T) {
174+
// Set up test environment variables
175+
os.Setenv("TEST_VAR", "test_value")
176+
os.Setenv("NESTED_VAR", "nested_value")
177+
defer os.Unsetenv("TEST_VAR")
178+
defer os.Unsetenv("NESTED_VAR")
179+
180+
type Nested struct {
181+
NestedField string
182+
NestedCommand string
183+
unexported string
184+
}
185+
186+
type TestStruct struct {
187+
SimpleField string
188+
EnvField string
189+
CommandField string
190+
MultiField string
191+
EmptyField string
192+
unexported string
193+
NestedStruct Nested
194+
}
195+
196+
tests := []struct {
197+
name string
198+
input TestStruct
199+
want TestStruct
200+
wantErr bool
201+
}{
202+
{
203+
name: "basic substitution",
204+
input: TestStruct{
205+
SimpleField: "hello",
206+
EnvField: "$TEST_VAR",
207+
CommandField: "`echo hello`",
208+
MultiField: "$TEST_VAR and `echo command`",
209+
EmptyField: "",
210+
NestedStruct: Nested{
211+
NestedField: "$NESTED_VAR",
212+
NestedCommand: "`echo nested`",
213+
unexported: "should not change",
214+
},
215+
unexported: "should not change",
216+
},
217+
want: TestStruct{
218+
SimpleField: "hello",
219+
EnvField: "test_value",
220+
CommandField: "hello",
221+
MultiField: "test_value and command",
222+
EmptyField: "",
223+
NestedStruct: Nested{
224+
NestedField: "nested_value",
225+
NestedCommand: "nested",
226+
unexported: "should not change",
227+
},
228+
unexported: "should not change",
229+
},
230+
wantErr: false,
231+
},
232+
{
233+
name: "invalid command",
234+
input: TestStruct{
235+
CommandField: "`invalid_command_that_does_not_exist`",
236+
},
237+
wantErr: true,
238+
},
239+
}
240+
241+
for _, tt := range tests {
242+
t.Run(tt.name, func(t *testing.T) {
243+
got, err := SubstituteStringFields(tt.input)
244+
if (err != nil) != tt.wantErr {
245+
t.Errorf("SubstituteStringFields() error = %v, wantErr %v", err, tt.wantErr)
246+
return
247+
}
248+
if !tt.wantErr && !reflect.DeepEqual(got, tt.want) {
249+
t.Errorf("SubstituteStringFields() = %+v, want %+v", got, tt.want)
250+
}
251+
})
252+
}
253+
}
254+
255+
func TestSubstituteStringFields_NonStruct(t *testing.T) {
256+
_, err := SubstituteStringFields("not a struct")
257+
if err == nil {
258+
t.Error("SubstituteStringFields() should return error for non-struct input")
259+
}
260+
}
261+
262+
func TestSubstituteStringFields_NestedStructs(t *testing.T) {
263+
type DeepNested struct {
264+
Field string
265+
}
266+
267+
type Nested struct {
268+
Field string
269+
DeepNested DeepNested
270+
}
271+
272+
type Root struct {
273+
Field string
274+
Nested Nested
275+
}
276+
277+
input := Root{
278+
Field: "$TEST_VAR",
279+
Nested: Nested{
280+
Field: "`echo nested`",
281+
DeepNested: DeepNested{
282+
Field: "$NESTED_VAR",
283+
},
284+
},
285+
}
286+
287+
// Set up environment
288+
os.Setenv("TEST_VAR", "test_value")
289+
os.Setenv("NESTED_VAR", "deep_nested_value")
290+
defer os.Unsetenv("TEST_VAR")
291+
defer os.Unsetenv("NESTED_VAR")
292+
293+
want := Root{
294+
Field: "test_value",
295+
Nested: Nested{
296+
Field: "nested",
297+
DeepNested: DeepNested{
298+
Field: "deep_nested_value",
299+
},
300+
},
301+
}
302+
303+
got, err := SubstituteStringFields(input)
304+
if err != nil {
305+
t.Fatalf("SubstituteStringFields() error = %v", err)
306+
}
307+
308+
if !reflect.DeepEqual(got, want) {
309+
t.Errorf("SubstituteStringFields() = %+v, want %+v", got, want)
310+
}
311+
}
312+
313+
func TestSubstituteStringFields_EmptyStruct(t *testing.T) {
314+
type Empty struct{}
315+
316+
input := Empty{}
317+
got, err := SubstituteStringFields(input)
318+
if err != nil {
319+
t.Fatalf("SubstituteStringFields() error = %v", err)
320+
}
321+
322+
if !reflect.DeepEqual(got, input) {
323+
t.Errorf("SubstituteStringFields() = %+v, want %+v", got, input)
324+
}
325+
}

0 commit comments

Comments
 (0)