diff --git a/cmd/common_test.go b/cmd/cmd_test.go similarity index 96% rename from cmd/common_test.go rename to cmd/cmd_test.go index c86d31490..28b99b52d 100644 --- a/cmd/common_test.go +++ b/cmd/cmd_test.go @@ -59,9 +59,8 @@ func (td *testDAG) AssertCurrentStatus(t *testing.T, expected scheduler.Status) func (th *testDAG) AssertLastStatus(t *testing.T, expected scheduler.Status) { t.Helper() - hs := th.DataStores.HistoryStore() require.Eventually(t, func() bool { - status := hs.ReadStatusRecent(th.Context, th.Path, 1) + status := th.HistoryStore.ReadStatusRecent(th.Context, th.Path, 1) if len(status) < 1 { return false } diff --git a/cmd/common.go b/cmd/common.go deleted file mode 100644 index e185135ec..000000000 --- a/cmd/common.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (C) 2024 Yota Hamada -// SPDX-License-Identifier: GPL-3.0-or-later - -package main - -import ( - "context" - "fmt" - "os" - "os/signal" - "syscall" - - "github.com/dagu-org/dagu/internal/client" - "github.com/dagu-org/dagu/internal/config" - "github.com/dagu-org/dagu/internal/persistence" - dsclient "github.com/dagu-org/dagu/internal/persistence/client" - "github.com/google/uuid" - "github.com/spf13/cobra" -) - -func wrapRunE(f func(cmd *cobra.Command, args []string) error) func(cmd *cobra.Command, args []string) error { - return func(cmd *cobra.Command, args []string) error { - if err := f(cmd, args); err != nil { - fmt.Printf("Error: %v\n", err) - os.Exit(1) - } - return nil - } -} - -func newClient(cfg *config.Config, ds persistence.DataStores) client.Client { - return client.New(ds, cfg.Paths.Executable, cfg.WorkDir) -} - -func newDataStores(cfg *config.Config) persistence.DataStores { - return dsclient.NewDataStores( - cfg.Paths.DAGsDir, - cfg.Paths.DataDir, - cfg.Paths.SuspendFlagsDir, - dsclient.DataStoreOptions{ - LatestStatusToday: cfg.LatestStatusToday, - }, - ) -} - -// generateRequestID generates a new request ID. -// For simplicity, we use UUIDs as request IDs. -func generateRequestID() (string, error) { - id, err := uuid.NewRandom() - if err != nil { - return "", err - } - return id.String(), nil -} - -type signalListener interface { - Signal(context.Context, os.Signal) -} - -var signalChan = make(chan os.Signal, 100) - -// listenSignals subscribes to the OS signals and passes them to the listener. -// It listens for the context cancellation as well. -func listenSignals(ctx context.Context, listener signalListener) { - go func() { - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - - select { - case <-ctx.Done(): - listener.Signal(ctx, os.Interrupt) - case sig := <-signalChan: - listener.Signal(ctx, sig) - } - }() -} diff --git a/cmd/dry.go b/cmd/dry.go index 0da356807..c59688421 100644 --- a/cmd/dry.go +++ b/cmd/dry.go @@ -10,7 +10,6 @@ import ( "github.com/dagu-org/dagu/internal/agent" "github.com/dagu-org/dagu/internal/config" "github.com/dagu-org/dagu/internal/digraph" - "github.com/dagu-org/dagu/internal/logger" "github.com/spf13/cobra" ) @@ -33,6 +32,7 @@ func runDry(cmd *cobra.Command, args []string) error { if err != nil { return fmt.Errorf("failed to load config: %w", err) } + setup := newSetup(cfg) cmd.Flags().StringP("params", "p", "", "parameters") params, err := cmd.Flags().GetString("params") @@ -41,6 +41,7 @@ func runDry(cmd *cobra.Command, args []string) error { } ctx := cmd.Context() + dag, err := digraph.Load(ctx, cfg.Paths.BaseConfig, args[0], removeQuotes(params)) if err != nil { return fmt.Errorf("failed to load DAG from %s: %w", args[0], err) @@ -51,23 +52,23 @@ func runDry(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to generate request ID: %w", err) } - logSettings := logFileSettings{ - Prefix: dryPrefix, - LogDir: cfg.Paths.LogDir, - DAGLogDir: dag.LogDir, - DAGName: dag.Name, - RequestID: requestID, + logFile, err := setup.openLogFile(dryPrefix, dag, requestID) + if err != nil { + return fmt.Errorf("failed to initialize log file for DAG %s: %w", dag.Name, err) } + defer logFile.Close() + + ctx = setup.loggerContextWithFile(ctx, false, logFile) - logFile, err := openLogFile(logSettings) + dagStore, err := setup.dagStore() if err != nil { - return fmt.Errorf("failed to create log file for DAG %s: %w", dag.Name, err) + return fmt.Errorf("failed to initialize DAG store: %w", err) } - defer logFile.Close() - ctx = logger.WithLogger(ctx, buildLoggerWithFile(logFile, false)) - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + cli, err := setup.client() + if err != nil { + return fmt.Errorf("failed to initialize client: %w", err) + } agt := agent.New( requestID, @@ -75,7 +76,8 @@ func runDry(cmd *cobra.Command, args []string) error { filepath.Dir(logFile.Name()), logFile.Name(), cli, - dataStore, + dagStore, + setup.historyStore(), &agent.Options{Dry: true}, ) diff --git a/cmd/logger.go b/cmd/logger.go deleted file mode 100644 index 694e944f1..000000000 --- a/cmd/logger.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright (C) 2024 Yota Hamada -// SPDX-License-Identifier: GPL-3.0-or-later - -package main - -import ( - "fmt" - "os" - "path/filepath" - "time" - - "github.com/dagu-org/dagu/internal/cmdutil" - "github.com/dagu-org/dagu/internal/config" - "github.com/dagu-org/dagu/internal/fileutil" - "github.com/dagu-org/dagu/internal/logger" - "github.com/dagu-org/dagu/internal/stringutil" -) - -func buildLogger(cfg *config.Config, quiet bool) logger.Logger { - var opts []logger.Option - if cfg.Debug { - opts = append(opts, logger.WithDebug()) - } - if quiet { - opts = append(opts, logger.WithQuiet()) - } - if cfg.LogFormat != "" { - opts = append(opts, logger.WithFormat(cfg.LogFormat)) - } - return logger.NewLogger(opts...) -} - -func buildLoggerWithFile(f *os.File, quiet bool) logger.Logger { - var opts []logger.Option - if quiet { - opts = append(opts, logger.WithQuiet()) - } - if f != nil { - opts = append(opts, logger.WithWriter(f)) - } - return logger.NewLogger(opts...) -} - -// logFileSettings contains the settings for the log file. -type logFileSettings struct { - Prefix string - LogDir string - DAGLogDir string - DAGName string - RequestID string -} - -// openLogFile creates and opens a log file based on the provided settings. -// It creates the necessary directory structure and returns the file handle. -func openLogFile(config logFileSettings) (*os.File, error) { - logDir, err := cmdutil.SubstituteCommands(os.ExpandEnv(config.LogDir)) - if err != nil { - return nil, fmt.Errorf("failed to expand log directory: %w", err) - } - config.LogDir = logDir - - if err := validateSettings(config); err != nil { - return nil, fmt.Errorf("invalid log settings: %w", err) - } - - outputDir, err := setupLogDirectory(config) - if err != nil { - return nil, fmt.Errorf("failed to setup log directory: %w", err) - } - - filename := buildLogFilename(config) - return createLogFile(filepath.Join(outputDir, filename)) -} - -// validateSettings ensures all required fields are properly set -func validateSettings(config logFileSettings) error { - if config.DAGName == "" { - return fmt.Errorf("DAGName cannot be empty") - } - if config.LogDir == "" && config.DAGLogDir == "" { - return fmt.Errorf("either LogDir or DAGLogDir must be specified") - } - return nil -} - -// setupLogDirectory creates and returns the appropriate log directory -func setupLogDirectory(config logFileSettings) (string, error) { - safeName := fileutil.SafeName(config.DAGName) - - // Determine the base directory - baseDir := config.LogDir - if config.DAGLogDir != "" { - baseDir = config.DAGLogDir - } - - logDir := filepath.Join(baseDir, safeName) - if err := os.MkdirAll(logDir, 0755); err != nil { - return "", fmt.Errorf("failed to create directory %s: %w", logDir, err) - } - - return logDir, nil -} - -// buildLogFilename generates the log filename using the configured format -func buildLogFilename(config logFileSettings) string { - timestamp := time.Now().Format("20060102.15:04:05.000") - truncatedRequestID := stringutil.TruncString(config.RequestID, 8) - safeDagName := fileutil.SafeName(config.DAGName) - - return fmt.Sprintf("%s%s.%s.%s.log", - config.Prefix, - safeDagName, - timestamp, - truncatedRequestID, - ) -} - -// createLogFile opens or creates a log file with appropriate permissions -func createLogFile(filepath string) (*os.File, error) { - flags := os.O_CREATE | os.O_WRONLY | os.O_APPEND | os.O_SYNC - permissions := os.FileMode(0644) - - file, err := os.OpenFile(filepath, flags, permissions) - if err != nil { - return nil, fmt.Errorf("failed to create/open log file %s: %w", filepath, err) - } - - return file, nil -} diff --git a/cmd/restart.go b/cmd/restart.go index d2b86ead0..d04d79a8d 100644 --- a/cmd/restart.go +++ b/cmd/restart.go @@ -41,14 +41,14 @@ func runRestart(cmd *cobra.Command, args []string) error { if err != nil { return fmt.Errorf("failed to load configuration: %w", err) } + setup := newSetup(cfg) quiet, err := cmd.Flags().GetBool("quiet") if err != nil { return fmt.Errorf("failed to get quiet flag: %w", err) } - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, quiet)) + ctx := setup.loggerContext(cmd.Context(), quiet) specFilePath := args[0] @@ -59,11 +59,8 @@ func runRestart(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load DAG from %s: %w", specFilePath, err) } - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) - // Handle the restart process - if err := handleRestartProcess(ctx, cli, cfg, dag, quiet, specFilePath); err != nil { + if err := handleRestartProcess(ctx, setup, dag, quiet, specFilePath); err != nil { logger.Error(ctx, "Failed to restart process", "path", specFilePath, "err", err) return fmt.Errorf("restart process failed for DAG %s: %w", dag.Name, err) } @@ -71,8 +68,11 @@ func runRestart(cmd *cobra.Command, args []string) error { return nil } -func handleRestartProcess(ctx context.Context, cli client.Client, cfg *config.Config, - dag *digraph.DAG, quiet bool, specFilePath string) error { +func handleRestartProcess(ctx context.Context, setup *setup, dag *digraph.DAG, quiet bool, specFilePath string) error { + cli, err := setup.client() + if err != nil { + return fmt.Errorf("failed to initialize client: %w", err) + } // Stop if running if err := stopDAGIfRunning(ctx, cli, dag); err != nil { @@ -89,15 +89,15 @@ func handleRestartProcess(ctx context.Context, cli client.Client, cfg *config.Co } // Reload DAG with parameters - dag, err = digraph.Load(ctx, cfg.Paths.BaseConfig, specFilePath, params) + dag, err = digraph.Load(ctx, setup.cfg.Paths.BaseConfig, specFilePath, params) if err != nil { return fmt.Errorf("failed to reload DAG with params: %w", err) } - return executeDAG(ctx, cli, cfg, dag, quiet) + return executeDAG(ctx, cli, setup, dag, quiet) } -func executeDAG(ctx context.Context, cli client.Client, cfg *config.Config, +func executeDAG(ctx context.Context, cli client.Client, setup *setup, dag *digraph.DAG, quiet bool) error { requestID, err := generateRequestID() @@ -105,31 +105,30 @@ func executeDAG(ctx context.Context, cli client.Client, cfg *config.Config, return fmt.Errorf("failed to generate request ID: %w", err) } - logFile, err := openLogFile(logFileSettings{ - Prefix: restartPrefix, - LogDir: cfg.Paths.LogDir, - DAGLogDir: dag.LogDir, - DAGName: dag.Name, - RequestID: requestID, - }) + logFile, err := setup.openLogFile(restartPrefix, dag, requestID) if err != nil { - return fmt.Errorf("failed to create log file: %w", err) + return fmt.Errorf("failed to initialize log file: %w", err) } defer logFile.Close() - logger.Info(ctx, "DAG restart initiated", - "DAG", dag.Name, - "requestID", requestID, - "logFile", logFile.Name()) + ctx = setup.loggerContextWithFile(ctx, quiet, logFile) + + logger.Info(ctx, "DAG restart initiated", "DAG", dag.Name, "requestID", requestID, "logFile", logFile.Name()) + + dagStore, err := setup.dagStore() + if err != nil { + logger.Error(ctx, "Failed to initialize DAG store", "err", err) + return fmt.Errorf("failed to initialize DAG store: %w", err) + } - ctx = logger.WithLogger(ctx, buildLoggerWithFile(logFile, quiet)) agt := agent.New( requestID, dag, filepath.Dir(logFile.Name()), logFile.Name(), cli, - newDataStores(cfg), + dagStore, + setup.historyStore(), &agent.Options{Dry: false}) listenSignals(ctx, agt) diff --git a/cmd/restart_test.go b/cmd/restart_test.go index cd6420027..04ef48992 100644 --- a/cmd/restart_test.go +++ b/cmd/restart_test.go @@ -58,8 +58,10 @@ func TestRestartCommand(t *testing.T) { dag, err := digraph.Load(th.Context, th.Config.Paths.BaseConfig, dagFile.Path, "") require.NoError(t, err) - dataStore := newDataStores(th.Config) - client := newClient(th.Config, dataStore) + setup := newSetup(th.Config) + client, err := setup.client() + require.NoError(t, err) + recentHistory := client.GetRecentHistory(context.Background(), dag, 2) require.Len(t, recentHistory, 2) diff --git a/cmd/retry.go b/cmd/retry.go index 3ccef57d6..71f96a843 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -13,7 +13,6 @@ import ( "github.com/dagu-org/dagu/internal/config" "github.com/dagu-org/dagu/internal/digraph" "github.com/dagu-org/dagu/internal/logger" - "github.com/dagu-org/dagu/internal/persistence" "github.com/dagu-org/dagu/internal/persistence/model" "github.com/spf13/cobra" ) @@ -43,6 +42,8 @@ func runRetry(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load configuration: %w", err) } + setup := newSetup(cfg) + // Get quiet flag quiet, err := cmd.Flags().GetBool("quiet") if err != nil { @@ -54,98 +55,75 @@ func runRetry(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to get request ID: %w", err) } - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, quiet)) + ctx := setup.loggerContext(cmd.Context(), quiet) specFilePath := args[0] - // Setup execution context - executionCtx, err := prepareExecutionContext(ctx, cfg, specFilePath, requestID) - if err != nil { - logger.Error(ctx, "Failed to prepare execution context", "path", specFilePath, "err", err) - return fmt.Errorf("failed to prepare execution context: %w", err) - } - - // Execute DAG retry - if err := executeRetry(ctx, executionCtx, cfg, quiet); err != nil { - logger.Error(ctx, "Failed to execute retry", "path", specFilePath, "err", err) - return fmt.Errorf("failed to execute retry: %w", err) - } - - return nil -} - -type executionContext struct { - dag *digraph.DAG - dataStore persistence.DataStores - originalState *model.StatusFile - absolutePath string -} - -func prepareExecutionContext(ctx context.Context, cfg *config.Config, specFilePath, requestID string) (*executionContext, error) { absolutePath, err := filepath.Abs(specFilePath) if err != nil { - return nil, fmt.Errorf("failed to resolve absolute path for %s: %w", specFilePath, err) + logger.Error(ctx, "Failed to resolve absolute path", "path", specFilePath, "err", err) + return fmt.Errorf("failed to resolve absolute path for %s: %w", specFilePath, err) } - dataStore := newDataStores(cfg) - historyStore := dataStore.HistoryStore() - - status, err := historyStore.FindByRequestID(ctx, absolutePath, requestID) + status, err := setup.historyStore().FindByRequestID(ctx, absolutePath, requestID) if err != nil { - return nil, fmt.Errorf("failed to retrieve historical execution for request ID %s: %w", requestID, err) + logger.Error(ctx, "Failed to retrieve historical execution", "requestID", requestID, "err", err) + return fmt.Errorf("failed to retrieve historical execution for request ID %s: %w", requestID, err) } dag, err := digraph.Load(ctx, cfg.Paths.BaseConfig, absolutePath, status.Status.Params) if err != nil { - return nil, fmt.Errorf("failed to load DAG specification from %s with params %s: %w", + logger.Error(ctx, "Failed to load DAG specification", "path", specFilePath, "err", err) + return fmt.Errorf("failed to load DAG specification from %s with params %s: %w", specFilePath, status.Status.Params, err) } - return &executionContext{ - dag: dag, - dataStore: dataStore, - originalState: status, - absolutePath: absolutePath, - }, nil + // Execute DAG retry + if err := executeRetry(ctx, dag, setup, status, quiet); err != nil { + logger.Error(ctx, "Failed to execute retry", "path", specFilePath, "err", err) + return fmt.Errorf("failed to execute retry: %w", err) + } + + return nil } -func executeRetry(ctx context.Context, execCtx *executionContext, cfg *config.Config, quiet bool) error { +func executeRetry(ctx context.Context, dag *digraph.DAG, setup *setup, originalStatus *model.StatusFile, quiet bool) error { newRequestID, err := generateRequestID() if err != nil { return fmt.Errorf("failed to generate new request ID: %w", err) } - logFile, err := openLogFile(logFileSettings{ - Prefix: retryPrefix, - LogDir: cfg.Paths.LogDir, - DAGLogDir: execCtx.dag.LogDir, - DAGName: execCtx.dag.Name, - RequestID: newRequestID, - }) + logFile, err := setup.openLogFile(retryPrefix, dag, newRequestID) if err != nil { - return fmt.Errorf("failed to create log file for DAG %s: %w", execCtx.dag.Name, err) + return fmt.Errorf("failed to initialize log file for DAG %s: %w", dag.Name, err) } defer logFile.Close() - cli := newClient(cfg, execCtx.dataStore) + logger.Info(ctx, "DAG retry initiated", "DAG", dag.Name, "originalRequestID", originalStatus.Status.RequestID, "newRequestID", newRequestID, "logFile", logFile.Name()) + + ctx = setup.loggerContextWithFile(ctx, quiet, logFile) - logger.Info(ctx, "DAG retry initiated", - "DAG", execCtx.dag.Name, - "originalRequestID", execCtx.originalState.Status.RequestID, - "newRequestID", newRequestID, - "logFile", logFile.Name()) + dagStore, err := setup.dagStore() + if err != nil { + logger.Error(ctx, "Failed to initialize DAG store", "err", err) + return fmt.Errorf("failed to initialize DAG store: %w", err) + } - ctx = logger.WithLogger(ctx, buildLoggerWithFile(logFile, quiet)) + cli, err := setup.client() + if err != nil { + logger.Error(ctx, "Failed to initialize client", "err", err) + return fmt.Errorf("failed to initialize client: %w", err) + } agt := agent.New( newRequestID, - execCtx.dag, + dag, filepath.Dir(logFile.Name()), logFile.Name(), cli, - execCtx.dataStore, - &agent.Options{RetryTarget: &execCtx.originalState.Status}, + dagStore, + setup.historyStore(), + &agent.Options{RetryTarget: &originalStatus.Status}, ) listenSignals(ctx, agt) @@ -155,7 +133,7 @@ func executeRetry(ctx context.Context, execCtx *executionContext, cfg *config.Co os.Exit(1) } else { agt.PrintSummary(ctx) - return fmt.Errorf("failed to execute DAG %s (requestID: %s): %w", execCtx.dag.Name, newRequestID, err) + return fmt.Errorf("failed to execute DAG %s (requestID: %s): %w", dag.Name, newRequestID, err) } } diff --git a/cmd/scheduler.go b/cmd/scheduler.go index 278541bbb..7c0c4804b 100644 --- a/cmd/scheduler.go +++ b/cmd/scheduler.go @@ -8,7 +8,6 @@ import ( "github.com/dagu-org/dagu/internal/config" "github.com/dagu-org/dagu/internal/logger" - "github.com/dagu-org/dagu/internal/scheduler" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -37,24 +36,23 @@ func runScheduler(cmd *cobra.Command, _ []string) error { if err != nil { return fmt.Errorf("failed to load configuration: %w", err) } + setup := newSetup(cfg) - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, false)) + ctx := setup.loggerContext(cmd.Context(), false) // Update DAGs directory if specified if dagsDir, _ := cmd.Flags().GetString("dags"); dagsDir != "" { cfg.Paths.DAGsDir = dagsDir } - logger.Info(ctx, "Scheduler initialization", - "specsDirectory", cfg.Paths.DAGsDir, - "logFormat", cfg.LogFormat) + logger.Info(ctx, "Scheduler initialization", "specsDirectory", cfg.Paths.DAGsDir, "logFormat", cfg.LogFormat) - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + scheduler, err := setup.scheduler() + if err != nil { + return fmt.Errorf("failed to initialize scheduler: %w", err) + } - sc := scheduler.New(cfg, cli) - if err := sc.Start(ctx); err != nil { + if err := scheduler.Start(ctx); err != nil { return fmt.Errorf("failed to start scheduler in directory %s: %w", cfg.Paths.DAGsDir, err) } diff --git a/cmd/server.go b/cmd/server.go index e036a0a3b..f4979db3a 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/dagu-org/dagu/internal/config" - "github.com/dagu-org/dagu/internal/frontend" "github.com/dagu-org/dagu/internal/logger" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -46,18 +45,17 @@ func runServer(cmd *cobra.Command, _ []string) error { if err != nil { return fmt.Errorf("failed to load configuration: %w", err) } + setup := newSetup(cfg) - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, false)) + ctx := setup.loggerContext(cmd.Context(), false) - logger.Info(ctx, "Server initialization", - "host", cfg.Host, - "port", cfg.Port) + logger.Info(ctx, "Server initialization", "host", cfg.Host, "port", cfg.Port) - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + server, err := setup.server(ctx) + if err != nil { + return fmt.Errorf("failed to initialize server: %w", err) + } - server := frontend.New(cfg, cli) if err := server.Serve(cmd.Context()); err != nil { return fmt.Errorf("failed to start server: %w", err) } diff --git a/cmd/setup.go b/cmd/setup.go new file mode 100644 index 000000000..f6b3cb3d5 --- /dev/null +++ b/cmd/setup.go @@ -0,0 +1,307 @@ +// Copyright (C) 2024 Yota Hamada +// SPDX-License-Identifier: GPL-3.0-or-later + +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/dagu-org/dagu/internal/client" + "github.com/dagu-org/dagu/internal/cmdutil" + "github.com/dagu-org/dagu/internal/config" + "github.com/dagu-org/dagu/internal/digraph" + "github.com/dagu-org/dagu/internal/fileutil" + "github.com/dagu-org/dagu/internal/frontend" + "github.com/dagu-org/dagu/internal/frontend/server" + "github.com/dagu-org/dagu/internal/logger" + "github.com/dagu-org/dagu/internal/persistence" + "github.com/dagu-org/dagu/internal/persistence/filecache" + "github.com/dagu-org/dagu/internal/persistence/jsondb" + "github.com/dagu-org/dagu/internal/persistence/local" + "github.com/dagu-org/dagu/internal/persistence/local/storage" + "github.com/dagu-org/dagu/internal/persistence/model" + "github.com/dagu-org/dagu/internal/scheduler" + "github.com/dagu-org/dagu/internal/stringutil" + "github.com/google/uuid" + "github.com/spf13/cobra" +) + +func wrapRunE(f func(cmd *cobra.Command, args []string) error) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + if err := f(cmd, args); err != nil { + fmt.Printf("Error: %v\n", err) + os.Exit(1) + } + return nil + } +} + +type setup struct { + cfg *config.Config +} + +func newSetup(cfg *config.Config) *setup { + return &setup{cfg: cfg} +} + +func (s *setup) loggerContext(ctx context.Context, quiet bool) context.Context { + var opts []logger.Option + if s.cfg.Debug { + opts = append(opts, logger.WithDebug()) + } + if quiet { + opts = append(opts, logger.WithQuiet()) + } + if s.cfg.LogFormat != "" { + opts = append(opts, logger.WithFormat(s.cfg.LogFormat)) + } + return logger.WithLogger(ctx, logger.NewLogger(opts...)) +} + +func (s *setup) loggerContextWithFile(ctx context.Context, quiet bool, f *os.File) context.Context { + var opts []logger.Option + if quiet { + opts = append(opts, logger.WithQuiet()) + } + if f != nil { + opts = append(opts, logger.WithWriter(f)) + } + return logger.WithLogger(ctx, logger.NewLogger(opts...)) +} + +type clientOption func(*clientOptions) + +type clientOptions struct { + dagStore persistence.DAGStore + historyStore persistence.HistoryStore +} + +func withDAGStore(dagStore persistence.DAGStore) clientOption { + return func(o *clientOptions) { + o.dagStore = dagStore + } +} + +func withHistoryStore(historyStore persistence.HistoryStore) clientOption { + return func(o *clientOptions) { + o.historyStore = historyStore + } +} + +func (s *setup) client(opts ...clientOption) (client.Client, error) { + options := &clientOptions{} + for _, opt := range opts { + opt(options) + } + dagStore := options.dagStore + if dagStore == nil { + var err error + dagStore, err = s.dagStore() + if err != nil { + return nil, fmt.Errorf("failed to initialize DAG store: %w", err) + } + } + historyStore := options.historyStore + if historyStore == nil { + historyStore = s.historyStore() + } + flagStore := local.NewFlagStore(storage.NewStorage( + s.cfg.Paths.SuspendFlagsDir, + )) + + return client.New( + dagStore, + historyStore, + flagStore, + s.cfg.Paths.Executable, + s.cfg.WorkDir, + ), nil +} + +func (s *setup) server(ctx context.Context) (*server.Server, error) { + dagCache := filecache.New[*digraph.DAG](0, time.Hour*12) + dagCache.StartEviction(ctx) + dagStore := s.dagStoreWithCache(dagCache) + + historyCache := filecache.New[*model.Status](0, time.Hour*12) + historyCache.StartEviction(ctx) + historyStore := s.historyStoreWithCache(historyCache) + + cli, err := s.client(withDAGStore(dagStore), withHistoryStore(historyStore)) + if err != nil { + return nil, fmt.Errorf("failed to initialize client: %w", err) + } + return frontend.New(s.cfg, cli), nil +} + +func (s *setup) scheduler() (*scheduler.Scheduler, error) { + cli, err := s.client() + if err != nil { + return nil, fmt.Errorf("failed to initialize client: %w", err) + } + return scheduler.New(s.cfg, cli), nil +} + +func (s *setup) dagStore() (persistence.DAGStore, error) { + baseDir := s.cfg.Paths.DAGsDir + _, err := os.Stat(baseDir) + if os.IsNotExist(err) { + if err := os.MkdirAll(baseDir, 0755); err != nil { + return nil, fmt.Errorf("failed to initialize directory %s: %w", baseDir, err) + } + } + + return local.NewDAGStore(s.cfg.Paths.DAGsDir), nil +} + +func (s *setup) dagStoreWithCache(cache *filecache.Cache[*digraph.DAG]) persistence.DAGStore { + return local.NewDAGStore(s.cfg.Paths.DAGsDir, local.WithFileCache(cache)) +} + +func (s *setup) historyStore() persistence.HistoryStore { + return jsondb.New(s.cfg.Paths.DataDir, jsondb.WithLatestStatusToday( + s.cfg.LatestStatusToday, + )) +} + +func (s *setup) historyStoreWithCache(cache *filecache.Cache[*model.Status]) persistence.HistoryStore { + return jsondb.New(s.cfg.Paths.DataDir, + jsondb.WithLatestStatusToday(s.cfg.LatestStatusToday), + jsondb.WithFileCache(cache), + ) +} + +func (s *setup) openLogFile( + prefix string, + dag *digraph.DAG, + requestID string, +) (*os.File, error) { + logDir, err := cmdutil.SubstituteCommands(os.ExpandEnv( + s.cfg.Paths.LogDir, + )) + if err != nil { + return nil, fmt.Errorf("failed to expand log directory: %w", err) + } + + config := logFileSettings{ + Prefix: prefix, + LogDir: logDir, + DAGLogDir: dag.LogDir, + DAGName: dag.Name, + RequestID: requestID, + } + + if err := validateSettings(config); err != nil { + return nil, fmt.Errorf("invalid log settings: %w", err) + } + + outputDir, err := setupLogDirectory(config) + if err != nil { + return nil, fmt.Errorf("failed to setup log directory: %w", err) + } + + filename := buildLogFilename(config) + return createLogFile(filepath.Join(outputDir, filename)) +} + +// generateRequestID generates a new request ID. +// For simplicity, we use UUIDs as request IDs. +func generateRequestID() (string, error) { + id, err := uuid.NewRandom() + if err != nil { + return "", err + } + return id.String(), nil +} + +type signalListener interface { + Signal(context.Context, os.Signal) +} + +var signalChan = make(chan os.Signal, 100) + +// listenSignals subscribes to the OS signals and passes them to the listener. +// It listens for the context cancellation as well. +func listenSignals(ctx context.Context, listener signalListener) { + go func() { + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + select { + case <-ctx.Done(): + listener.Signal(ctx, os.Interrupt) + case sig := <-signalChan: + listener.Signal(ctx, sig) + } + }() +} + +// logFileSettings contains the settings for the log file. +type logFileSettings struct { + Prefix string + LogDir string + DAGLogDir string + DAGName string + RequestID string +} + +// validateSettings ensures all required fields are properly set +func validateSettings(config logFileSettings) error { + if config.DAGName == "" { + return fmt.Errorf("DAGName cannot be empty") + } + if config.LogDir == "" && config.DAGLogDir == "" { + return fmt.Errorf("either LogDir or DAGLogDir must be specified") + } + return nil +} + +// setupLogDirectory creates and returns the appropriate log directory +func setupLogDirectory(config logFileSettings) (string, error) { + safeName := fileutil.SafeName(config.DAGName) + + // Determine the base directory + baseDir := config.LogDir + if config.DAGLogDir != "" { + baseDir = config.DAGLogDir + } + + logDir := filepath.Join(baseDir, safeName) + if err := os.MkdirAll(logDir, 0755); err != nil { + return "", fmt.Errorf("failed to initialize directory %s: %w", logDir, err) + } + + return logDir, nil +} + +// buildLogFilename generates the log filename using the configured format +func buildLogFilename(config logFileSettings) string { + timestamp := time.Now().Format("20060102.15:04:05.000") + truncatedRequestID := stringutil.TruncString(config.RequestID, 8) + safeDagName := fileutil.SafeName(config.DAGName) + + return fmt.Sprintf("%s%s.%s.%s.log", + config.Prefix, + safeDagName, + timestamp, + truncatedRequestID, + ) +} + +// createLogFile opens or creates a log file with appropriate permissions +func createLogFile(filepath string) (*os.File, error) { + flags := os.O_CREATE | os.O_WRONLY | os.O_APPEND | os.O_SYNC + permissions := os.FileMode(0644) + + file, err := os.OpenFile(filepath, flags, permissions) + if err != nil { + return nil, fmt.Errorf("failed to create/open log file %s: %w", filepath, err) + } + + return file, nil +} diff --git a/cmd/logger_test.go b/cmd/setup_test.go similarity index 86% rename from cmd/logger_test.go rename to cmd/setup_test.go index eccb67b6c..9152a2709 100644 --- a/cmd/logger_test.go +++ b/cmd/setup_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/dagu-org/dagu/internal/config" + "github.com/dagu-org/dagu/internal/digraph" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -17,14 +19,16 @@ func TestOpenLogFile(t *testing.T) { t.Run("successful log file creation", func(t *testing.T) { tempDir := t.TempDir() // Using t.TempDir() for automatic cleanup - config := logFileSettings{ - Prefix: "test_", - LogDir: tempDir, - DAGName: "test_dag", - RequestID: "12345678", - } + setup := newSetup(&config.Config{ + Paths: config.PathsConfig{ + LogDir: tempDir, + }, + }) - file, err := openLogFile(config) + file, err := setup.openLogFile("test_", &digraph.DAG{ + Name: "test_dag", + LogDir: "", + }, "12345678") require.NoError(t, err) defer file.Close() @@ -34,29 +38,6 @@ func TestOpenLogFile(t *testing.T) { assert.Contains(t, file.Name(), "test_") assert.Contains(t, file.Name(), "12345678") }) - - t.Run("invalid settings", func(t *testing.T) { - invalidConfigs := []struct { - name string - config logFileSettings - }{ - { - name: "empty DAGName", - config: logFileSettings{LogDir: "dir"}, - }, - { - name: "no directories specified", - config: logFileSettings{DAGName: "dag"}, - }, - } - - for _, tc := range invalidConfigs { - t.Run(tc.name, func(t *testing.T) { - _, err := openLogFile(tc.config) - assert.Error(t, err) - }) - } - }) } func TestSetupLogDirectory(t *testing.T) { diff --git a/cmd/start.go b/cmd/start.go index a0a239d73..a14797562 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -44,6 +44,8 @@ func runStart(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load configuration: %w", err) } + setup := newSetup(cfg) + // Get quiet flag quiet, err := cmd.Flags().GetBool("quiet") if err != nil { @@ -56,8 +58,7 @@ func runStart(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to get request ID: %w", err) } - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, quiet)) + ctx := setup.loggerContext(cmd.Context(), quiet) // Get parameters params, err := cmd.Flags().GetString("params") @@ -67,12 +68,12 @@ func runStart(cmd *cobra.Command, args []string) error { } // Initialize and run DAG - return executeDag(ctx, cfg, args[0], removeQuotes(params), quiet, requestID) + return executeDag(ctx, setup, args[0], removeQuotes(params), quiet, requestID) } -func executeDag(ctx context.Context, cfg *config.Config, specPath, params string, quiet bool, requestID string) error { +func executeDag(ctx context.Context, setup *setup, specPath, params string, quiet bool, requestID string) error { // Load DAG - dag, err := digraph.Load(ctx, cfg.Paths.BaseConfig, specPath, params) + dag, err := digraph.Load(ctx, setup.cfg.Paths.BaseConfig, specPath, params) if err != nil { logger.Error(ctx, "Failed to load DAG", "path", specPath, "err", err) return fmt.Errorf("failed to load DAG from %s: %w", specPath, err) @@ -89,25 +90,28 @@ func executeDag(ctx context.Context, cfg *config.Config, specPath, params string } // Setup logging - logFile, err := openLogFile(logFileSettings{ - Prefix: startPrefix, - LogDir: cfg.Paths.LogDir, - DAGLogDir: dag.LogDir, - DAGName: dag.Name, - RequestID: requestID, - }) + logFile, err := setup.openLogFile(startPrefix, dag, requestID) if err != nil { - logger.Error(ctx, "Failed to create log file", "DAG", dag.Name, "err", err) - return fmt.Errorf("failed to create log file for DAG %s: %w", dag.Name, err) + logger.Error(ctx, "failed to initialize log file", "DAG", dag.Name, "err", err) + return fmt.Errorf("failed to initialize log file for DAG %s: %w", dag.Name, err) } defer logFile.Close() - // Initialize services - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + ctx = setup.loggerContextWithFile(ctx, quiet, logFile) logger.Info(ctx, "DAG execution initiated", "DAG", dag.Name, "requestID", requestID, "logFile", logFile.Name()) - ctx = logger.WithLogger(ctx, buildLoggerWithFile(logFile, quiet)) + + dagStore, err := setup.dagStore() + if err != nil { + logger.Error(ctx, "Failed to initialize DAG store", "err", err) + return fmt.Errorf("failed to initialize DAG store: %w", err) + } + + cli, err := setup.client() + if err != nil { + logger.Error(ctx, "Failed to initialize client", "err", err) + return fmt.Errorf("failed to initialize client: %w", err) + } // Create and run agent agt := agent.New( @@ -116,7 +120,8 @@ func executeDag(ctx context.Context, cfg *config.Config, specPath, params string filepath.Dir(logFile.Name()), logFile.Name(), cli, - dataStore, + dagStore, + setup.historyStore(), &agent.Options{}, ) diff --git a/cmd/start_all.go b/cmd/start_all.go index 1f2648f54..4d850712b 100644 --- a/cmd/start_all.go +++ b/cmd/start_all.go @@ -7,9 +7,7 @@ import ( "fmt" "github.com/dagu-org/dagu/internal/config" - "github.com/dagu-org/dagu/internal/frontend" "github.com/dagu-org/dagu/internal/logger" - "github.com/dagu-org/dagu/internal/scheduler" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -43,34 +41,40 @@ func runStartAll(cmd *cobra.Command, _ []string) error { return fmt.Errorf("failed to load configuration: %w", err) } + setup := newSetup(cfg) + // Update DAGs directory if specified if dagsDir, _ := cmd.Flags().GetString("dags"); dagsDir != "" { cfg.Paths.DAGsDir = dagsDir } - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, false)) + ctx := setup.loggerContext(cmd.Context(), false) - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + scheduler, err := setup.scheduler() + if err != nil { + return fmt.Errorf("failed to initialize scheduler: %w", err) + } // Start scheduler in a goroutine errChan := make(chan error, 1) go func() { logger.Info(ctx, "Scheduler initialization", "dags", cfg.Paths.DAGsDir) - sc := scheduler.New(cfg, cli) - if err := sc.Start(ctx); err != nil { + if err := scheduler.Start(ctx); err != nil { errChan <- fmt.Errorf("scheduler initialization failed: %w", err) return } errChan <- nil }() + server, err := setup.server(ctx) + if err != nil { + return fmt.Errorf("failed to initialize server: %w", err) + } + // Start server in main thread logger.Info(ctx, "Server initialization", "host", cfg.Host, "port", cfg.Port) - server := frontend.New(cfg, cli) serverErr := make(chan error, 1) go func() { if err := server.Serve(ctx); err != nil { diff --git a/cmd/status.go b/cmd/status.go index 746a552b7..88e6f9d4f 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -29,8 +29,9 @@ func runStatus(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load configuration: %w", err) } - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, false)) + setup := newSetup(cfg) + + ctx := setup.loggerContext(cmd.Context(), false) // Load the DAG dag, err := digraph.Load(ctx, cfg.Paths.BaseConfig, args[0], "") @@ -39,9 +40,11 @@ func runStatus(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load DAG from %s: %w", args[0], err) } - // Initialize services and get status - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + cli, err := setup.client() + if err != nil { + logger.Error(ctx, "failed to initialize client", "err", err) + return fmt.Errorf("failed to initialize client: %w", err) + } status, err := cli.GetCurrentStatus(ctx, dag) if err != nil { diff --git a/cmd/status_test.go b/cmd/status_test.go index d9eaed8de..ac6e751b0 100644 --- a/cmd/status_test.go +++ b/cmd/status_test.go @@ -24,9 +24,8 @@ func TestStatusCommand(t *testing.T) { close(done) }() - hs := th.DataStores.HistoryStore() require.Eventually(t, func() bool { - status := hs.ReadStatusRecent(th.Context, dagFile.Path, 1) + status := th.HistoryStore.ReadStatusRecent(th.Context, dagFile.Path, 1) if len(status) < 1 { return false } diff --git a/cmd/stop.go b/cmd/stop.go index 10b5a8913..5e5f8997e 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -29,8 +29,9 @@ func runStop(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to load configuration: %w", err) } - ctx := cmd.Context() - ctx = logger.WithLogger(ctx, buildLogger(cfg, false)) + setup := newSetup(cfg) + + ctx := setup.loggerContext(cmd.Context(), false) dag, err := digraph.Load(cmd.Context(), cfg.Paths.BaseConfig, args[0], "") if err != nil { @@ -40,8 +41,11 @@ func runStop(cmd *cobra.Command, args []string) error { logger.Info(ctx, "DAG is stopping", "dag", dag.Name) - dataStore := newDataStores(cfg) - cli := newClient(cfg, dataStore) + cli, err := setup.client() + if err != nil { + logger.Error(ctx, "failed to initialize client", "err", err) + return fmt.Errorf("failed to initialize client: %w", err) + } if err := cli.Stop(cmd.Context(), dag); err != nil { logger.Error(ctx, "Failed to stop DAG", "dag", dag.Name, "err", err) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 61000696d..2062e36ba 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -38,7 +38,7 @@ type Agent struct { dag *digraph.DAG dry bool retryTarget *model.Status - dataStore persistence.DataStores + dagStore persistence.DAGStore client client.Client scheduler *scheduler.Scheduler graph *scheduler.ExecutionGraph @@ -75,18 +75,20 @@ func New( logDir string, logFile string, cli client.Client, - dataStore persistence.DataStores, + dagStore persistence.DAGStore, + historyStore persistence.HistoryStore, opts *Options, ) *Agent { return &Agent{ - requestID: requestID, - dag: dag, - dry: opts.Dry, - retryTarget: opts.RetryTarget, - logDir: logDir, - logFile: logFile, - client: cli, - dataStore: dataStore, + requestID: requestID, + dag: dag, + dry: opts.Dry, + retryTarget: opts.RetryTarget, + logDir: logDir, + logFile: logFile, + client: cli, + dagStore: dagStore, + historyStore: historyStore, } } @@ -193,7 +195,7 @@ func (a *Agent) Run(ctx context.Context) error { // Start the DAG execution. logger.Info(ctx, "DAG execution started", "reqId", a.requestID, "name", a.dag.Name, "params", a.dag.Params) - dagCtx := digraph.NewContext(ctx, a.dag, a.dataStore.DAGStore(), newOutputCollector(a.historyStore), a.requestID, a.logFile) + dagCtx := digraph.NewContext(ctx, a.dag, a.dagStore, newOutputCollector(a.historyStore), a.requestID, a.logFile) lastErr := a.scheduler.Schedule(dagCtx, a.graph, done) // Update the finished status to the history database. @@ -362,7 +364,7 @@ func (a *Agent) dryRun(ctx context.Context) error { logger.Info(ctx, "Dry-run started", "reqId", a.requestID) - dagCtx := digraph.NewContext(context.Background(), a.dag, a.dataStore.DAGStore(), newOutputCollector(a.historyStore), a.requestID, a.logFile) + dagCtx := digraph.NewContext(context.Background(), a.dag, a.dagStore, newOutputCollector(a.historyStore), a.requestID, a.logFile) lastErr := a.scheduler.Schedule(dagCtx, a.graph, done) a.lastErr = lastErr @@ -441,7 +443,6 @@ func (a *Agent) setupGraphForRetry(ctx context.Context) error { // setup database prepare database connection and remove old history data. func (a *Agent) setupDatabase(ctx context.Context) error { - a.historyStore = a.dataStore.HistoryStore() location, retentionDays := a.dag.Location, a.dag.HistRetentionDays if err := a.historyStore.RemoveOld(ctx, location, retentionDays); err != nil { logger.Error(ctx, "History data cleanup failed", "err", err) diff --git a/internal/client/client.go b/internal/client/client.go index 52966af71..400f74ac3 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -25,23 +25,29 @@ import ( // New creates a new Client instance. // The Client is used to interact with the DAG. func New( - dataStore persistence.DataStores, + dagStore persistence.DAGStore, + historyStore persistence.HistoryStore, + flagStore persistence.FlagStore, executable string, workDir string, ) Client { return &client{ - dataStore: dataStore, - executable: executable, - workDir: workDir, + dagStore: dagStore, + historyStore: historyStore, + flagStore: flagStore, + executable: executable, + workDir: workDir, } } var _ Client = (*client)(nil) type client struct { - dataStore persistence.DataStores - executable string - workDir string + dagStore persistence.DAGStore + historyStore persistence.HistoryStore + flagStore persistence.FlagStore + executable string + workDir string } var ( @@ -58,13 +64,11 @@ var ( ) func (e *client) GetDAGSpec(ctx context.Context, id string) (string, error) { - dagStore := e.dataStore.DAGStore() - return dagStore.GetSpec(ctx, id) + return e.dagStore.GetSpec(ctx, id) } func (e *client) CreateDAG(ctx context.Context, name string) (string, error) { - dagStore := e.dataStore.DAGStore() - id, err := dagStore.Create(ctx, name, dagTemplate) + id, err := e.dagStore.Create(ctx, name, dagTemplate) if err != nil { return "", fmt.Errorf("%w: %s", errCreateDAGFile, err) } @@ -74,25 +78,22 @@ func (e *client) CreateDAG(ctx context.Context, name string) (string, error) { func (e *client) Grep(ctx context.Context, pattern string) ( []*persistence.GrepResult, []string, error, ) { - dagStore := e.dataStore.DAGStore() - return dagStore.Grep(ctx, pattern) + return e.dagStore.Grep(ctx, pattern) } func (e *client) Rename(ctx context.Context, oldID, newID string) error { - dagStore := e.dataStore.DAGStore() - oldDAG, err := dagStore.Find(ctx, oldID) + oldDAG, err := e.dagStore.FindByName(ctx, oldID) if err != nil { return err } - if err := dagStore.Rename(ctx, oldID, newID); err != nil { + if err := e.dagStore.Rename(ctx, oldID, newID); err != nil { return err } - newDAG, err := dagStore.Find(ctx, newID) + newDAG, err := e.dagStore.FindByName(ctx, newID) if err != nil { return err } - historyStore := e.dataStore.HistoryStore() - return historyStore.Rename(ctx, oldDAG.Location, newDAG.Location) + return e.historyStore.Rename(ctx, oldDAG.Location, newDAG.Location) } func (e *client) Stop(_ context.Context, dag *digraph.DAG) error { @@ -186,7 +187,7 @@ func (*client) GetCurrentStatus(_ context.Context, dag *digraph.DAG) (*model.Sta func (e *client) GetStatusByRequestID(ctx context.Context, dag *digraph.DAG, requestID string) ( *model.Status, error, ) { - ret, err := e.dataStore.HistoryStore().FindByRequestID(ctx, dag.Location, requestID) + ret, err := e.historyStore.FindByRequestID(ctx, dag.Location, requestID) if err != nil { return nil, err } @@ -212,7 +213,7 @@ func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.S if currStatus != nil { return *currStatus, nil } - status, err := e.dataStore.HistoryStore().ReadStatusToday(ctx, dag.Location) + status, err := e.historyStore.ReadStatusToday(ctx, dag.Location) if err != nil { status := model.NewStatusFactory(dag).CreateDefault() if errors.Is(err, persistence.ErrNoStatusDataToday) || @@ -227,7 +228,7 @@ func (e *client) GetLatestStatus(ctx context.Context, dag *digraph.DAG) (model.S } func (e *client) GetRecentHistory(ctx context.Context, dag *digraph.DAG, n int) []model.StatusFile { - return e.dataStore.HistoryStore().ReadStatusRecent(ctx, dag.Location, n) + return e.historyStore.ReadStatusRecent(ctx, dag.Location, n) } func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status model.Status) error { @@ -244,28 +245,25 @@ func (e *client) UpdateStatus(ctx context.Context, dag *digraph.DAG, status mode return errDAGIsRunning } } - return e.dataStore.HistoryStore().Update(ctx, dag.Location, status.RequestID, status) + return e.historyStore.Update(ctx, dag.Location, status.RequestID, status) } func (e *client) UpdateDAG(ctx context.Context, id string, spec string) error { - dagStore := e.dataStore.DAGStore() - return dagStore.UpdateSpec(ctx, id, []byte(spec)) + return e.dagStore.UpdateSpec(ctx, id, []byte(spec)) } func (e *client) DeleteDAG(ctx context.Context, name, loc string) error { - err := e.dataStore.HistoryStore().RemoveAll(ctx, loc) + err := e.historyStore.RemoveAll(ctx, loc) if err != nil { return err } - dagStore := e.dataStore.DAGStore() - return dagStore.Delete(ctx, name) + return e.dagStore.Delete(ctx, name) } func (e *client) GetAllStatus(ctx context.Context) ( statuses []DAGStatus, errs []string, err error, ) { - dagStore := e.dataStore.DAGStore() - dagList, errs, err := dagStore.List(ctx) + dagList, errs, err := e.dagStore.List(ctx) var ret []DAGStatus for _, d := range dagList { @@ -287,7 +285,6 @@ func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDag var ( dagListPaginationResult *persistence.DagListPaginationResult err error - dagStore = e.dataStore.DAGStore() dagStatusList = make([]DAGStatus, 0) ) @@ -300,11 +297,11 @@ func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDag limit = int(*params.Limit) } - if dagListPaginationResult, err = dagStore.ListPagination(ctx, persistence.DAGListPaginationArgs{ + if dagListPaginationResult, err = e.dagStore.ListPagination(ctx, persistence.DAGListPaginationArgs{ Page: page, Limit: limit, - Name: params.SearchName, - Tag: params.SearchTag, + Name: fromPtr(params.SearchName), + Tag: fromPtr(params.SearchTag), }); err != nil { return dagStatusList, &DagListPaginationSummaryResult{PageCount: 1}, err } @@ -327,8 +324,7 @@ func (e *client) GetAllStatusPagination(ctx context.Context, params dags.ListDag } func (e *client) getDAG(ctx context.Context, name string) (*digraph.DAG, error) { - dagStore := e.dataStore.DAGStore() - dagDetail, err := dagStore.GetDetails(ctx, name) + dagDetail, err := e.dagStore.GetDetails(ctx, name) return e.emptyDAGIfNil(dagDetail, name), err } @@ -349,8 +345,7 @@ func (e *client) GetStatus(ctx context.Context, id string) (DAGStatus, error) { } func (e *client) ToggleSuspend(_ context.Context, id string, suspend bool) error { - flagStore := e.dataStore.FlagStore() - return flagStore.ToggleSuspend(id, suspend) + return e.flagStore.ToggleSuspend(id, suspend) } func (e *client) readStatus(ctx context.Context, dag *digraph.DAG) (DAGStatus, error) { @@ -373,8 +368,7 @@ func (*client) emptyDAGIfNil(dag *digraph.DAG, dagLocation string) *digraph.DAG } func (e *client) IsSuspended(_ context.Context, id string) bool { - flagStore := e.dataStore.FlagStore() - return flagStore.IsSuspended(id) + return e.flagStore.IsSuspended(id) } func escapeArg(input string) string { @@ -394,5 +388,13 @@ func escapeArg(input string) string { } func (e *client) GetTagList(ctx context.Context) ([]string, []string, error) { - return e.dataStore.DAGStore().TagList(ctx) + return e.dagStore.TagList(ctx) +} + +func fromPtr[T any](p *T) T { + var zero T + if p == nil { + return zero + } + return *p } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index df037ee0a..cb095ad1f 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -78,16 +78,14 @@ func TestClient_GetStatus(t *testing.T) { cli := th.Client // Open the history store and write a status before updating it. - historyStore := th.DataStores.HistoryStore() - - err := historyStore.Open(ctx, dag.Location, now, requestID) + err := th.HistoryStore.Open(ctx, dag.Location, now, requestID) require.NoError(t, err) status := testNewStatus(dag.DAG, requestID, scheduler.StatusSuccess, scheduler.NodeStatusSuccess) - err = historyStore.Write(ctx, status) + err = th.HistoryStore.Write(ctx, status) require.NoError(t, err) - _ = historyStore.Close(ctx) + _ = th.HistoryStore.Close(ctx) // Get the status and check if it is the same as the one we wrote. statusToCheck, err := cli.GetStatusByRequestID(ctx, dag.DAG, requestID) diff --git a/internal/digraph/context.go b/internal/digraph/context.go index 6eb8cfbe2..dc63f3eb1 100644 --- a/internal/digraph/context.go +++ b/internal/digraph/context.go @@ -18,7 +18,7 @@ const ( // Finder finds a DAG by name. // This is used to find the DAG when a node references another DAG. type Finder interface { - Find(ctx context.Context, name string) (*DAG, error) + FindByName(ctx context.Context, name string) (*DAG, error) } // ResultCollector gets a result of a DAG execution. diff --git a/internal/digraph/executor/sub.go b/internal/digraph/executor/sub.go index 72f21c1d2..ea0c8fc25 100644 --- a/internal/digraph/executor/sub.go +++ b/internal/digraph/executor/sub.go @@ -53,7 +53,7 @@ func newSubWorkflow( return nil, fmt.Errorf("failed to substitute string fields: %w", err) } - subDAG, err := dagCtx.Finder.Find(ctx, config.Name) + subDAG, err := dagCtx.Finder.FindByName(ctx, config.Name) if err != nil { return nil, fmt.Errorf( "failed to find subworkflow %q: %w", config.Name, err, diff --git a/internal/persistence/client/store_factory.go b/internal/persistence/client/store_factory.go deleted file mode 100644 index d7208f7d2..000000000 --- a/internal/persistence/client/store_factory.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (C) 2024 Yota Hamada -// SPDX-License-Identifier: GPL-3.0-or-later - -package client - -import ( - "os" - - "github.com/dagu-org/dagu/internal/persistence" - "github.com/dagu-org/dagu/internal/persistence/jsondb" - "github.com/dagu-org/dagu/internal/persistence/local" - "github.com/dagu-org/dagu/internal/persistence/local/storage" -) - -var _ persistence.DataStores = (*dataStores)(nil) - -type dataStores struct { - historyStore persistence.HistoryStore - dagStore persistence.DAGStore - - dags string - dataDir string - suspendFlagsDir string - latestStatusToday bool -} - -type DataStoreOptions struct { - LatestStatusToday bool -} - -func NewDataStores( - dags string, - dataDir string, - suspendFlagsDir string, - opts DataStoreOptions, -) persistence.DataStores { - dataStoreImpl := &dataStores{ - dags: dags, - dataDir: dataDir, - suspendFlagsDir: suspendFlagsDir, - latestStatusToday: opts.LatestStatusToday, - } - _ = dataStoreImpl.InitDagDir() - return dataStoreImpl -} - -func (f *dataStores) InitDagDir() error { - _, err := os.Stat(f.dags) - if os.IsNotExist(err) { - if err := os.MkdirAll(f.dags, 0755); err != nil { - return err - } - } - - return nil -} - -func (f *dataStores) HistoryStore() persistence.HistoryStore { - // TODO: Add support for other data stores (e.g. sqlite, postgres, etc.) - if f.historyStore == nil { - cfg := jsondb.DefaultConfig() - cfg.LatestStatusToday = f.latestStatusToday - f.historyStore = jsondb.New(f.dataDir, cfg) - } - return f.historyStore -} - -func (f *dataStores) DAGStore() persistence.DAGStore { - if f.dagStore == nil { - f.dagStore = local.NewDAGStore(&local.NewDAGStoreArgs{Dir: f.dags}) - } - return f.dagStore -} - -func (f *dataStores) FlagStore() persistence.FlagStore { - return local.NewFlagStore(storage.NewStorage(f.suspendFlagsDir)) -} diff --git a/internal/persistence/filecache/filecache.go b/internal/persistence/filecache/filecache.go index 0ca36fc0f..dfc2ff94e 100644 --- a/internal/persistence/filecache/filecache.go +++ b/internal/persistence/filecache/filecache.go @@ -4,6 +4,7 @@ package filecache import ( + "context" "fmt" "os" "sync" @@ -42,11 +43,13 @@ func (c *Cache[T]) Stop() { close(c.stopCh) } -func (c *Cache[T]) StartEviction() { +func (c *Cache[T]) StartEviction(ctx context.Context) { go func() { timer := time.NewTimer(time.Minute) for { select { + case <-ctx.Done(): + return case <-timer.C: timer.Reset(time.Minute) c.evict() @@ -91,9 +94,9 @@ func (c *Cache[T]) Invalidate(fileName string) { } func (c *Cache[T]) LoadLatest( - fileName string, loader func() (T, error), + filePath string, loader func() (T, error), ) (T, error) { - stale, lastModified, err := c.IsStale(fileName, c.Entry(fileName)) + stale, lastModified, err := c.IsStale(filePath, c.Entry(filePath)) if err != nil { var zero T return zero, err @@ -104,10 +107,10 @@ func (c *Cache[T]) LoadLatest( var zero T return zero, err } - c.Store(fileName, data, lastModified) + c.Store(filePath, data, lastModified) return data, nil } - item, _ := c.entries.Load(fileName) + item, _ := c.entries.Load(filePath) entry := item.(Entry[T]) return entry.Data, nil } diff --git a/internal/persistence/grep/grep.go b/internal/persistence/grep/grep.go index c5752b54a..0b94dc5b4 100644 --- a/internal/persistence/grep/grep.go +++ b/internal/persistence/grep/grep.go @@ -43,12 +43,15 @@ type Match struct { StartLine int } +var DefaultOptions = Options{ + IsRegexp: true, + Before: 2, + After: 2, +} + // Grep reads data and returns lines that match the given pattern. // If opts is nil, default options will be used. -func Grep(dat []byte, pattern string, opts *Options) ([]*Match, error) { - if opts == nil { - opts = new(Options) - } +func Grep(dat []byte, pattern string, opts Options) ([]*Match, error) { if pattern == "" { return nil, ErrEmptyPattern } @@ -67,7 +70,7 @@ func Grep(dat []byte, pattern string, opts *Options) ([]*Match, error) { } // getMatcher returns a matcher based on the pattern and options. -func getMatcher(pattern string, opts *Options) (Matcher, error) { +func getMatcher(pattern string, opts Options) (Matcher, error) { if opts.Matcher != nil { return opts.Matcher, nil } @@ -97,7 +100,7 @@ func scanLines(dat []byte, matcher Matcher) ([]string, []int, error) { } // buildMatches constructs Match objects from matched line indices. -func buildMatches(lines []string, matches []int, opts *Options) []*Match { +func buildMatches(lines []string, matches []int, opts Options) []*Match { var ret []*Match for _, m := range matches { @@ -114,7 +117,7 @@ func buildMatches(lines []string, matches []int, opts *Options) []*Match { return ret } -func defaultMatcher(pattern string, opts *Options) (Matcher, error) { +func defaultMatcher(pattern string, opts Options) (Matcher, error) { if opts.IsRegexp { reg, err := regexp.Compile(pattern) if err != nil { diff --git a/internal/persistence/grep/grep_test.go b/internal/persistence/grep/grep_test.go index 00cdafbe3..d9217472b 100644 --- a/internal/persistence/grep/grep_test.go +++ b/internal/persistence/grep/grep_test.go @@ -19,7 +19,7 @@ func TestGrep(t *testing.T) { Name string File string Pattern string - Opts *Options + Opts Options Want []*Match IsErr bool }{ @@ -38,7 +38,7 @@ func TestGrep(t *testing.T) { Name: "regexp", File: filepath.Join(dir, "test.txt"), Pattern: "^b.", - Opts: &Options{ + Opts: Options{ IsRegexp: true, }, Want: []*Match{ @@ -52,7 +52,7 @@ func TestGrep(t *testing.T) { Name: "before", File: filepath.Join(dir, "test.txt"), Pattern: "b", - Opts: &Options{ + Opts: Options{ Before: 1, }, Want: []*Match{ @@ -66,7 +66,7 @@ func TestGrep(t *testing.T) { Name: "before+after", File: filepath.Join(dir, "test.txt"), Pattern: "cc", - Opts: &Options{ + Opts: Options{ Before: 2, After: 2, }, @@ -81,7 +81,7 @@ func TestGrep(t *testing.T) { Name: "before+after,firstline", File: filepath.Join(dir, "test.txt"), Pattern: "aa", - Opts: &Options{ + Opts: Options{ Before: 1, After: 1, }, @@ -96,7 +96,7 @@ func TestGrep(t *testing.T) { Name: "before+after,lastline", File: filepath.Join(dir, "test.txt"), Pattern: "ee", - Opts: &Options{ + Opts: Options{ Before: 1, After: 1, }, @@ -129,7 +129,7 @@ func TestGrep(t *testing.T) { Name: "invalid regexp", File: filepath.Join(dir, "test.txt"), Pattern: "(aa", - Opts: &Options{ + Opts: Options{ IsRegexp: true, }, IsErr: true, diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go index 567b9384a..fc9d59aac 100644 --- a/internal/persistence/interface.go +++ b/internal/persistence/interface.go @@ -19,12 +19,6 @@ var ( ErrNoStatusData = fmt.Errorf("no status data") ) -type DataStores interface { - HistoryStore() HistoryStore - DAGStore() DAGStore - FlagStore() FlagStore -} - type HistoryStore interface { Open(ctx context.Context, key string, timestamp time.Time, requestID string) error Write(ctx context.Context, status model.Status) error @@ -49,15 +43,15 @@ type DAGStore interface { Rename(ctx context.Context, oldID, newID string) error GetSpec(ctx context.Context, name string) (string, error) UpdateSpec(ctx context.Context, name string, spec []byte) error - Find(ctx context.Context, name string) (*digraph.DAG, error) + FindByName(ctx context.Context, name string) (*digraph.DAG, error) TagList(ctx context.Context) ([]string, []string, error) } type DAGListPaginationArgs struct { Page int Limit int - Name *string - Tag *string + Name string + Tag string } type DagListPaginationResult struct { diff --git a/internal/persistence/jsondb/jsondb.go b/internal/persistence/jsondb/jsondb.go index 01444385c..fc811872d 100644 --- a/internal/persistence/jsondb/jsondb.go +++ b/internal/persistence/jsondb/jsondb.go @@ -40,8 +40,7 @@ var ( type Config struct { Location string LatestStatusToday bool - CacheSize int - CacheTTL time.Duration + FileCache *filecache.Cache[*model.Status] } const ( @@ -52,41 +51,48 @@ const ( dateFormat = "20060102" ) -// DefaultConfig returns default configuration -func DefaultConfig() Config { - return Config{ - CacheSize: 300, - CacheTTL: 3 * time.Hour, - LatestStatusToday: true, - } -} - var _ persistence.HistoryStore = (*JSONDB)(nil) // JSONDB manages DAGs status files in local storage. type JSONDB struct { - location string - config Config - writer *writer - cache *filecache.Cache[*model.Status] + baseDir string + latestStatusToday bool + fileCache *filecache.Cache[*model.Status] + writer *writer +} + +type Option func(*Options) + +type Options struct { + FileCache *filecache.Cache[*model.Status] + LatestStatusToday bool } -// New creates a new JSONDB with default configuration. -func New(location string, cfg Config) *JSONDB { - if cfg.CacheSize <= 0 { - cfg.CacheSize = DefaultConfig().CacheSize +func WithFileCache(cache *filecache.Cache[*model.Status]) Option { + return func(o *Options) { + o.FileCache = cache } - if cfg.CacheTTL <= 0 { - cfg.CacheTTL = DefaultConfig().CacheTTL +} + +func WithLatestStatusToday(latestStatusToday bool) Option { + return func(o *Options) { + o.LatestStatusToday = latestStatusToday } +} - db := &JSONDB{ - config: cfg, - location: location, - cache: filecache.New[*model.Status](cfg.CacheSize, cfg.CacheTTL), +// New creates a new JSONDB instance. +func New(baseDir string, opts ...Option) *JSONDB { + options := &Options{ + LatestStatusToday: true, + } + for _, opt := range opts { + opt(options) + } + return &JSONDB{ + baseDir: baseDir, + latestStatusToday: options.LatestStatusToday, + fileCache: options.FileCache, } - db.cache.StartEviction() - return db } func (db *JSONDB) Update(ctx context.Context, key, requestID string, status model.Status) error { @@ -99,11 +105,16 @@ func (db *JSONDB) Update(ctx context.Context, key, requestID string, status mode if err := writer.open(); err != nil { return err } - defer func() { - db.cache.Invalidate(statusFile.File) _ = writer.close() }() + + if db.fileCache != nil { + defer func() { + db.fileCache.Invalidate(statusFile.File) + }() + } + return writer.write(status) } @@ -140,7 +151,9 @@ func (db *JSONDB) Close(ctx context.Context) error { return err } - db.cache.Invalidate(db.writer.target) + if db.fileCache != nil { + db.fileCache.Invalidate(db.writer.target) + } return db.writer.close() } @@ -149,9 +162,7 @@ func (db *JSONDB) ReadStatusRecent(_ context.Context, key string, itemLimit int) files := db.getLatestMatches(db.globPattern(key), itemLimit) for _, file := range files { - status, err := db.cache.LoadLatest(file, func() (*model.Status, error) { - return ParseStatusFile(file) - }) + status, err := db.parseStatusFile(file) if err != nil { continue } @@ -165,14 +176,11 @@ func (db *JSONDB) ReadStatusRecent(_ context.Context, key string, itemLimit int) } func (db *JSONDB) ReadStatusToday(_ context.Context, key string) (*model.Status, error) { - file, err := db.latestToday(key, time.Now(), db.config.LatestStatusToday) + file, err := db.latestToday(key, time.Now(), db.latestStatusToday) if err != nil { return nil, err } - - return db.cache.LoadLatest(file, func() (*model.Status, error) { - return ParseStatusFile(file) - }) + return db.parseStatusFile(file) } func (db *JSONDB) FindByRequestID(_ context.Context, key string, requestID string) (*model.StatusFile, error) { @@ -308,6 +316,15 @@ func (db *JSONDB) Rename(_ context.Context, oldKey, newKey string) error { return nil } +func (db *JSONDB) parseStatusFile(file string) (*model.Status, error) { + if db.fileCache != nil { + return db.fileCache.LoadLatest(file, func() (*model.Status, error) { + return ParseStatusFile(file) + }) + } + return ParseStatusFile(file) +} + func (db *JSONDB) getDirectory(key string, prefix string) string { if key != prefix { // Add a hash postfix to the directory name to avoid conflicts. @@ -315,10 +332,10 @@ func (db *JSONDB) getDirectory(key string, prefix string) string { h := md5.New() _, _ = h.Write([]byte(key)) v := hex.EncodeToString(h.Sum(nil)) - return filepath.Join(db.location, fmt.Sprintf("%s-%s", prefix, v)) + return filepath.Join(db.baseDir, fmt.Sprintf("%s-%s", prefix, v)) } - return filepath.Join(db.location, key) + return filepath.Join(db.baseDir, key) } func (db *JSONDB) generateFilePath(key string, timestamp timeInUTC, requestID string) (string, error) { diff --git a/internal/persistence/jsondb/setup_test.go b/internal/persistence/jsondb/setup_test.go index 445b01458..886a1d03a 100644 --- a/internal/persistence/jsondb/setup_test.go +++ b/internal/persistence/jsondb/setup_test.go @@ -29,7 +29,7 @@ func testSetup(t *testing.T) testHelper { th := testHelper{ Context: context.Background(), - DB: New(tmpDir, DefaultConfig()), + DB: New(tmpDir), tmpDir: tmpDir, } diff --git a/internal/persistence/local/dag_store.go b/internal/persistence/local/dag_store.go index 7c1c9942a..cabb01d53 100644 --- a/internal/persistence/local/dag_store.go +++ b/internal/persistence/local/dag_store.go @@ -12,7 +12,6 @@ import ( "path" "path/filepath" "strings" - "time" "github.com/dagu-org/dagu/internal/digraph" "github.com/dagu-org/dagu/internal/fileutil" @@ -22,40 +21,51 @@ import ( "github.com/dagu-org/dagu/internal/persistence/grep" ) +var _ persistence.DAGStore = (*dagStoreImpl)(nil) +var _ digraph.Finder = (*dagStoreImpl)(nil) + +type DAGStoreOption func(*DAGStoreOptions) + +type DAGStoreOptions struct { + FileCache *filecache.Cache[*digraph.DAG] +} + +func WithFileCache(cache *filecache.Cache[*digraph.DAG]) DAGStoreOption { + return func(o *DAGStoreOptions) { + o.FileCache = cache + } +} + type dagStoreImpl struct { dir string - metaCache *filecache.Cache[*digraph.DAG] + fileCache *filecache.Cache[*digraph.DAG] } -type NewDAGStoreArgs struct { - Dir string -} +func NewDAGStore(dir string, opts ...DAGStoreOption) persistence.DAGStore { + options := &DAGStoreOptions{} + for _, opt := range opts { + opt(options) + } -func NewDAGStore(args *NewDAGStoreArgs) persistence.DAGStore { - dagStore := &dagStoreImpl{ - dir: args.Dir, - metaCache: filecache.New[*digraph.DAG](0, time.Hour*24), + return &dagStoreImpl{ + dir: dir, + fileCache: options.FileCache, } - dagStore.metaCache.StartEviction() - return dagStore } func (d *dagStoreImpl) GetMetadata(ctx context.Context, name string) (*digraph.DAG, error) { - loc, err := d.fileLocation(name) - if err != nil { - return nil, err + filePath := resolveFilePath(d.dir, name) + if d.fileCache == nil { + return digraph.LoadMetadata(ctx, filePath) } - return d.metaCache.LoadLatest(loc, func() (*digraph.DAG, error) { - return digraph.LoadMetadata(ctx, loc) + return d.fileCache.LoadLatest(filePath, func() (*digraph.DAG, error) { + return digraph.LoadMetadata(ctx, filePath) }) } func (d *dagStoreImpl) GetDetails(ctx context.Context, name string) (*digraph.DAG, error) { - loc, err := d.fileLocation(name) - if err != nil { - return nil, err - } - dat, err := digraph.LoadWithoutEval(ctx, loc) + filePath := resolveFilePath(d.dir, name) + dat, err := digraph.LoadWithoutEval(ctx, filePath) if err != nil { return nil, err } @@ -63,11 +73,8 @@ func (d *dagStoreImpl) GetDetails(ctx context.Context, name string) (*digraph.DA } func (d *dagStoreImpl) GetSpec(_ context.Context, name string) (string, error) { - loc, err := d.fileLocation(name) - if err != nil { - return "", err - } - dat, err := os.ReadFile(loc) + filePath := resolveFilePath(d.dir, name) + dat, err := os.ReadFile(filePath) if err != nil { return "", err } @@ -80,23 +87,21 @@ const defaultPerm os.FileMode = 0744 var errDOGFileNotExist = errors.New("the DAG file does not exist") func (d *dagStoreImpl) UpdateSpec(ctx context.Context, name string, spec []byte) error { - // validation + // Load the DAG to validate the spec. _, err := digraph.LoadYAML(ctx, spec) if err != nil { return err } - loc, err := d.fileLocation(name) - if err != nil { - return err - } - if !exists(loc) { - return fmt.Errorf("%w: %s", errDOGFileNotExist, loc) + filePath := resolveFilePath(d.dir, name) + if !fileExists(filePath) { + return fmt.Errorf("%w: %s", errDOGFileNotExist, filePath) } - err = os.WriteFile(loc, spec, defaultPerm) - if err != nil { + if err := os.WriteFile(filePath, spec, defaultPerm); err != nil { return err } - d.metaCache.Invalidate(loc) + if d.fileCache != nil { + d.fileCache.Invalidate(filePath) + } return nil } @@ -106,45 +111,27 @@ func (d *dagStoreImpl) Create(_ context.Context, name string, spec []byte) (stri if err := d.ensureDirExist(); err != nil { return "", err } - loc, err := d.fileLocation(name) - if err != nil { - return "", err - } - if exists(loc) { - return "", fmt.Errorf("%w: %s", errDAGFileAlreadyExists, loc) + filePath := resolveFilePath(d.dir, name) + if fileExists(filePath) { + return "", fmt.Errorf("%w: %s", errDAGFileAlreadyExists, filePath) } // nolint: gosec - return name, os.WriteFile(loc, spec, 0644) + return name, os.WriteFile(filePath, spec, 0644) } func (d *dagStoreImpl) Delete(_ context.Context, name string) error { - loc, err := d.fileLocation(name) - if err != nil { + filePath := resolveFilePath(d.dir, name) + if err := os.Remove(filePath); err != nil { return err } - err = os.Remove(loc) - if err != nil { - return err + if d.fileCache != nil { + d.fileCache.Invalidate(filePath) } - d.metaCache.Invalidate(loc) return nil } -func exists(file string) bool { - _, err := os.Stat(file) - return !os.IsNotExist(err) -} - -func (d *dagStoreImpl) fileLocation(name string) (string, error) { - if strings.Contains(name, "/") { - // this is for backward compatibility - return name, nil - } - return fileutil.EnsureYAMLExtension(path.Join(d.dir, name)), nil -} - func (d *dagStoreImpl) ensureDirExist() error { - if !exists(d.dir) { + if !fileExists(d.dir) { if err := os.MkdirAll(d.dir, 0755); err != nil { return err } @@ -152,42 +139,11 @@ func (d *dagStoreImpl) ensureDirExist() error { return nil } -func (d *dagStoreImpl) containsSearchText(text string, search *string) bool { - if search == nil { - return true - } - ret := strings.Contains(strings.ToLower(text), strings.ToLower(*search)) - return ret -} - -func (d *dagStoreImpl) searchTags(tags []string, searchTag *string) bool { - if searchTag == nil { - return true - } - - for _, tag := range tags { - if tag == *searchTag { - return true - } - } - - return false -} - -func (d *dagStoreImpl) getTagList(tagSet map[string]struct{}) []string { - tagList := make([]string, 0, len(tagSet)) - for tag := range tagSet { - tagList = append(tagList, tag) - } - return tagList -} - func (d *dagStoreImpl) ListPagination(ctx context.Context, params persistence.DAGListPaginationArgs) (*persistence.DagListPaginationResult, error) { var ( - dagList = make([]*digraph.DAG, 0) - errList = make([]string, 0) - count int - currentDag *digraph.DAG + dagList []*digraph.DAG + errList []string + count int ) if err := filepath.WalkDir(d.dir, func(_ string, entry fs.DirEntry, err error) error { @@ -195,32 +151,39 @@ func (d *dagStoreImpl) ListPagination(ctx context.Context, params persistence.DA return err } - if entry.IsDir() || !checkExtension(entry.Name()) { + if entry.IsDir() || !fileutil.IsYAMLFile(entry.Name()) { return nil } baseName := path.Base(entry.Name()) dagName := strings.TrimSuffix(baseName, path.Ext(baseName)) - if params.Tag == nil || *params.Tag == "" { - // if tag is not provided, check before reading the file - if !d.containsSearchText(dagName, params.Name) { - // skip the file if the name + if params.Name != "" && params.Tag == "" { + // If tag is not provided, check before reading the file to avoid + // unnecessary file read and parsing. + if !containsSearchText(dagName, params.Name) { + // Return early if the name does not match the search text. return nil } } - // if tag is provided, read the file and check the tag - if currentDag, err = d.GetMetadata(ctx, dagName); err != nil { + // Read the file and parse the DAG. + parsedDAG, err := d.GetMetadata(ctx, dagName) + if err != nil { errList = append(errList, fmt.Sprintf("reading %s failed: %s", dagName, err)) + return nil } - if !d.containsSearchText(dagName, params.Name) || currentDag == nil || !d.searchTags(currentDag.Tags, params.Tag) { + if params.Name != "" && !containsSearchText(dagName, params.Name) { + return nil + } + + if params.Tag != "" && !containsTag(parsedDAG.Tags, params.Tag) { return nil } count++ if count > (params.Page-1)*params.Limit && len(dagList) < params.Limit { - dagList = append(dagList, currentDag) + dagList = append(dagList, parsedDAG) } return nil @@ -244,19 +207,19 @@ func (d *dagStoreImpl) List(ctx context.Context) (ret []*digraph.DAG, errs []str errs = append(errs, err.Error()) return } - fis, err := os.ReadDir(d.dir) + entries, err := os.ReadDir(d.dir) if err != nil { errs = append(errs, err.Error()) return } - for _, fi := range fis { - if checkExtension(fi.Name()) { - dat, err := d.GetMetadata(ctx, fi.Name()) + for _, entry := range entries { + if fileutil.IsYAMLFile(entry.Name()) { + dat, err := d.GetMetadata(ctx, entry.Name()) if err == nil { ret = append(ret, dat) } else { errs = append(errs, fmt.Sprintf( - "reading %s failed: %s", fi.Name(), err), + "reading %s failed: %s", entry.Name(), err), ) } } @@ -264,18 +227,6 @@ func (d *dagStoreImpl) List(ctx context.Context) (ret []*digraph.DAG, errs []str return ret, errs, nil } -var extensions = []string{".yaml", ".yml"} - -func checkExtension(file string) bool { - ext := filepath.Ext(file) - for _, e := range extensions { - if e == ext { - return true - } - } - return false -} - func (d *dagStoreImpl) Grep(ctx context.Context, pattern string) ( ret []*persistence.GrepResult, errs []string, err error, ) { @@ -286,39 +237,33 @@ func (d *dagStoreImpl) Grep(ctx context.Context, pattern string) ( return } - fis, err := os.ReadDir(d.dir) + entries, err := os.ReadDir(d.dir) if err != nil { logger.Error(ctx, "Failed to read directory", "dir", d.dir, "err", err) } - opts := &grep.Options{ - IsRegexp: true, - Before: 2, - After: 2, - } - - for _, fi := range fis { - if fileutil.IsYAMLFile(fi.Name()) { - filePath := filepath.Join(d.dir, fi.Name()) + for _, entry := range entries { + if fileutil.IsYAMLFile(entry.Name()) { + filePath := filepath.Join(d.dir, entry.Name()) dat, err := os.ReadFile(filePath) if err != nil { - logger.Error(ctx, "Failed to read DAG file", "file", fi.Name(), "err", err) + logger.Error(ctx, "Failed to read DAG file", "file", entry.Name(), "err", err) continue } - m, err := grep.Grep(dat, fmt.Sprintf("(?i)%s", pattern), opts) + matches, err := grep.Grep(dat, fmt.Sprintf("(?i)%s", pattern), grep.DefaultOptions) if err != nil { - errs = append(errs, fmt.Sprintf("grep %s failed: %s", fi.Name(), err)) + errs = append(errs, fmt.Sprintf("grep %s failed: %s", entry.Name(), err)) continue } dag, err := digraph.LoadMetadata(ctx, filePath) if err != nil { - errs = append(errs, fmt.Sprintf("check %s failed: %s", fi.Name(), err)) + errs = append(errs, fmt.Sprintf("check %s failed: %s", entry.Name(), err)) continue } ret = append(ret, &persistence.GrepResult{ - Name: strings.TrimSuffix(fi.Name(), path.Ext(fi.Name())), + Name: strings.TrimSuffix(entry.Name(), path.Ext(entry.Name())), DAG: dag, - Matches: m, + Matches: matches, }) } } @@ -326,58 +271,42 @@ func (d *dagStoreImpl) Grep(ctx context.Context, pattern string) ( } func (d *dagStoreImpl) Rename(_ context.Context, oldID, newID string) error { - oldLoc, err := d.fileLocation(oldID) - if err != nil { - return err - } - newLoc, err := d.fileLocation(newID) - if err != nil { - return err - } - return os.Rename(oldLoc, newLoc) + oldFilePath := resolveFilePath(d.dir, oldID) + newFilePath := resolveFilePath(d.dir, newID) + return os.Rename(oldFilePath, newFilePath) } -func (d *dagStoreImpl) Find(ctx context.Context, name string) (*digraph.DAG, error) { - file, err := d.resolve(name) +func (d *dagStoreImpl) FindByName(ctx context.Context, name string) (*digraph.DAG, error) { + file, err := d.locateDAG(name) if err != nil { return nil, err } return digraph.LoadWithoutEval(ctx, file) } -func (d *dagStoreImpl) resolve(name string) (string, error) { - // check if the name is a file path - if strings.Contains(name, string(filepath.Separator)) { - if !fileutil.FileExists(name) { - return "", fmt.Errorf("DAG %s not found", name) - } - return name, nil - } - - // check if the name is a file path - if strings.Contains(name, string(filepath.Separator)) { - foundPath, err := find(name) - if err != nil { - return "", fmt.Errorf("DAG %s not found", name) +func (d *dagStoreImpl) locateDAG(nameOrPath string) (string, error) { + if strings.Contains(nameOrPath, string(filepath.Separator)) { + foundPath, err := findDAGFile(nameOrPath) + if err == nil { + return foundPath, nil } - return foundPath, nil } - // find the DAG definition - for _, dir := range []string{".", d.dir} { - subWorkflowPath := filepath.Join(dir, name) - foundPath, err := find(subWorkflowPath) + searchPaths := []string{".", d.dir} + for _, dir := range searchPaths { + candidatePath := filepath.Join(dir, nameOrPath) + foundPath, err := findDAGFile(candidatePath) if err == nil { return foundPath, nil } } // DAG not found - return "", fmt.Errorf("workflow %s not found", name) + return "", fmt.Errorf("workflow %s not found", nameOrPath) } -// find finds the sub workflow file with the given name. -func find(name string) (string, error) { +// findDAGFile finds the sub workflow file with the given name. +func findDAGFile(name string) (string, error) { ext := path.Ext(name) if ext == "" { // try all supported extensions @@ -390,35 +319,30 @@ func find(name string) (string, error) { // the name has an extension return filepath.Abs(name) } - return "", fmt.Errorf("sub workflow %s not found", name) + return "", fmt.Errorf("file %s not found", name) } func (d *dagStoreImpl) TagList(ctx context.Context) ([]string, []string, error) { var ( - errList = make([]string, 0) - tagSet = make(map[string]struct{}) - currentDag *digraph.DAG - err error + errList []string + tagSet = make(map[string]struct{}) ) - if err = filepath.WalkDir(d.dir, func(_ string, dir fs.DirEntry, err error) error { + if err := filepath.WalkDir(d.dir, func(_ string, entry fs.DirEntry, err error) error { if err != nil { return err } - if dir.IsDir() || !checkExtension(dir.Name()) { + if entry.IsDir() || !fileutil.IsYAMLFile(entry.Name()) { return nil } - if currentDag, err = d.GetMetadata(ctx, dir.Name()); err != nil { - errList = append(errList, fmt.Sprintf("reading %s failed: %s", dir.Name(), err)) - } - - if currentDag == nil { - return nil + parsedDAG, err := d.GetMetadata(ctx, entry.Name()) + if err != nil { + errList = append(errList, fmt.Sprintf("reading %s failed: %s", entry.Name(), err)) } - for _, tag := range currentDag.Tags { + for _, tag := range parsedDAG.Tags { tagSet[tag] = struct{}{} } @@ -427,5 +351,39 @@ func (d *dagStoreImpl) TagList(ctx context.Context) ([]string, []string, error) return nil, append(errList, err.Error()), err } - return d.getTagList(tagSet), errList, nil + tagList := make([]string, 0, len(tagSet)) + for tag := range tagSet { + tagList = append(tagList, tag) + } + return tagList, errList, nil +} + +func containsSearchText(text string, search string) bool { + return strings.Contains(strings.ToLower(text), strings.ToLower(search)) +} + +func containsTag(tags []string, searchTag string) bool { + for _, tag := range tags { + if strings.EqualFold(tag, searchTag) { + return true + } + } + + return false +} + +func fileExists(file string) bool { + _, err := os.Stat(file) + return !os.IsNotExist(err) +} + +func resolveFilePath(dir, name string) string { + if strings.Contains(name, string(filepath.Separator)) { + filePath, err := filepath.Abs(name) + if err == nil { + return filePath + } + } + filePath := fileutil.EnsureYAMLExtension(path.Join(dir, name)) + return filepath.Clean(filePath) } diff --git a/internal/scheduler/entryreader_test.go b/internal/scheduler/entryreader_test.go index c3e00be0c..79176bb26 100644 --- a/internal/scheduler/entryreader_test.go +++ b/internal/scheduler/entryreader_test.go @@ -13,7 +13,9 @@ import ( "github.com/dagu-org/dagu/internal/build" "github.com/dagu-org/dagu/internal/client" "github.com/dagu-org/dagu/internal/fileutil" - dsclient "github.com/dagu-org/dagu/internal/persistence/client" + "github.com/dagu-org/dagu/internal/persistence/jsondb" + "github.com/dagu-org/dagu/internal/persistence/local" + "github.com/dagu-org/dagu/internal/persistence/local/storage" "github.com/stretchr/testify/require" @@ -95,14 +97,11 @@ func setupTest(t *testing.T) (string, client.Client) { WorkDir: tmpDir, } - dataStore := dsclient.NewDataStores( - cfg.Paths.DAGsDir, - cfg.Paths.DataDir, - cfg.Paths.SuspendFlagsDir, - dsclient.DataStoreOptions{ - LatestStatusToday: cfg.LatestStatusToday, - }, + dagStore := local.NewDAGStore(cfg.Paths.DAGsDir) + historyStore := jsondb.New(cfg.Paths.DataDir) + flagStore := local.NewFlagStore( + storage.NewStorage(cfg.Paths.SuspendFlagsDir), ) - return tmpDir, client.New(dataStore, "", cfg.WorkDir) + return tmpDir, client.New(dagStore, historyStore, flagStore, "", cfg.WorkDir) } diff --git a/internal/test/setup.go b/internal/test/setup.go index 8948219c2..15577b5bc 100644 --- a/internal/test/setup.go +++ b/internal/test/setup.go @@ -22,7 +22,9 @@ import ( "github.com/dagu-org/dagu/internal/fileutil" "github.com/dagu-org/dagu/internal/logger" "github.com/dagu-org/dagu/internal/persistence" - dsclient "github.com/dagu-org/dagu/internal/persistence/client" + "github.com/dagu-org/dagu/internal/persistence/jsondb" + "github.com/dagu-org/dagu/internal/persistence/local" + "github.com/dagu-org/dagu/internal/persistence/local/storage" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -66,20 +68,20 @@ func Setup(t *testing.T, opts ...TestHelperOption) Helper { cfg.Paths.Executable = executablePath cfg.Paths.LogDir = filepath.Join(tmpDir, "logs") - dataStores := dsclient.NewDataStores( - cfg.Paths.DAGsDir, - cfg.Paths.DataDir, - cfg.Paths.SuspendFlagsDir, - dsclient.DataStoreOptions{ - LatestStatusToday: cfg.LatestStatusToday, - }, + dagStore := local.NewDAGStore(cfg.Paths.DAGsDir) + historyStore := jsondb.New(cfg.Paths.DataDir) + flagStore := local.NewFlagStore( + storage.NewStorage(cfg.Paths.SuspendFlagsDir), ) + client := client.New(dagStore, historyStore, flagStore, cfg.Paths.Executable, cfg.WorkDir) + helper := Helper{ - Context: createDefaultContext(), - Config: cfg, - Client: client.New(dataStores, cfg.Paths.Executable, cfg.WorkDir), - DataStores: dataStores, + Context: createDefaultContext(), + Config: cfg, + Client: client, + DAGStore: dagStore, + HistoryStore: historyStore, tmpDir: tmpDir, } @@ -98,7 +100,8 @@ type Helper struct { Config *config.Config LoggingOutput *SyncBuffer Client client.Client - DataStores persistence.DataStores + HistoryStore persistence.HistoryStore + DAGStore persistence.DAGStore tmpDir string } @@ -191,7 +194,8 @@ func (d *DAG) Agent(opts ...AgentOption) *Agent { logDir, logFile, d.Client, - d.DataStores, + d.DAGStore, + d.HistoryStore, helper.opts, ) diff --git a/ui/.vscode/settings.json b/ui/.vscode/settings.json new file mode 100644 index 000000000..b98afa77a --- /dev/null +++ b/ui/.vscode/settings.json @@ -0,0 +1,14 @@ +{ + "[typescript]": { + "editor.defaultFormatter": "esbenp.prettier-vscode", + "editor.formatOnSave": true + }, + "[typescriptreact]": { + "editor.defaultFormatter": "esbenp.prettier-vscode", + "editor.formatOnSave": true + }, + "[javascript]": { + "editor.defaultFormatter": "esbenp.prettier-vscode", + "editor.formatOnSave": true + } +} \ No newline at end of file diff --git a/ui/src/components/molecules/Graph.tsx b/ui/src/components/molecules/Graph.tsx index be96f02c5..b63dc0fbe 100644 --- a/ui/src/components/molecules/Graph.tsx +++ b/ui/src/components/molecules/Graph.tsx @@ -12,6 +12,7 @@ type Props = { steps?: Step[] | Node[]; onClickNode?: onClickNode; showIcons?: boolean; + animate?: boolean; }; declare global { @@ -26,6 +27,7 @@ const Graph: React.FC = ({ type = 'status', onClickNode, showIcons = true, + animate = true, }) => { // Calculate width based on flowchart type and graph breadth const width = React.useMemo(() => { @@ -58,11 +60,11 @@ const Graph: React.FC = ({ }; // Define FontAwesome icons for each status with colors and animations - const statusIcons = { + const statusIcons: { [key: number]: string } = { [NodeStatus.None]: - "", + "", [NodeStatus.Running]: - "", + "", [NodeStatus.Error]: "", [NodeStatus.Cancel]: "", @@ -71,6 +73,12 @@ const Graph: React.FC = ({ [NodeStatus.Skipped]: "", }; + if (!animate) { + // Remove animations if disabled + Object.keys(statusIcons).forEach((key: string) => { + statusIcons[+key] = statusIcons[+key].replace(/animation:.*?;/g, ''); + }); + } const graph = React.useMemo(() => { if (!steps) return ''; @@ -138,12 +146,24 @@ const Graph: React.FC = ({ } // Define node styles for different states with refined colors - dat.push('classDef none fill:#f0f9ff,stroke:#93c5fd,color:#1e40af,stroke-width:1.2px,white-space:nowrap'); - dat.push('classDef running fill:#f0fdf4,stroke:#86efac,color:#166534,stroke-width:1.2px,white-space:nowrap'); - dat.push('classDef error fill:#fef2f2,stroke:#fca5a5,color:#aa1010,stroke-width:1.2px,white-space:nowrap'); - dat.push('classDef cancel fill:#fdf2f8,stroke:#f9a8d4,color:#9d174d,stroke-width:1.2px,white-space:nowrap'); - dat.push('classDef done fill:#f0fdf4,stroke:#86efac,color:#166534,stroke-width:1.2px,white-space:nowrap'); - dat.push('classDef skipped fill:#f8fafc,stroke:#cbd5e1,color:#475569,stroke-width:1.2px,white-space:nowrap'); + dat.push( + 'classDef none fill:#f0f9ff,stroke:#93c5fd,color:#1e40af,stroke-width:1.2px,white-space:nowrap' + ); + dat.push( + 'classDef running fill:#f0fdf4,stroke:#86efac,color:#166534,stroke-width:1.2px,white-space:nowrap' + ); + dat.push( + 'classDef error fill:#fef2f2,stroke:#fca5a5,color:#aa1010,stroke-width:1.2px,white-space:nowrap' + ); + dat.push( + 'classDef cancel fill:#fdf2f8,stroke:#f9a8d4,color:#9d174d,stroke-width:1.2px,white-space:nowrap' + ); + dat.push( + 'classDef done fill:#f0fdf4,stroke:#86efac,color:#166534,stroke-width:1.2px,white-space:nowrap' + ); + dat.push( + 'classDef skipped fill:#f8fafc,stroke:#cbd5e1,color:#475569,stroke-width:1.2px,white-space:nowrap' + ); // Add custom link styles dat.push(...linkStyles); @@ -220,4 +240,4 @@ const graphStatusMap = { [NodeStatus.Cancel]: ':::cancel', [NodeStatus.Success]: ':::done', [NodeStatus.Skipped]: ':::skipped', -}; \ No newline at end of file +}; diff --git a/ui/src/components/organizations/DAGStatus.tsx b/ui/src/components/organizations/DAGStatus.tsx index b788480e4..39936f439 100644 --- a/ui/src/components/organizations/DAGStatus.tsx +++ b/ui/src/components/organizations/DAGStatus.tsx @@ -128,6 +128,7 @@ function DAGStatus({ DAG, name, refresh }: Props) { flowchart={flowchart} onClickNode={onSelectStepOnGraph} showIcons={DAG.Status.Status != SchedulerStatus.None} + animate={DAG.Status.Status == SchedulerStatus.Running} > ) : (