Skip to content

Commit

Permalink
simplify (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
crenshaw-dev authored Nov 4, 2022
1 parent 0a7e274 commit 1feb2af
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 181 deletions.
95 changes: 45 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,33 @@ spec:
entrypoint: main
templates:
- name: main
plugin:
argocd:
actions:
- - app:
sync:
apps:
steps:
- - name: sync
template: sync
arguments:
parameters:
- name: apps
value: |
- name: guestbook-frontend
- name: guestbook-backend
- - name: diff
template: diff
- name: sync
inputs:
parameters:
- name: apps
plugin:
argocd:
app:
sync:
apps: "{{inputs.parameters.apps}}"
- name: diff
plugin:
argocd:
app:
diff:
app:
name: guestbook-frontend
```
## Getting Started
Expand Down Expand Up @@ -71,30 +90,6 @@ The `actions` field of the plugin config accepts a nested list of actions. Paren
child lists are executed in parallel. This allows you to run multiple actions in parallel, and multiple groups of
actions in sequence.

### Running syncs in sequence

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: argocd-sequence-example-
spec:
entrypoint: main
templates:
- name: main
plugin:
argocd:
actions:
- - app:
sync:
apps:
- name: guestbook-backend
- - app:
sync:
apps:
- name: guestbook-frontend
```

### Setting sync options

```yaml
Expand All @@ -108,14 +103,13 @@ spec:
- name: main
plugin:
argocd:
actions:
- - app:
sync:
apps:
- name: guestbook-backend
options:
- ServerSideApply=true
- Validate=true
app:
sync:
apps: |
- name: guestbook-backend
options: |
- ServerSideApply=true
- Validate=true
```

### Setting a timeout
Expand All @@ -133,12 +127,14 @@ spec:
- name: main
plugin:
argocd:
actions:
- - app:
sync:
apps:
- name: guestbook-backend
timeout: 30s
app:
sync:
apps: |
- name: guestbook-backend
options: |
- ServerSideApply=true
- Validate=true
timeout: 30s
```

### Specifying the Application's namespace
Expand All @@ -157,12 +153,11 @@ spec:
- name: main
plugin:
argocd:
actions:
- - app:
sync:
apps:
- name: guestbook-backend
namespace: my-apps-namespace
app:
sync:
apps: |
- name: guestbook-backend
namespace: my-apps-namespace
```

## Contributing
Expand Down
42 changes: 27 additions & 15 deletions examples/argocd-example-wf.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: argocd-example-
name: test
spec:
ttlStrategy:
secondsAfterCompletion: 300
Expand All @@ -11,19 +11,31 @@ spec:
entrypoint: main
templates:
- name: main
steps:
- - name: sync
template: sync
arguments:
parameters:
- name: apps
value: |
- name: guestbook
- - name: diff
template: diff
- name: sync
inputs:
parameters:
- name: apps
plugin:
argocd:
# `actions` is a nested list. Items of inner arrays run in parallel. Top-level arrays run in sequence.
# In this example, each inner array has only one item. The syncs run in sequence.
actions:
# TODO: support other action types, e.g. `cluster` and `repository`.
- - app:
diff:
app:
name: guestbook
revision: 382b85852fa33f13d4987424853c5206b9231ff0
# Uncomment this to test a failed sync.
# - - app:
# sync:
# apps:
# - name: not-real
app:
sync:
apps: "{{inputs.parameters.apps}}"
- name: diff
plugin:
argocd:
# TODO: support other action types, e.g. `cluster` and `repository`.
app:
diff:
app:
name: guestbook
revision: 382b85852fa33f13d4987424853c5206b9231ff0
138 changes: 39 additions & 99 deletions internal/argocd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
Expand All @@ -19,6 +18,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/plugins/executor"
"github.com/argoproj/gitops-engine/pkg/sync/hook"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -49,149 +49,89 @@ func (e *ApiExecutor) Execute(args executor.ExecuteTemplateArgs) executor.Execut
return errorResponse(err)
}

totalActionGroups := len(plugin.ArgoCD.Actions)
actionGroupCount := 0

var groupOutputs [][]any
for i, actionGroup := range plugin.ArgoCD.Actions {
outputs, err := e.runActionsParallel(actionGroup)
if err != nil {
return failedResponse(wfv1.Progress(fmt.Sprintf("%d/%d", actionGroupCount, totalActionGroups)), fmt.Errorf("action group %d of %d failed: %w", i+1, totalActionGroups, err))
}
groupOutputs = append(groupOutputs, outputs)
actionGroupCount += 1
}

outputsJson, err := json.Marshal(groupOutputs)
output, err := e.runAction(plugin.ArgoCD)
if err != nil {
err = fmt.Errorf("failed to marshal outputs to JSON: %w", err)
log.Println(err.Error())
return errorResponse(err)
return failedResponse(wfv1.Progress(fmt.Sprintf("0/1")), fmt.Errorf("action failed: %w", err))
}

outputsJsonAnyString := wfv1.AnyString(outputsJson)

return executor.ExecuteTemplateReply{
Node: &wfv1.NodeResult{
Phase: wfv1.NodeSucceeded,
Message: "Actions completed",
Progress: wfv1.Progress(fmt.Sprintf("%d/%d", actionGroupCount, totalActionGroups)),
Message: "Action completed",
Progress: "1/1",
Outputs: &wfv1.Outputs{
Parameters: []wfv1.Parameter{
{
Name: "outputs",
Value: &outputsJsonAnyString,
},
},
Result: pointer.String(output),
},
},
}
}

type actionResult struct {
index int
err error
output any
}

// runActionsParallel runs the given group of actions in parallel and returns aggregated errors, if any.
func (e *ApiExecutor) runActionsParallel(actionGroup []ActionSpec) ([]any, error) {
// runAction runs the given action and returns outputs or errors, if any.
func (e *ApiExecutor) runAction(action ActionSpec) (out string, err error) {
closer, appClient, err := e.apiClient.NewApplicationClient()
if err != nil {
return nil, fmt.Errorf("failed to initialize Application API client: %w", err)
return "", fmt.Errorf("failed to initialize Application API client: %w", err)
}
defer io.Close(closer)

closer, settingsClient, err := e.apiClient.NewSettingsClient()
if err != nil {
return nil, fmt.Errorf("failed to initialize Application API client: %w", err)
return "", fmt.Errorf("failed to initialize Application API client: %w", err)
}
defer io.Close(closer)

wg := sync.WaitGroup{}
actionResults := make(chan actionResult, len(actionGroup))
for i, action := range actionGroup {
i := i
action := action
if action.App == nil {
return nil, fmt.Errorf("action %d of %d is missing a valid action type (sync or diff)", i+1, len(actionGroup))
}
if action.App.Sync != nil && action.App.Diff != nil {
return nil, fmt.Errorf("action %d of %d has both multiple types of actions defined", i+1, len(actionGroup))
}
if action.App.Sync == nil && action.App.Diff == nil {
return nil, fmt.Errorf("action %d of %d has no action defined", i+1, len(actionGroup))
}
wg.Add(1)
go func(actionNum int) {
defer wg.Done()
if action.App.Sync != nil {
err := syncAppsParallel(*action.App.Sync, action.Timeout, appClient)
if err != nil {
actionResults <- actionResult{index: i, err: fmt.Errorf("parallel item %d of %d failed: failed to sync Application(s): %w", actionNum+1, len(actionGroup), err)}
} else {
actionResults <- actionResult{index: i, output: ""}
}
}
if action.App.Diff != nil {
diff, err := diffApp(*action.App.Diff, action.Timeout, appClient, settingsClient)
if err != nil {
actionResults <- actionResult{index: i, err: fmt.Errorf("parallel item %d of %d failed: failed to diff Application: %w", actionNum+1, len(actionGroup), err)}
} else {
actionResults <- actionResult{index: i, output: diff}
}
}
}(i)
if action.App == nil {
return "", errors.New("action is missing a valid action type (i.e. an 'app' block)")
}
go func() {
wg.Wait()
close(actionResults)
}()
var results []actionResult
for out := range actionResults {
results = append(results, out)
if action.App.Sync != nil && action.App.Diff != nil {
return "", errors.New("action has multiple types of action defined (both sync and diff)")
}
sort.Slice(results, func(i, j int) bool {
return results[i].index < results[j].index
})
hasError := false
var errorMessages []string
var outputs []any
for _, result := range results {
if result.err != nil {
hasError = true
errorMessages = append(errorMessages, result.err.Error())
outputs = append(outputs, nil)
} else {
errorMessages = append(errorMessages, "")
outputs = append(outputs, result.output)
if action.App.Sync == nil && action.App.Diff == nil {
return "", errors.New("app action has no action type specified (must be sync or diff)")
}

if action.App.Sync != nil {
err = syncAppsParallel(*action.App.Sync, action.Timeout, appClient)
if err != nil {
}
}
if hasError {
return nil, fmt.Errorf("one or more actions failed: %s", strings.Join(errorMessages, "; "))
if action.App.Diff != nil {
out, err = diffApp(*action.App.Diff, action.Timeout, appClient, settingsClient)
if err != nil {
}
}
return outputs, nil
return out, err
}

// syncAppsParallel loops over the apps in a SyncAction and syncs them in parallel. It waits for all responses and then
// aggregates any errors.
func syncAppsParallel(action SyncAction, timeout string, appClient application.ApplicationServiceClient) error {
var apps []App
err := yaml.Unmarshal([]byte(action.Apps), &apps)
if err != nil {
return fmt.Errorf("failed to unmarshal apps: %w", err)
}
var options []string
err = yaml.Unmarshal([]byte(action.Options), &options)
if err != nil {
return fmt.Errorf("failed to unmarshal options: %w", err)
}
ctx, cancel, err := durationStringToContext(timeout)
if err != nil {
return fmt.Errorf("failed get action context: %w", err)
}
defer cancel()
wg := sync.WaitGroup{}
errChan := make(chan error, len(action.Apps))
for _, app := range action.Apps {
for _, app := range apps {
app := app
wg.Add(1)
go func() {
defer wg.Done()
_, err := appClient.Sync(ctx, &application.ApplicationSyncRequest{
Name: pointer.String(app.Name),
AppNamespace: pointer.String(app.Namespace),
SyncOptions: &application.SyncOptions{Items: action.Options},
SyncOptions: &application.SyncOptions{Items: options},
})
if err != nil {
errChan <- fmt.Errorf("failed to sync app %q: %w", app.Name, err)
Expand Down Expand Up @@ -308,7 +248,7 @@ func diffApp(action DiffAction, timeout string, appClient application.Applicatio
target = item.target
}

diff, err = GetDiff(action.App.Name, live, target)
diff, err = GetDiff(live, target)
if err != nil {
return "", fmt.Errorf("failed to get diff: %w", err)
}
Expand Down
Loading

0 comments on commit 1feb2af

Please sign in to comment.