From 935d2d2fb5aaaf4a596b23f1bbec37fe5bdff91e Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Mon, 12 Aug 2024 12:13:28 +0900 Subject: [PATCH] wip --- cmd/dry.go | 25 ++++++------- cmd/logging.go | 56 ----------------------------- cmd/restart.go | 30 ++++++++-------- cmd/retry.go | 26 +++++++------- cmd/scheduler.go | 4 +-- cmd/server.go | 4 +-- cmd/start.go | 24 ++++++------- cmd/start_all.go | 7 ++-- cmd/status.go | 7 ++-- cmd/stop.go | 7 ++-- internal/logger/file.go | 75 +++++++++++++++++++++++++++++++++++++++ internal/logger/logger.go | 14 ++++++++ 12 files changed, 149 insertions(+), 130 deletions(-) delete mode 100644 cmd/logging.go create mode 100644 internal/logger/file.go diff --git a/cmd/dry.go b/cmd/dry.go index 7bbdc5a59..7b71d24ce 100644 --- a/cmd/dry.go +++ b/cmd/dry.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "path/filepath" "github.com/daguflow/dagu/internal/agent" @@ -45,32 +44,35 @@ func dryCmd() *cobra.Command { params, err := cmd.Flags().GetString("params") if err != nil { - initLogger.Error("Parameter retrieval failed", "error", err) - os.Exit(1) + initLogger.Fatal("Parameter retrieval failed", "error", err) } workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params)) if err != nil { - initLogger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + initLogger.Fatal("Workflow load failed", "error", err, "file", args[0]) } requestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("dry_", cfg.LogDir, workflow, requestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "dry_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: requestID, + }) + if err != nil { - initLogger.Error( + initLogger.Fatal( "Log file creation failed", "error", err, "workflow", workflow.Name, ) - os.Exit(1) } defer logFile.Close() @@ -98,11 +100,10 @@ func dryCmd() *cobra.Command { listenSignals(ctx, agt) if err := agt.Run(ctx); err != nil { - agentLogger.Error("Workflow execution failed", + agentLogger.Fatal("Workflow execution failed", "error", err, "workflow", workflow.Name, "requestID", requestID) - os.Exit(1) } }, } diff --git a/cmd/logging.go b/cmd/logging.go deleted file mode 100644 index 517e0303a..000000000 --- a/cmd/logging.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (C) 2024 The Daguflow/Dagu Authors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package cmd - -import ( - "fmt" - "os" - "path/filepath" - "time" - - "github.com/daguflow/dagu/internal/dag" - "github.com/daguflow/dagu/internal/util" -) - -// openLogFile opens a log file for the workflow. -func openLogFile( - prefix string, - logDir string, - workflow *dag.DAG, - requestID string, -) (*os.File, error) { - name := util.ValidFilename(workflow.Name) - if workflow.LogDir != "" { - logDir = filepath.Join(workflow.LogDir, name) - } - // Check if the log directory exists - if _, err := os.Stat(logDir); os.IsNotExist(err) { - // Create the log directory - if err := os.MkdirAll(logDir, 0755); err != nil { - return nil, err - } - } - file := filepath.Join(logDir, fmt.Sprintf("%s%s.%s.%s.log", - prefix, - name, - time.Now().Format("20060102.15:04:05.000"), - util.TruncString(requestID, 8), - )) - // Open or create the log file - return os.OpenFile( - file, os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0644, - ) -} diff --git a/cmd/restart.go b/cmd/restart.go index 640f215be..ad06a3917 100644 --- a/cmd/restart.go +++ b/cmd/restart.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "path/filepath" "time" @@ -57,18 +56,16 @@ func restartCmd() *cobra.Command { specFilePath := args[0] workflow, err := dag.Load(cfg.BaseConfig, specFilePath, "") if err != nil { - initLogger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + initLogger.Fatal("Workflow load failed", "error", err, "file", args[0]) } dataStore := newDataStores(cfg) cli := newClient(cfg, dataStore, initLogger) if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil { - initLogger.Error("Workflow stop operation failed", + initLogger.Fatal("Workflow stop operation failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } // Wait for the specified amount of time before restarting. @@ -77,35 +74,37 @@ func restartCmd() *cobra.Command { // Retrieve the parameter of the previous execution. params, err := getPreviousExecutionParams(cli, workflow) if err != nil { - initLogger.Error("Previous execution parameter retrieval failed", + initLogger.Fatal("Previous execution parameter retrieval failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } // Start the DAG with the same parameter. // Need to reload the DAG file with the parameter. workflow, err = dag.Load(cfg.BaseConfig, specFilePath, params) if err != nil { - initLogger.Error("Workflow reload failed", + initLogger.Fatal("Workflow reload failed", "error", err, "file", specFilePath, "params", params) - os.Exit(1) } requestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("restart_", cfg.LogDir, workflow, requestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "restart_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: requestID, + }) if err != nil { - initLogger.Error("Log file creation failed", + initLogger.Fatal("Log file creation failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } defer logFile.Close() @@ -133,11 +132,10 @@ func restartCmd() *cobra.Command { listenSignals(cmd.Context(), agt) if err := agt.Run(cmd.Context()); err != nil { - agentLogger.Error("Workflow restart failed", + agentLogger.Fatal("Workflow restart failed", "error", err, "workflow", workflow.Name, "requestID", requestID) - os.Exit(1) } }, } diff --git a/cmd/retry.go b/cmd/retry.go index 99646c963..583cc00fa 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -56,44 +56,45 @@ func retryCmd() *cobra.Command { specFilePath := args[0] absoluteFilePath, err := filepath.Abs(specFilePath) if err != nil { - initLogger.Error("Absolute path resolution failed", + initLogger.Fatal("Absolute path resolution failed", "error", err, "file", specFilePath) - os.Exit(1) } status, err := historyStore.FindByRequestID(absoluteFilePath, requestID) if err != nil { - initLogger.Error("Historical execution retrieval failed", + initLogger.Fatal("Historical execution retrieval failed", "error", err, "requestID", requestID, "file", absoluteFilePath) - os.Exit(1) } // Start the DAG with the same parameters with the execution that // is being retried. workflow, err := dag.Load(cfg.BaseConfig, absoluteFilePath, status.Status.Params) if err != nil { - initLogger.Error("Workflow specification load failed", + initLogger.Fatal("Workflow specification load failed", "error", err, "file", specFilePath, "params", status.Status.Params) - os.Exit(1) } newRequestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("dry_", cfg.LogDir, workflow, newRequestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "retry_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: newRequestID, + }) if err != nil { - initLogger.Error("Log file creation failed", + initLogger.Fatal("Log file creation failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } defer logFile.Close() @@ -126,8 +127,7 @@ func retryCmd() *cobra.Command { listenSignals(ctx, agt) if err := agt.Run(ctx); err != nil { - agentLogger.Error("Failed to start workflow", "error", err) - os.Exit(1) + agentLogger.Fatal("Failed to start workflow", "error", err) } }, } diff --git a/cmd/scheduler.go b/cmd/scheduler.go index e5ef45049..b25bbb102 100644 --- a/cmd/scheduler.go +++ b/cmd/scheduler.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/logger" @@ -54,14 +53,13 @@ func schedulerCmd() *cobra.Command { cli := newClient(cfg, dataStore, logger) sc := scheduler.New(cfg, logger, cli) if err := sc.Start(ctx); err != nil { - logger.Error( + logger.Fatal( "Scheduler initialization failed", "error", err, "specsDirectory", cfg.DAGs, ) - os.Exit(1) } }, } diff --git a/cmd/server.go b/cmd/server.go index 2889bee30..231f89ab9 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/frontend" @@ -52,8 +51,7 @@ func serverCmd() *cobra.Command { cli := newClient(cfg, dataStore, logger) server := frontend.New(cfg, logger, cli) if err := server.Serve(cmd.Context()); err != nil { - logger.Error("Server initialization failed", "error", err) - os.Exit(1) + logger.Fatal("Server initialization failed", "error", err) } }, } diff --git a/cmd/start.go b/cmd/start.go index f38002aba..069f384f2 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "path/filepath" "github.com/daguflow/dagu/internal/agent" @@ -52,32 +51,34 @@ func startCmd() *cobra.Command { params, err := cmd.Flags().GetString("params") if err != nil { - initLogger.Error("Parameter retrieval failed", "error", err) - os.Exit(1) + initLogger.Fatal("Parameter retrieval failed", "error", err) } workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params)) if err != nil { - initLogger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + initLogger.Fatal("Workflow load failed", "error", err, "file", args[0]) } requestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("start_", cfg.LogDir, workflow, requestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "start_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: requestID, + }) if err != nil { - initLogger.Error( + initLogger.Fatal( "Log file creation failed", "error", err, "workflow", workflow.Name, ) - os.Exit(1) } defer logFile.Close() @@ -111,11 +112,10 @@ func startCmd() *cobra.Command { listenSignals(ctx, agt) if err := agt.Run(ctx); err != nil { - agentLogger.Error("Workflow execution failed", + agentLogger.Fatal("Workflow execution failed", "error", err, "workflow", workflow.Name, "requestID", requestID) - os.Exit(1) } }, } diff --git a/cmd/start_all.go b/cmd/start_all.go index a49c3f0f3..ad8119d0d 100644 --- a/cmd/start_all.go +++ b/cmd/start_all.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/frontend" @@ -60,8 +59,7 @@ func startAllCmd() *cobra.Command { sc := scheduler.New(cfg, logger, cli) if err := sc.Start(ctx); err != nil { - logger.Error("Scheduler initialization failed", "error", err, "dags", cfg.DAGs) - os.Exit(1) + logger.Fatal("Scheduler initialization failed", "error", err, "dags", cfg.DAGs) } }() @@ -69,8 +67,7 @@ func startAllCmd() *cobra.Command { server := frontend.New(cfg, logger, cli) if err := server.Serve(ctx); err != nil { - logger.Error("Server initialization failed", "error", err) - os.Exit(1) + logger.Fatal("Server initialization failed", "error", err) } }, } diff --git a/cmd/status.go b/cmd/status.go index 9c823f008..6266670f5 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/dag" @@ -44,8 +43,7 @@ func statusCmd() *cobra.Command { // Load the DAG file and get the current running status. workflow, err := dag.Load(cfg.BaseConfig, args[0], "") if err != nil { - logger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + logger.Fatal("Workflow load failed", "error", err, "file", args[0]) } dataStore := newDataStores(cfg) @@ -54,8 +52,7 @@ func statusCmd() *cobra.Command { curStatus, err := cli.GetCurrentStatus(workflow) if err != nil { - logger.Error("Current status retrieval failed", "error", err) - os.Exit(1) + logger.Fatal("Current status retrieval failed", "error", err) } logger.Info("Current status", "pid", curStatus.PID, "status", curStatus.Status) diff --git a/cmd/stop.go b/cmd/stop.go index 4ea594fa9..a3ddea739 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/dag" @@ -50,8 +49,7 @@ func stopCmd() *cobra.Command { workflow, err := dag.Load(cfg.BaseConfig, args[0], "") if err != nil { - logger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + logger.Fatal("Workflow load failed", "error", err, "file", args[0]) } logger.Info("Workflow stop initiated", "workflow", workflow.Name) @@ -60,14 +58,13 @@ func stopCmd() *cobra.Command { cli := newClient(cfg, dataStore, logger) if err := cli.Stop(workflow); err != nil { - logger.Error( + logger.Fatal( "Workflow stop operation failed", "error", err, "workflow", workflow.Name, ) - os.Exit(1) } }, } diff --git a/internal/logger/file.go b/internal/logger/file.go new file mode 100644 index 000000000..2a6f374af --- /dev/null +++ b/internal/logger/file.go @@ -0,0 +1,75 @@ +// Copyright (C) 2024 The Daguflow/Dagu Authors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logger + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/daguflow/dagu/internal/util" +) + +// LogFileConfig holds the configuration for opening a log file +type LogFileConfig struct { + Prefix string + LogDir string + DAGLogDir string + DAGName string + RequestID string +} + +// OpenLogFile opens a log file for the workflow. +func OpenLogFile(config LogFileConfig) (*os.File, error) { + logDir, err := prepareLogDirectory(config) + if err != nil { + return nil, fmt.Errorf("failed to prepare log directory: %w", err) + } + + filename := generateLogFilename(config) + return openFile(filepath.Join(logDir, filename)) +} + +func prepareLogDirectory(config LogFileConfig) (string, error) { + logDir := config.LogDir + if config.LogDir != "" { + logDir = filepath.Join(config.DAGLogDir, util.ValidFilename(config.DAGName)) + } + + if err := os.MkdirAll(logDir, 0755); err != nil { + return "", fmt.Errorf("failed to create log directory: %w", err) + } + + return logDir, nil +} + +func generateLogFilename(config LogFileConfig) string { + return fmt.Sprintf("%s%s.%s.%s.log", + config.Prefix, + util.ValidFilename(config.DAGName), + time.Now().Format("20060102.15:04:05.000"), + util.TruncString(config.RequestID, 8), + ) +} + +func openFile(filepath string) (*os.File, error) { + return os.OpenFile( + filepath, + os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, + 0644, + ) +} diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 73fa37022..5988f39ba 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -31,11 +31,13 @@ type ( Info(msg string, tags ...any) Warn(msg string, tags ...any) Error(msg string, tags ...any) + Fatal(msg string, tags ...any) Debugf(format string, v ...any) Infof(format string, v ...any) Warnf(format string, v ...any) Errorf(format string, v ...any) + Fatalf(format string, v ...any) With(attrs ...any) Logger WithGroup(name string) Logger @@ -182,6 +184,12 @@ func (a *appLogger) Errorf(format string, v ...any) { a.logger.Error(fmt.Sprintf(a.prefix+format, v...)) } +// Fatalf implements logger.Logger. +func (a *appLogger) Fatalf(format string, v ...any) { + a.logger.Error(fmt.Sprintf(a.prefix+format, v...)) + os.Exit(1) +} + // Infof implements logger.Logger. func (a *appLogger) Infof(format string, v ...any) { a.logger.Info(fmt.Sprintf(a.prefix+format, v...)) @@ -200,6 +208,12 @@ func (a *appLogger) Debug(msg string, tags ...any) { // Error implements logger.Logger. func (a *appLogger) Error(msg string, tags ...any) { a.logger.Error(a.prefix+msg, tags...) + os.Exit(1) +} + +// Fatal implements logger.Logger. +func (a *appLogger) Fatal(msg string, tags ...any) { + a.logger.Error(a.prefix+msg, tags...) } // Info implements logger.Logger.