Skip to content

Commit

Permalink
persistence: chore: cleanup (#761)
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta authored Dec 31, 2024
1 parent 6a9ff21 commit 5646c48
Show file tree
Hide file tree
Showing 34 changed files with 834 additions and 822 deletions.
3 changes: 1 addition & 2 deletions cmd/common_test.go → cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ func (td *testDAG) AssertCurrentStatus(t *testing.T, expected scheduler.Status)
func (th *testDAG) AssertLastStatus(t *testing.T, expected scheduler.Status) {
t.Helper()

hs := th.DataStores.HistoryStore()
require.Eventually(t, func() bool {
status := hs.ReadStatusRecent(th.Context, th.Path, 1)
status := th.HistoryStore.ReadStatusRecent(th.Context, th.Path, 1)
if len(status) < 1 {
return false
}
Expand Down
75 changes: 0 additions & 75 deletions cmd/common.go

This file was deleted.

30 changes: 16 additions & 14 deletions cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/dagu-org/dagu/internal/agent"
"github.com/dagu-org/dagu/internal/config"
"github.com/dagu-org/dagu/internal/digraph"
"github.com/dagu-org/dagu/internal/logger"
"github.com/spf13/cobra"
)

Expand All @@ -33,6 +32,7 @@ func runDry(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}
setup := newSetup(cfg)

cmd.Flags().StringP("params", "p", "", "parameters")
params, err := cmd.Flags().GetString("params")
Expand All @@ -41,6 +41,7 @@ func runDry(cmd *cobra.Command, args []string) error {
}

ctx := cmd.Context()

dag, err := digraph.Load(ctx, cfg.Paths.BaseConfig, args[0], removeQuotes(params))
if err != nil {
return fmt.Errorf("failed to load DAG from %s: %w", args[0], err)
Expand All @@ -51,31 +52,32 @@ func runDry(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to generate request ID: %w", err)
}

logSettings := logFileSettings{
Prefix: dryPrefix,
LogDir: cfg.Paths.LogDir,
DAGLogDir: dag.LogDir,
DAGName: dag.Name,
RequestID: requestID,
logFile, err := setup.openLogFile(dryPrefix, dag, requestID)
if err != nil {
return fmt.Errorf("failed to initialize log file for DAG %s: %w", dag.Name, err)
}
defer logFile.Close()

ctx = setup.loggerContextWithFile(ctx, false, logFile)

logFile, err := openLogFile(logSettings)
dagStore, err := setup.dagStore()
if err != nil {
return fmt.Errorf("failed to create log file for DAG %s: %w", dag.Name, err)
return fmt.Errorf("failed to initialize DAG store: %w", err)
}
defer logFile.Close()

ctx = logger.WithLogger(ctx, buildLoggerWithFile(logFile, false))
dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore)
cli, err := setup.client()
if err != nil {
return fmt.Errorf("failed to initialize client: %w", err)
}

agt := agent.New(
requestID,
dag,
filepath.Dir(logFile.Name()),
logFile.Name(),
cli,
dataStore,
dagStore,
setup.historyStore(),
&agent.Options{Dry: true},
)

Expand Down
129 changes: 0 additions & 129 deletions cmd/logger.go

This file was deleted.

49 changes: 24 additions & 25 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func runRestart(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("failed to load configuration: %w", err)
}
setup := newSetup(cfg)

quiet, err := cmd.Flags().GetBool("quiet")
if err != nil {
return fmt.Errorf("failed to get quiet flag: %w", err)
}

ctx := cmd.Context()
ctx = logger.WithLogger(ctx, buildLogger(cfg, quiet))
ctx := setup.loggerContext(cmd.Context(), quiet)

specFilePath := args[0]

Expand All @@ -59,20 +59,20 @@ func runRestart(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to load DAG from %s: %w", specFilePath, err)
}

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore)

// Handle the restart process
if err := handleRestartProcess(ctx, cli, cfg, dag, quiet, specFilePath); err != nil {
if err := handleRestartProcess(ctx, setup, dag, quiet, specFilePath); err != nil {
logger.Error(ctx, "Failed to restart process", "path", specFilePath, "err", err)
return fmt.Errorf("restart process failed for DAG %s: %w", dag.Name, err)
}

return nil
}

func handleRestartProcess(ctx context.Context, cli client.Client, cfg *config.Config,
dag *digraph.DAG, quiet bool, specFilePath string) error {
func handleRestartProcess(ctx context.Context, setup *setup, dag *digraph.DAG, quiet bool, specFilePath string) error {
cli, err := setup.client()
if err != nil {
return fmt.Errorf("failed to initialize client: %w", err)
}

// Stop if running
if err := stopDAGIfRunning(ctx, cli, dag); err != nil {
Expand All @@ -89,47 +89,46 @@ func handleRestartProcess(ctx context.Context, cli client.Client, cfg *config.Co
}

// Reload DAG with parameters
dag, err = digraph.Load(ctx, cfg.Paths.BaseConfig, specFilePath, params)
dag, err = digraph.Load(ctx, setup.cfg.Paths.BaseConfig, specFilePath, params)
if err != nil {
return fmt.Errorf("failed to reload DAG with params: %w", err)
}

return executeDAG(ctx, cli, cfg, dag, quiet)
return executeDAG(ctx, cli, setup, dag, quiet)
}

func executeDAG(ctx context.Context, cli client.Client, cfg *config.Config,
func executeDAG(ctx context.Context, cli client.Client, setup *setup,
dag *digraph.DAG, quiet bool) error {

requestID, err := generateRequestID()
if err != nil {
return fmt.Errorf("failed to generate request ID: %w", err)
}

logFile, err := openLogFile(logFileSettings{
Prefix: restartPrefix,
LogDir: cfg.Paths.LogDir,
DAGLogDir: dag.LogDir,
DAGName: dag.Name,
RequestID: requestID,
})
logFile, err := setup.openLogFile(restartPrefix, dag, requestID)
if err != nil {
return fmt.Errorf("failed to create log file: %w", err)
return fmt.Errorf("failed to initialize log file: %w", err)
}
defer logFile.Close()

logger.Info(ctx, "DAG restart initiated",
"DAG", dag.Name,
"requestID", requestID,
"logFile", logFile.Name())
ctx = setup.loggerContextWithFile(ctx, quiet, logFile)

logger.Info(ctx, "DAG restart initiated", "DAG", dag.Name, "requestID", requestID, "logFile", logFile.Name())

dagStore, err := setup.dagStore()
if err != nil {
logger.Error(ctx, "Failed to initialize DAG store", "err", err)
return fmt.Errorf("failed to initialize DAG store: %w", err)
}

ctx = logger.WithLogger(ctx, buildLoggerWithFile(logFile, quiet))
agt := agent.New(
requestID,
dag,
filepath.Dir(logFile.Name()),
logFile.Name(),
cli,
newDataStores(cfg),
dagStore,
setup.historyStore(),
&agent.Options{Dry: false})

listenSignals(ctx, agt)
Expand Down
Loading

0 comments on commit 5646c48

Please sign in to comment.