Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3.5.2-atlan-1.1.2 #14

Open
wants to merge 20 commits into
base: release-3.5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
745982e
fix: RetryPolicyOnTransientError also retries on error
tczhao Jun 3, 2024
dc7d497
fix: Apply podSpecPatch in woc.execWf.Spec and template to pod sequen…
tczhao Jun 3, 2024
c15d740
feat: enable template params in wf podspecpatch
tczhao Jun 3, 2024
8dfd517
fix: test
tczhao Jun 4, 2024
214b32e
fix: prevent update race in workflow cache (Fixes #9574) (#12233)
drawlerr Jan 16, 2024
ab2ac37
fix: skip reset message when transition from pending to fail
tczhao Jun 12, 2024
eba7bab
feat: enable various lastRetry parameters in podspecpatch
tczhao Jun 18, 2024
e2e3db6
fix: retry parameter issue in evicted pending node
tczhao Jun 19, 2024
cfe014e
fix: set template metadata from workflow template PodMetadata. Fixes:…
tczhao Jun 27, 2024
c81edc2
refactor: invert conditionals for less nesting in `includeScriptOutpu…
agilgur5 Dec 6, 2023
cfa71ce
feat: support dag and steps level scheduling constraints. Fixes: #125…
shuangkun Mar 15, 2024
8c55a90
feat: load git from s3 first
tczhao Oct 11, 2024
f57f919
feat: support dynamic templateref naming. Fixes: #10542 (#12842)
shuangkun Apr 12, 2024
e235f13
fix: skip problematic validation
tczhao Oct 30, 2024
dbd9b71
fix: make etcd errors transient (#12567)
Joibel Jan 24, 2024
8c1cee2
fix: retry large archived wf. Fixes #12740 (#12741)
heidongxianhua Apr 28, 2024
e138ec7
feat: add WORKFLOW_VALIDATION_PATTERN
tczhao Nov 24, 2024
43ccc1f
fix: pause before leading
tczhao Nov 26, 2024
b966acc
feat: soft affinity for retry
tczhao Nov 28, 2024
bf81a05
fix: cronOperator/serverResubmitWf retry create workflow on transient…
tczhao Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func NewRootCommand() *cobra.Command {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
time.Sleep(time.Second)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (cwftmpl *ClusterWorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeCluster
}

// GetPodMetadata returns the PodMetadata of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetPodMetadata() *Metadata {
return cwftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &cwftmpl.Spec
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/workflow/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type TemplateHolder interface {
GroupVersionKind() schema.GroupVersionKind
GetTemplateByName(name string) *Template
GetResourceScope() ResourceScope
GetPodMetadata() *Metadata
}

// WorkflowSpecHolder is an object that holds a WorkflowSpec; e.g., WorkflowTemplate, and ClusterWorkflowTemplate
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/workflow/v1alpha1/validation_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func validateWorkflowFieldNames(names []string, isParamOrArtifact bool) error {
if len(errs) != 0 {
return fmt.Errorf("[%d].name: '%s' is invalid: %s", i, name, strings.Join(errs, ";"))
}
_, ok := nameSet[name]
if ok {
return fmt.Errorf("[%d].name '%s' is not unique", i, name)
}
// _, ok := nameSet[name]
// if ok {
// return fmt.Errorf("[%d].name '%s' is not unique", i, name)
// }
nameSet[name] = true
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_template_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (wftmpl *WorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeNamespaced
}

// GetPodMetadata returns the PodMetadata of workflow template.
func (wftmpl *WorkflowTemplate) GetPodMetadata() *Metadata {
return wftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of workflow template.
func (wftmpl *WorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &wftmpl.Spec
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3353,6 +3353,11 @@ func (wf *Workflow) GetResourceScope() ResourceScope {
return ResourceScopeLocal
}

// GetPodMetadata returns the PodMetadata of a workflow.
func (wf *Workflow) GetPodMetadata() *Metadata {
return wf.Spec.PodMetadata
}

// GetWorkflowSpec returns the Spec of a workflow.
func (wf *Workflow) GetWorkflowSpec() WorkflowSpec {
return wf.Spec
Expand Down
2 changes: 1 addition & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloa
}

grpcServer := grpc.NewServer(sOpts...)
wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive)
wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo)
infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor))
eventpkg.RegisterEventServiceServer(grpcServer, eventServer)
eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer())
Expand Down
2 changes: 1 addition & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) {

archivedRepo := &mocks.WorkflowArchive{}

wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo)
wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo)
archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"},
Spec: v1alpha1.WorkflowSpec{
Expand Down
28 changes: 25 additions & 3 deletions server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
"github.com/argoproj/argo-workflows/v3/workflow/util"

sutils "github.com/argoproj/argo-workflows/v3/server/utils"
Expand All @@ -31,12 +32,14 @@ import (
const disableValueListRetrievalKeyPattern = "DISABLE_VALUE_LIST_RETRIEVAL_KEY_PATTERN"

type archivedWorkflowServer struct {
wfArchive sqldb.WorkflowArchive
wfArchive sqldb.WorkflowArchive
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
hydrator hydrator.Interface
}

// NewWorkflowArchiveServer returns a new archivedWorkflowServer
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive: wfArchive}
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowarchivepkg.ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
}

func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) {
Expand Down Expand Up @@ -282,6 +285,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
oriUid := wf.UID

_, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{})
if apierr.IsNotFound(err) {
Expand All @@ -299,12 +303,30 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
}
}

log.WithFields(log.Fields{"Dehydrate workflow uid=": wf.UID}).Info("RetryArchivedWorkflow")
// If the Workflow needs to be dehydrated in order to capture and retain all of the previous state for the subsequent workflow, then do so
err = w.hydrator.Dehydrate(wf)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

wf.ObjectMeta.ResourceVersion = ""
wf.ObjectMeta.UID = ""
result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{})
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
// if the Workflow was dehydrated before, we need to capture and maintain its previous state for the new Workflow
if !w.hydrator.IsHydrated(wf) {
offloadedNodes, err := w.offloadNodeStatusRepo.Get(string(oriUid), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
_, err = w.offloadNodeStatusRepo.Save(string(result.UID), wf.Namespace, offloadedNodes)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
}

return result, nil
}
Expand Down
8 changes: 7 additions & 1 deletion server/workflowarchive/archived_workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
authorizationv1 "k8s.io/api/authorization/v1"
Expand All @@ -17,8 +18,10 @@ import (
kubefake "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo-workflows/v3/server/auth"
Expand All @@ -29,7 +32,10 @@ func Test_archivedWorkflowServer(t *testing.T) {
repo := &mocks.WorkflowArchive{}
kubeClient := &kubefake.Clientset{}
wfClient := &argofake.Clientset{}
w := NewWorkflowArchiveServer(repo)
offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("IsEnabled", mock.Anything).Return(true)
offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil)
w := NewWorkflowArchiveServer(repo, offloadNodeStatusRepo)
allowed := true
kubeClient.AddReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &authorizationv1.SelfSubjectAccessReview{
Expand Down
31 changes: 29 additions & 2 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ func IsTransientErr(err error) bool {
return false
}
err = argoerrs.Cause(err)
isTransient := isExceededQuotaErr(err) || apierr.IsTooManyRequests(err) || isResourceQuotaConflictErr(err) || isTransientNetworkErr(err) || apierr.IsServerTimeout(err) || apierr.IsServiceUnavailable(err) || matchTransientErrPattern(err) ||
errors.Is(err, NewErrTransient(""))
isTransient := isExceededQuotaErr(err) ||
apierr.IsTooManyRequests(err) ||
isResourceQuotaConflictErr(err) ||
isResourceQuotaTimeoutErr(err) ||
isTransientNetworkErr(err) ||
apierr.IsServerTimeout(err) ||
apierr.IsServiceUnavailable(err) ||
isTransientEtcdErr(err) ||
matchTransientErrPattern(err) ||
errors.Is(err, NewErrTransient("")) ||
isTransientSqbErr(err)
if isTransient {
log.Infof("Transient error: %v", err)
} else {
Expand Down Expand Up @@ -57,6 +66,20 @@ func isResourceQuotaConflictErr(err error) bool {
return apierr.IsConflict(err) && strings.Contains(err.Error(), "Operation cannot be fulfilled on resourcequota")
}

func isResourceQuotaTimeoutErr(err error) bool {
return apierr.IsInternalError(err) && strings.Contains(err.Error(), "resource quota evaluation timed out")
}

func isTransientEtcdErr(err error) bool {
// Some clusters expose these (transient) etcd errors to the caller
if strings.Contains(err.Error(), "etcdserver: leader changed") {
return true
} else if strings.Contains(err.Error(), "etcdserver: request timed out") {
return true
}
return false
}

func isTransientNetworkErr(err error) bool {
switch err.(type) {
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
Expand Down Expand Up @@ -101,3 +124,7 @@ func generateErrorString(err error) string {
}
return errorString
}

func isTransientSqbErr(err error) bool {
return strings.Contains(err.Error(), "upper: no more rows in")
}
3 changes: 3 additions & 0 deletions workflow/artifacts/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
}
isEmptyFile = true
} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
_ = os.Remove(path)
return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
}

isDir, err := azblobDriver.IsDirectory(artifact)
if err != nil {
_ = os.Remove(path)
return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
}

// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
if !isDir && !isEmptyFile {
_ = os.Remove(path)
return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
}

Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
}

tmpl := &wfv1.Template{}
addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl)
woc.addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl, "")
woc.addMetadata(pod, tmpl)

if woc.controller.Config.InstanceID != "" {
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -21,7 +21,7 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down
51 changes: 37 additions & 14 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,13 +632,23 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
if !ok {
return fmt.Errorf("object %+v is not an unstructured", workflows[0])
}
key := un.GetNamespace() + "/" + un.GetName()
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

obj, ok := wfc.getWorkflowByKey(key)
if !ok {
return fmt.Errorf("failed to get workflow by key after locking")
}
un, ok = obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("object %+v is not an unstructured", obj)
}
wf, err = util.FromUnstructured(un)
if err != nil {
return err
}
key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

// workflow might still be hydrated
if wfc.hydrator.IsHydrated(wf) {
log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
Expand Down Expand Up @@ -712,20 +722,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
}
defer wfc.wfQueue.Done(key)

obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return true
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return true
}

wfc.workflowKeyLock.Lock(key.(string))
defer wfc.workflowKeyLock.Unlock(key.(string))

obj, ok := wfc.getWorkflowByKey(key.(string))
if !ok {
return true
}

// The workflow informer receives unstructured objects to deal with the possibility of invalid
// workflow manifests that are unable to unmarshal to workflow objects
un, ok := obj.(*unstructured.Unstructured)
Expand Down Expand Up @@ -794,6 +798,20 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) {
obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return nil, false
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return nil, false
}
return obj, true
}

func reconciliationNeeded(wf metav1.Object) bool {
return wf.GetLabels()[common.LabelKeyCompleted] != "true" || slices.Contains(wf.GetFinalizers(), common.FinalizerArtifactGC)
}
Expand Down Expand Up @@ -929,6 +947,11 @@ func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interfac
}
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)
key, err = cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Error("failed to get key for object after locking")
return
}
err = wfc.archiveWorkflowAux(ctx, obj)
if err != nil {
log.WithField("key", key).WithError(err).Error("failed to archive workflow")
Expand Down
Loading
Loading