Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3.5.2-atlan-1.1.1 #13

Open
wants to merge 21 commits into
base: release-3.5.2
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
745982e
fix: RetryPolicyOnTransientError also retries on error
tczhao Jun 3, 2024
dc7d497
fix: Apply podSpecPatch in woc.execWf.Spec and template to pod sequen…
tczhao Jun 3, 2024
c15d740
feat: enable template params in wf podspecpatch
tczhao Jun 3, 2024
8dfd517
fix: test
tczhao Jun 4, 2024
214b32e
fix: prevent update race in workflow cache (Fixes #9574) (#12233)
drawlerr Jan 16, 2024
ab2ac37
fix: skip reset message when transition from pending to fail
tczhao Jun 12, 2024
eba7bab
feat: enable various lastRetry parameters in podspecpatch
tczhao Jun 18, 2024
e2e3db6
fix: retry parameter issue in evicted pending node
tczhao Jun 19, 2024
cfe014e
fix: set template metadata from workflow template PodMetadata. Fixes:…
tczhao Jun 27, 2024
c81edc2
refactor: invert conditionals for less nesting in `includeScriptOutpu…
agilgur5 Dec 6, 2023
cfa71ce
feat: support dag and steps level scheduling constraints. Fixes: #125…
shuangkun Mar 15, 2024
8c55a90
feat: load git from s3 first
tczhao Oct 11, 2024
f57f919
feat: support dynamic templateref naming. Fixes: #10542 (#12842)
shuangkun Apr 12, 2024
e235f13
fix: skip problematic validation
tczhao Oct 30, 2024
dbd9b71
fix: make etcd errors transient (#12567)
Joibel Jan 24, 2024
8c1cee2
fix: retry large archived wf. Fixes #12740 (#12741)
heidongxianhua Apr 28, 2024
e138ec7
feat: add WORKFLOW_VALIDATION_PATTERN
tczhao Nov 24, 2024
43ccc1f
fix: pause before leading
tczhao Nov 26, 2024
b966acc
feat: soft affinity for retry
tczhao Nov 28, 2024
c683289
fix: find correct retry node when using `templateRef`. Fixes: #12633 …
shuangkun Feb 22, 2024
65f4049
fix: nodeAntiAffinity is not working as expected when boundaryID is e…
shuangkun May 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
@@ -147,6 +147,7 @@ func NewRootCommand() *cobra.Command {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
time.Sleep(time.Second)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
},
2 changes: 1 addition & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
@@ -85,7 +85,7 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}}
}

5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
Original file line number Diff line number Diff line change
@@ -57,6 +57,11 @@ func (cwftmpl *ClusterWorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeCluster
}

// GetPodMetadata returns the PodMetadata of cluster workflow template.
// Returns nil when the template spec declares no pod metadata.
func (cwftmpl *ClusterWorkflowTemplate) GetPodMetadata() *Metadata {
	return cwftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &cwftmpl.Spec
1 change: 1 addition & 0 deletions pkg/apis/workflow/v1alpha1/common.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ type TemplateHolder interface {
GroupVersionKind() schema.GroupVersionKind
GetTemplateByName(name string) *Template
GetResourceScope() ResourceScope
GetPodMetadata() *Metadata
}

// WorkflowSpecHolder is an object that holds a WorkflowSpec; e.g., WorkflowTemplate, and ClusterWorkflowTemplate
8 changes: 4 additions & 4 deletions pkg/apis/workflow/v1alpha1/validation_utils.go
Original file line number Diff line number Diff line change
@@ -62,10 +62,10 @@ func validateWorkflowFieldNames(names []string, isParamOrArtifact bool) error {
if len(errs) != 0 {
return fmt.Errorf("[%d].name: '%s' is invalid: %s", i, name, strings.Join(errs, ";"))
}
_, ok := nameSet[name]
if ok {
return fmt.Errorf("[%d].name '%s' is not unique", i, name)
}
// _, ok := nameSet[name]
// if ok {
// return fmt.Errorf("[%d].name '%s' is not unique", i, name)
// }
nameSet[name] = true
}
return nil
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_template_types.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,11 @@ func (wftmpl *WorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeNamespaced
}

// GetPodMetadata returns the PodMetadata of workflow template.
// Returns nil when the template spec declares no pod metadata.
func (wftmpl *WorkflowTemplate) GetPodMetadata() *Metadata {
	return wftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of workflow template.
func (wftmpl *WorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &wftmpl.Spec
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
@@ -3353,6 +3353,11 @@ func (wf *Workflow) GetResourceScope() ResourceScope {
return ResourceScopeLocal
}

// GetPodMetadata returns the PodMetadata of a workflow.
// Returns nil when the workflow spec declares no pod metadata.
func (wf *Workflow) GetPodMetadata() *Metadata {
	return wf.Spec.PodMetadata
}

// GetWorkflowSpec returns the Spec of a workflow.
func (wf *Workflow) GetWorkflowSpec() WorkflowSpec {
return wf.Spec
2 changes: 1 addition & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
@@ -307,7 +307,7 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloa
}

grpcServer := grpc.NewServer(sOpts...)
wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo)
infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor))
eventpkg.RegisterEventServiceServer(grpcServer, eventServer)
eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer())
2 changes: 1 addition & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
@@ -590,7 +590,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) {

archivedRepo := &mocks.WorkflowArchive{}

wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo)
wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo)
archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"},
Spec: v1alpha1.WorkflowSpec{
28 changes: 25 additions & 3 deletions server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
"github.com/argoproj/argo-workflows/v3/workflow/util"

sutils "github.com/argoproj/argo-workflows/v3/server/utils"
@@ -31,12 +32,14 @@ import (
const disableValueListRetrievalKeyPattern = "DISABLE_VALUE_LIST_RETRIEVAL_KEY_PATTERN"

type archivedWorkflowServer struct {
wfArchive sqldb.WorkflowArchive
wfArchive sqldb.WorkflowArchive
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
hydrator hydrator.Interface
}

// NewWorkflowArchiveServer returns a new archivedWorkflowServer
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive: wfArchive}
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
}

func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) {
@@ -282,6 +285,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
oriUid := wf.UID

_, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{})
if apierr.IsNotFound(err) {
@@ -299,12 +303,30 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
}
}

log.WithFields(log.Fields{"Dehydrate workflow uid=": wf.UID}).Info("RetryArchivedWorkflow")
// If the Workflow needs to be dehydrated in order to capture and retain all of the previous state for the subsequent workflow, then do so
err = w.hydrator.Dehydrate(wf)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

wf.ObjectMeta.ResourceVersion = ""
wf.ObjectMeta.UID = ""
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
// if the Workflow was dehydrated before, we need to capture and maintain its previous state for the new Workflow
if !w.hydrator.IsHydrated(wf) {
offloadedNodes, err := w.offloadNodeStatusRepo.Get(string(oriUid), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
_, err = w.offloadNodeStatusRepo.Save(string(result.UID), wf.Namespace, offloadedNodes)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
}

return result, nil
}
8 changes: 7 additions & 1 deletion server/workflowarchive/archived_workflow_server_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
authorizationv1 "k8s.io/api/authorization/v1"
@@ -17,8 +18,10 @@ import (
kubefake "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo-workflows/v3/server/auth"
@@ -29,7 +32,10 @@ func Test_archivedWorkflowServer(t *testing.T) {
repo := &mocks.WorkflowArchive{}
kubeClient := &kubefake.Clientset{}
wfClient := &argofake.Clientset{}
w := NewWorkflowArchiveServer(repo)
offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("IsEnabled", mock.Anything).Return(true)
offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil)
w := NewWorkflowArchiveServer(repo, offloadNodeStatusRepo)
allowed := true
kubeClient.AddReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &authorizationv1.SelfSubjectAccessReview{
5 changes: 5 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
@@ -209,6 +209,11 @@ var (
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeRunning
}), "to have running pod"
}
	// ToHaveFailedPod is satisfied as soon as any pod-type node in the
	// workflow status has reached the Failed phase.
	ToHaveFailedPod Condition = func(wf *wfv1.Workflow) (bool, string) {
		return wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
			return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
		}), "to have failed pod"
	}
)

// `ToBeDone` replaces `ToFinish` which also makes sure the workflow is both complete not pending archiving.
113 changes: 113 additions & 0 deletions test/e2e/retry_test.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,9 @@
package e2e

import (
"context"
"io"
"strings"
"testing"
"time"

@@ -120,6 +123,116 @@ spec:
})
}

// TestWorkflowTemplateWithRetryStrategyInContainerSet verifies containerSet
// retry behavior when the workflow is submitted via a workflowTemplateRef:
// the workflow fails overall, and each container's log output shows the
// expected number of executions (no retry, no execution, one retry).
func (s *RetryTestSuite) TestWorkflowTemplateWithRetryStrategyInContainerSet() {
	var name string
	var ns string
	s.Given().
		WorkflowTemplate("@testdata/workflow-template-with-containerset.yaml").
		Workflow(`
metadata:
  name: workflow-template-containerset
spec:
  workflowTemplateRef:
    name: containerset-with-retrystrategy
`).
		When().
		CreateWorkflowTemplates().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeFailed).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			// testify convention: expected value first, actual second.
			assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
		}).
		ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
			return status.Name == "workflow-template-containerset"
		}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
			// Capture the pod identity so the subtests below can read its logs.
			name = pod.GetName()
			ns = pod.GetNamespace()
		})
	// containerLogs returns the complete log output of one container of the
	// captured pod, or "" if the log stream could not be opened/read.
	containerLogs := func(container string) string {
		ctx := context.Background()
		podLogOptions := &apiv1.PodLogOptions{Container: container}
		stream, err := s.KubeClient.CoreV1().Pods(ns).GetLogs(name, podLogOptions).Stream(ctx)
		if !assert.NoError(s.T(), err) {
			// Do not dereference a nil stream after a failed request.
			return ""
		}
		defer stream.Close()
		logBytes, err := io.ReadAll(stream)
		assert.NoError(s.T(), err)
		return string(logBytes)
	}
	// c1 succeeds on the first attempt: exactly one execution, no retry.
	s.Run("ContainerLogsC1", func() {
		output := containerLogs("c1")
		assert.Equal(s.T(), 1, strings.Count(output, "capturing logs"))
		assert.Contains(s.T(), output, "hi")
	})
	// c2 fails with a command error, so the retry logic is never entered.
	s.Run("ContainerLogsC2", func() {
		output := containerLogs("c2")
		assert.Equal(s.T(), 0, strings.Count(output, "capturing logs"))
		assert.Contains(s.T(), output, "executable file not found in $PATH")
	})
	// c3 errors and is retried once: two executions, two failure messages.
	s.Run("ContainerLogsC3", func() {
		output := containerLogs("c3")
		assert.Equal(s.T(), 2, strings.Count(output, "capturing logs"))
		assert.Equal(s.T(), 2, strings.Count(output, "intentional failure"))
	})
}

// TestRetryNodeAntiAffinity checks that, with retryStrategy.affinity.nodeAntiAffinity
// set, a retried attempt is scheduled on a different host node than the
// attempt that failed before it.
func (s *RetryTestSuite) TestRetryNodeAntiAffinity() {
	s.Given().
		Workflow(`
metadata:
  name: test-nodeantiaffinity-strategy
spec:
  entrypoint: main
  templates:
    - name: main
      retryStrategy:
        limit: '1'
        retryPolicy: "Always"
        affinity:
          nodeAntiAffinity: {}
      container:
        name: main
        image: 'argoproj/argosay:v2'
        args: [ exit, "1" ]
`).
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToHaveFailedPod).
		Wait(5 * time.Second).
		Then().
		ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			switch status.Phase {
			case wfv1.WorkflowFailed:
				firstAttempt := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
				retryAttempt := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
				assert.NotEqual(t, firstAttempt.HostNodeName, retryAttempt.HostNodeName)
			case wfv1.WorkflowRunning:
				firstAttempt := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
				retryAttempt := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
				assert.Contains(t, retryAttempt.Message, "1 node(s) didn't match Pod's node affinity/selector")
				assert.NotEqual(t, firstAttempt.HostNodeName, retryAttempt.HostNodeName)
			}
		})
}

// TestRetrySuite runs the retry end-to-end test suite.
func TestRetrySuite(t *testing.T) {
	suite.Run(t, new(RetryTestSuite))
}
31 changes: 29 additions & 2 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
@@ -28,8 +28,17 @@ func IsTransientErr(err error) bool {
return false
}
err = argoerrs.Cause(err)
isTransient := isExceededQuotaErr(err) || apierr.IsTooManyRequests(err) || isResourceQuotaConflictErr(err) || isTransientNetworkErr(err) || apierr.IsServerTimeout(err) || apierr.IsServiceUnavailable(err) || matchTransientErrPattern(err) ||
errors.Is(err, NewErrTransient(""))
isTransient := isExceededQuotaErr(err) ||
apierr.IsTooManyRequests(err) ||
isResourceQuotaConflictErr(err) ||
isResourceQuotaTimeoutErr(err) ||
isTransientNetworkErr(err) ||
apierr.IsServerTimeout(err) ||
apierr.IsServiceUnavailable(err) ||
isTransientEtcdErr(err) ||
matchTransientErrPattern(err) ||
errors.Is(err, NewErrTransient("")) ||
isTransientSqbErr(err)
if isTransient {
log.Infof("Transient error: %v", err)
} else {
@@ -57,6 +66,20 @@ func isResourceQuotaConflictErr(err error) bool {
return apierr.IsConflict(err) && strings.Contains(err.Error(), "Operation cannot be fulfilled on resourcequota")
}

// isResourceQuotaTimeoutErr reports whether err is a Kubernetes internal
// error caused by resource quota evaluation timing out. It is treated as
// transient (retryable) by IsTransientErr.
func isResourceQuotaTimeoutErr(err error) bool {
	return apierr.IsInternalError(err) && strings.Contains(err.Error(), "resource quota evaluation timed out")
}

func isTransientEtcdErr(err error) bool {
// Some clusters expose these (transient) etcd errors to the caller
if strings.Contains(err.Error(), "etcdserver: leader changed") {
return true
} else if strings.Contains(err.Error(), "etcdserver: request timed out") {
return true
}
return false
}

func isTransientNetworkErr(err error) bool {
switch err.(type) {
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
@@ -101,3 +124,7 @@ func generateErrorString(err error) string {
}
return errorString
}

// isTransientSqbErr reports whether err is the "no more rows" error surfaced
// by the upper/db SQL layer; IsTransientErr treats it as retryable.
func isTransientSqbErr(err error) bool {
	return strings.Contains(err.Error(), "upper: no more rows in")
}
3 changes: 3 additions & 0 deletions workflow/artifacts/azure/azure.go
Original file line number Diff line number Diff line change
@@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
}
isEmptyFile = true
} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
_ = os.Remove(path)
return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
}

isDir, err := azblobDriver.IsDirectory(artifact)
if err != nil {
_ = os.Remove(path)
return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
}

// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
if !isDir && !isEmptyFile {
_ = os.Remove(path)
return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
}

Loading