Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Aug 12, 2024
1 parent 083a402 commit 935d2d2
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 130 deletions.
25 changes: 13 additions & 12 deletions cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"

"github.com/daguflow/dagu/internal/agent"
Expand Down Expand Up @@ -45,32 +44,35 @@ func dryCmd() *cobra.Command {

params, err := cmd.Flags().GetString("params")
if err != nil {
initLogger.Error("Parameter retrieval failed", "error", err)
os.Exit(1)
initLogger.Fatal("Parameter retrieval failed", "error", err)
}

workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("dry_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "dry_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})

if err != nil {
initLogger.Error(
initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -98,11 +100,10 @@ func dryCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Workflow execution failed",
agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
Expand Down
56 changes: 0 additions & 56 deletions cmd/logging.go

This file was deleted.

30 changes: 14 additions & 16 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"
"time"

Expand Down Expand Up @@ -57,18 +56,16 @@ func restartCmd() *cobra.Command {
specFilePath := args[0]
workflow, err := dag.Load(cfg.BaseConfig, specFilePath, "")
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, initLogger)

if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil {
initLogger.Error("Workflow stop operation failed",
initLogger.Fatal("Workflow stop operation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}

// Wait for the specified amount of time before restarting.
Expand All @@ -77,35 +74,37 @@ func restartCmd() *cobra.Command {
// Retrieve the parameter of the previous execution.
params, err := getPreviousExecutionParams(cli, workflow)
if err != nil {
initLogger.Error("Previous execution parameter retrieval failed",
initLogger.Fatal("Previous execution parameter retrieval failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}

// Start the DAG with the same parameter.
// Need to reload the DAG file with the parameter.
workflow, err = dag.Load(cfg.BaseConfig, specFilePath, params)
if err != nil {
initLogger.Error("Workflow reload failed",
initLogger.Fatal("Workflow reload failed",
"error", err,
"file", specFilePath,
"params", params)
os.Exit(1)
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("restart_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "restart_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})
if err != nil {
initLogger.Error("Log file creation failed",
initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -133,11 +132,10 @@ func restartCmd() *cobra.Command {

listenSignals(cmd.Context(), agt)
if err := agt.Run(cmd.Context()); err != nil {
agentLogger.Error("Workflow restart failed",
agentLogger.Fatal("Workflow restart failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
Expand Down
26 changes: 13 additions & 13 deletions cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,44 +56,45 @@ func retryCmd() *cobra.Command {
specFilePath := args[0]
absoluteFilePath, err := filepath.Abs(specFilePath)
if err != nil {
initLogger.Error("Absolute path resolution failed",
initLogger.Fatal("Absolute path resolution failed",
"error", err,
"file", specFilePath)
os.Exit(1)
}

status, err := historyStore.FindByRequestID(absoluteFilePath, requestID)
if err != nil {
initLogger.Error("Historical execution retrieval failed",
initLogger.Fatal("Historical execution retrieval failed",
"error", err,
"requestID", requestID,
"file", absoluteFilePath)
os.Exit(1)
}

// Start the DAG with the same parameters with the execution that
// is being retried.
workflow, err := dag.Load(cfg.BaseConfig, absoluteFilePath, status.Status.Params)
if err != nil {
initLogger.Error("Workflow specification load failed",
initLogger.Fatal("Workflow specification load failed",
"error", err,
"file", specFilePath,
"params", status.Status.Params)
os.Exit(1)
}

newRequestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("dry_", cfg.LogDir, workflow, newRequestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "retry_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: newRequestID,
})
if err != nil {
initLogger.Error("Log file creation failed",
initLogger.Fatal("Log file creation failed",
"error", err,
"workflow", workflow.Name)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -126,8 +127,7 @@ func retryCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Failed to start workflow", "error", err)
os.Exit(1)
agentLogger.Fatal("Failed to start workflow", "error", err)
}
},
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"

"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/logger"
Expand Down Expand Up @@ -54,14 +53,13 @@ func schedulerCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
sc := scheduler.New(cfg, logger, cli)
if err := sc.Start(ctx); err != nil {
logger.Error(
logger.Fatal(
"Scheduler initialization failed",
"error",
err,
"specsDirectory",
cfg.DAGs,
)
os.Exit(1)
}
},
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"

"github.com/daguflow/dagu/internal/config"
"github.com/daguflow/dagu/internal/frontend"
Expand Down Expand Up @@ -52,8 +51,7 @@ func serverCmd() *cobra.Command {
cli := newClient(cfg, dataStore, logger)
server := frontend.New(cfg, logger, cli)
if err := server.Serve(cmd.Context()); err != nil {
logger.Error("Server initialization failed", "error", err)
os.Exit(1)
logger.Fatal("Server initialization failed", "error", err)
}
},
}
Expand Down
24 changes: 12 additions & 12 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"log"
"os"
"path/filepath"

"github.com/daguflow/dagu/internal/agent"
Expand Down Expand Up @@ -52,32 +51,34 @@ func startCmd() *cobra.Command {

params, err := cmd.Flags().GetString("params")
if err != nil {
initLogger.Error("Parameter retrieval failed", "error", err)
os.Exit(1)
initLogger.Fatal("Parameter retrieval failed", "error", err)
}

workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params))
if err != nil {
initLogger.Error("Workflow load failed", "error", err, "file", args[0])
os.Exit(1)
initLogger.Fatal("Workflow load failed", "error", err, "file", args[0])
}

requestID, err := generateRequestID()
if err != nil {
initLogger.Error("Request ID generation failed", "error", err)
os.Exit(1)
initLogger.Fatal("Request ID generation failed", "error", err)
}

logFile, err := openLogFile("start_", cfg.LogDir, workflow, requestID)
logFile, err := logger.OpenLogFile(logger.LogFileConfig{
Prefix: "start_",
LogDir: cfg.LogDir,
DAGLogDir: workflow.LogDir,
DAGName: workflow.Name,
RequestID: requestID,
})
if err != nil {
initLogger.Error(
initLogger.Fatal(
"Log file creation failed",
"error",
err,
"workflow",
workflow.Name,
)
os.Exit(1)
}
defer logFile.Close()

Expand Down Expand Up @@ -111,11 +112,10 @@ func startCmd() *cobra.Command {
listenSignals(ctx, agt)

if err := agt.Run(ctx); err != nil {
agentLogger.Error("Workflow execution failed",
agentLogger.Fatal("Workflow execution failed",
"error", err,
"workflow", workflow.Name,
"requestID", requestID)
os.Exit(1)
}
},
}
Expand Down
Loading

0 comments on commit 935d2d2

Please sign in to comment.