diff --git a/pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go b/pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
index a9c27f620e8d..2bb5dc112f9c 100644
--- a/pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
+++ b/pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
@@ -57,6 +57,11 @@ func (cwftmpl *ClusterWorkflowTemplate) GetResourceScope() ResourceScope {
 	return ResourceScopeCluster
 }
 
+// GetPodMetadata returns the PodMetadata of cluster workflow template.
+func (cwftmpl *ClusterWorkflowTemplate) GetPodMetadata() *Metadata {
+	return cwftmpl.Spec.PodMetadata
+}
+
 // GetWorkflowSpec returns the WorkflowSpec of cluster workflow template.
 func (cwftmpl *ClusterWorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
 	return &cwftmpl.Spec
diff --git a/pkg/apis/workflow/v1alpha1/common.go b/pkg/apis/workflow/v1alpha1/common.go
index 6a7c584b4601..daba916e849c 100644
--- a/pkg/apis/workflow/v1alpha1/common.go
+++ b/pkg/apis/workflow/v1alpha1/common.go
@@ -20,6 +20,7 @@ type TemplateHolder interface {
 	GroupVersionKind() schema.GroupVersionKind
 	GetTemplateByName(name string) *Template
 	GetResourceScope() ResourceScope
+	GetPodMetadata() *Metadata
 }
 
 // WorkflowSpecHolder is an object that holds a WorkflowSpec; e.g., WorkflowTemplate, and ClusterWorkflowTemplate
diff --git a/pkg/apis/workflow/v1alpha1/validation_utils.go b/pkg/apis/workflow/v1alpha1/validation_utils.go
index 912725d496d5..f461829f17a9 100644
--- a/pkg/apis/workflow/v1alpha1/validation_utils.go
+++ b/pkg/apis/workflow/v1alpha1/validation_utils.go
@@ -62,10 +62,10 @@ func validateWorkflowFieldNames(names []string, isParamOrArtifact bool) error {
 		if len(errs) != 0 {
 			return fmt.Errorf("[%d].name: '%s' is invalid: %s", i, name, strings.Join(errs, ";"))
 		}
-		_, ok := nameSet[name]
-		if ok {
-			return fmt.Errorf("[%d].name '%s' is not unique", i, name)
-		}
+		// _, ok := nameSet[name]
+		// if ok {
+		// 	return fmt.Errorf("[%d].name '%s' is not unique", i, name)
+		// }
 		nameSet[name] = true
 	}
 	return nil
diff --git a/pkg/apis/workflow/v1alpha1/workflow_template_types.go b/pkg/apis/workflow/v1alpha1/workflow_template_types.go
index 1317fc18b2a5..707128f56097 100644
--- a/pkg/apis/workflow/v1alpha1/workflow_template_types.go
+++ b/pkg/apis/workflow/v1alpha1/workflow_template_types.go
@@ -56,6 +56,11 @@ func (wftmpl *WorkflowTemplate) GetResourceScope() ResourceScope {
 	return ResourceScopeNamespaced
 }
 
+// GetPodMetadata returns the PodMetadata of workflow template.
+func (wftmpl *WorkflowTemplate) GetPodMetadata() *Metadata {
+	return wftmpl.Spec.PodMetadata
+}
+
 // GetWorkflowSpec returns the WorkflowSpec of workflow template.
 func (wftmpl *WorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
 	return &wftmpl.Spec
diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go
index af47195c6500..4d44bf665f4c 100644
--- a/pkg/apis/workflow/v1alpha1/workflow_types.go
+++ b/pkg/apis/workflow/v1alpha1/workflow_types.go
@@ -3353,6 +3353,11 @@ func (wf *Workflow) GetResourceScope() ResourceScope {
 	return ResourceScopeLocal
 }
 
+// GetPodMetadata returns the PodMetadata of a workflow.
+func (wf *Workflow) GetPodMetadata() *Metadata {
+	return wf.Spec.PodMetadata
+}
+
 // GetWorkflowSpec returns the Spec of a workflow.
 func (wf *Workflow) GetWorkflowSpec() WorkflowSpec {
 	return wf.Spec
diff --git a/workflow/artifacts/azure/azure.go b/workflow/artifacts/azure/azure.go
index a6b25b28fba0..c56169001015 100644
--- a/workflow/artifacts/azure/azure.go
+++ b/workflow/artifacts/azure/azure.go
@@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
 		}
 		isEmptyFile = true
 	} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
+		_ = os.Remove(path)
 		return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
 	}
 
 	isDir, err := azblobDriver.IsDirectory(artifact)
 	if err != nil {
+		_ = os.Remove(path)
 		return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
 	}
 
 	// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
 	if !isDir && !isEmptyFile {
+		_ = os.Remove(path)
 		return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
 	}
 
diff --git a/workflow/controller/agent.go b/workflow/controller/agent.go
index 86cade8cada7..5b53d4b9c300 100644
--- a/workflow/controller/agent.go
+++ b/workflow/controller/agent.go
@@ -240,7 +240,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
 	}
 
 	tmpl := &wfv1.Template{}
-	addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl)
+	woc.addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl, "")
 	woc.addMetadata(pod, tmpl)
 
 	if woc.controller.Config.InstanceID != "" {
diff --git a/workflow/controller/container_set_template.go b/workflow/controller/container_set_template.go
index 6905c82452f0..e5226c193f44 100644
--- a/workflow/controller/container_set_template.go
+++ b/workflow/controller/container_set_template.go
@@ -7,7 +7,7 @@ import (
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 )
 
-func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
+func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
 	node, err := woc.wf.GetNodeByName(nodeName)
 	if err != nil {
 		node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
@@ -21,7 +21,7 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
 		includeScriptOutput: includeScriptOutput,
 		onExitPod:           opts.onExitTemplate,
 		executionDeadline:   opts.executionDeadline,
-	})
+	}, localParams)
 	if err != nil {
 		return woc.requeueIfTransientErr(err, node.Name)
 	}
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index a943471bb3ac..d5491cb78fee 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -632,13 +632,23 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
 		if !ok {
 			return fmt.Errorf("object %+v is not an unstructured", workflows[0])
 		}
+		key := un.GetNamespace() + "/" + un.GetName()
+		wfc.workflowKeyLock.Lock(key)
+		defer wfc.workflowKeyLock.Unlock(key)
+
+		obj, ok := wfc.getWorkflowByKey(key)
+		if !ok {
+			return fmt.Errorf("failed to get workflow by key after locking")
+		}
+		un, ok = obj.(*unstructured.Unstructured)
+		if !ok {
+			return fmt.Errorf("object %+v is not an unstructured", obj)
+		}
 		wf, err = util.FromUnstructured(un)
 		if err != nil {
 			return err
 		}
-		key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
-		wfc.workflowKeyLock.Lock(key)
-		defer wfc.workflowKeyLock.Unlock(key)
+
 		// workflow might still be hydrated
 		if wfc.hydrator.IsHydrated(wf) {
 			log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
@@ -712,20 +722,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
 	}
 	defer wfc.wfQueue.Done(key)
 
-	obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
-	if err != nil {
-		log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
-		return true
-	}
-	if !exists {
-		// This happens after a workflow was labeled with completed=true
-		// or was deleted, but the work queue still had an entry for it.
-		return true
-	}
-
 	wfc.workflowKeyLock.Lock(key.(string))
 	defer wfc.workflowKeyLock.Unlock(key.(string))
 
+	obj, ok := wfc.getWorkflowByKey(key.(string))
+	if !ok {
+		return true
+	}
+
 	// The workflow informer receives unstructured objects to deal with the possibility of invalid
 	// workflow manifests that are unable to unmarshal to workflow objects
 	un, ok := obj.(*unstructured.Unstructured)
@@ -794,6 +798,20 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
 	return true
 }
 
+func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) {
+	obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
+	if err != nil {
+		log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
+		return nil, false
+	}
+	if !exists {
+		// This happens after a workflow was labeled with completed=true
+		// or was deleted, but the work queue still had an entry for it.
+		return nil, false
+	}
+	return obj, true
+}
+
 func reconciliationNeeded(wf metav1.Object) bool {
 	return wf.GetLabels()[common.LabelKeyCompleted] != "true" || slices.Contains(wf.GetFinalizers(), common.FinalizerArtifactGC)
 }
@@ -929,6 +947,11 @@ func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interfac
 	}
 	wfc.workflowKeyLock.Lock(key)
 	defer wfc.workflowKeyLock.Unlock(key)
+	key, err = cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		log.Error("failed to get key for object after locking")
+		return
+	}
 	err = wfc.archiveWorkflowAux(ctx, obj)
 	if err != nil {
 		log.WithField("key", key).WithError(err).Error("failed to archive workflow")
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 6d2316dac5be..08d8af850a40 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -1057,9 +1057,9 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
 		retryOnFailed = false
 		retryOnError = true
 	case wfv1.RetryPolicyOnTransientError:
+		retryOnError = true
 		if (lastChildNode.Phase == wfv1.NodeFailed || lastChildNode.Phase == wfv1.NodeError) && errorsutil.IsTransientErr(errors.InternalError(lastChildNode.Message)) {
 			retryOnFailed = true
-			retryOnError = true
 		}
 	case wfv1.RetryPolicyOnFailure:
 		retryOnFailed = true
@@ -1432,8 +1432,8 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
 		}
 	}
 
-	// if we are transitioning from Pending to a different state, clear out unchanged message
-	if old.Phase == wfv1.NodePending && new.Phase != wfv1.NodePending && old.Message == new.Message {
+	// if we are transitioning from Pending to a different state (except Fail), clear out unchanged message
+	if old.Phase == wfv1.NodePending && new.Phase != wfv1.NodePending && new.Phase != wfv1.NodeFailed && old.Message == new.Message {
 		new.Message = ""
 	}
 
@@ -1777,6 +1777,10 @@ func getRetryNodeChildrenIds(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
 
 func buildRetryStrategyLocalScope(node *wfv1.NodeStatus, nodes wfv1.Nodes) map[string]interface{} {
 	localScope := make(map[string]interface{})
+	localScope[common.LocalVarRetriesLastExitCode] = "0"
+	localScope[common.LocalVarRetriesLastStatus] = ""
+	localScope[common.LocalVarRetriesLastDuration] = "0"
+	localScope[common.LocalVarRetriesLastMessage] = ""
 
 	// `retries` variable
 	childNodeIds, lastChildNode := getChildNodeIdsAndLastRetriedNode(node, nodes)
@@ -2071,24 +2075,28 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
 			localScope, realTimeScope := woc.prepareMetricScope(lastChildNode)
 			woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
 		}
+		localScope := buildRetryStrategyLocalScope(retryParentNode, woc.wf.Status.Nodes)
+		for key, value := range localScope {
+			strKey := fmt.Sprintf("%v", key)
+			strValue := fmt.Sprintf("%v", value)
+			localParams[strKey] = strValue
+		}
+		retryNum := len(childNodeIDs)
+		localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)
 		if lastChildNode != nil && !lastChildNode.Fulfilled() {
 			// Last child node is still running.
 			nodeName = lastChildNode.Name
 			node = lastChildNode
 		} else {
-			retryNum := len(childNodeIDs)
 			// Create a new child node and append it to the retry node.
 			nodeName = fmt.Sprintf("%s(%d)", retryNodeName, retryNum)
 			woc.addChildNode(retryNodeName, nodeName)
 			node = nil
 
-			localParams := make(map[string]string)
 			// Change the `pod.name` variable to the new retry node name
 			if processedTmpl.IsPodType() {
 				localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name)
 			}
-			// Inject the retryAttempt number
-			localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)
 
 			processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams)
 			if errorsutil.IsTransientErr(err) {
@@ -2102,21 +2110,21 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
 
 	switch processedTmpl.GetType() {
 	case wfv1.TemplateTypeContainer:
-		node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
+		node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
 	case wfv1.TemplateTypeContainerSet:
-		node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
+		node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
 	case wfv1.TemplateTypeSteps:
 		node, err = woc.executeSteps(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
 	case wfv1.TemplateTypeScript:
-		node, err = woc.executeScript(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
+		node, err = woc.executeScript(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
 	case wfv1.TemplateTypeResource:
-		node, err = woc.executeResource(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
+		node, err = woc.executeResource(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
 	case wfv1.TemplateTypeDAG:
 		node, err = woc.executeDAG(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
 	case wfv1.TemplateTypeSuspend:
 		node, err = woc.executeSuspend(nodeName, templateScope, processedTmpl, orgTmpl, opts)
 	case wfv1.TemplateTypeData:
-		node, err = woc.executeData(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
+		node, err = woc.executeData(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
 	case wfv1.TemplateTypeHTTP:
 		node = woc.executeHTTPTemplate(nodeName, templateScope, processedTmpl, orgTmpl, opts)
 	case wfv1.TemplateTypePlugin:
@@ -2690,15 +2698,11 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
 	// if we are about to execute a pod, make sure our parent hasn't reached it's limit
 	if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
 		boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
-		if err != nil {
-			woc.log.Errorf("was unable to obtain node for %s", boundaryID)
-			return errors.InternalError("boundaryNode not found")
-		}
-		tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
 		if err != nil {
 			return err
 		}
-		_, boundaryTemplate, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode)
+
+		boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
 		if err != nil {
 			return err
 		}
@@ -2722,7 +2726,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
 	return nil
 }
 
-func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
+func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
 	node, err := woc.wf.GetNodeByName(nodeName)
 	if err != nil {
 		node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
@@ -2740,7 +2744,7 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string
 		includeScriptOutput: includeScriptOutput,
 		onExitPod:           opts.onExitTemplate,
 		executionDeadline:   opts.executionDeadline,
-	})
+	}, localParams)
 
 	if err != nil {
 		return woc.requeueIfTransientErr(err, node.Name)
@@ -2926,7 +2930,7 @@ loop:
 	return nodeName
 }
 
-func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
+func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
 	node, err := woc.wf.GetNodeByName(nodeName)
 	if err != nil {
 		node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
@@ -2951,7 +2955,7 @@ func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, t
 		includeScriptOutput: includeScriptOutput,
 		onExitPod:           opts.onExitTemplate,
 		executionDeadline:   opts.executionDeadline,
-	})
+	}, localParams)
 	if err != nil {
 		return woc.requeueIfTransientErr(err, node.Name)
 	}
@@ -3197,7 +3201,7 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {
 }
 
 // executeResource is runs a kubectl command against a manifest
-func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
+func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
 	node, err := woc.wf.GetNodeByName(nodeName)
 
 	if err != nil {
@@ -3226,7 +3230,7 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string,
 
 	mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
 	mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action}
-	_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline})
+	_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline}, localParams)
 	if err != nil {
 		return woc.requeueIfTransientErr(err, node.Name)
 	}
@@ -3234,7 +3238,7 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string,
 	return node, err
 }
 
-func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
+func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
 	node, err := woc.wf.GetNodeByName(nodeName)
 	if err != nil {
 		node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
@@ -3249,7 +3253,7 @@ func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, tem
 
 	mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
 	mainCtr.Command = []string{"argoexec", "data", string(dataTemplate)}
-	_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true})
+	_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true}, localParams)
 	if err != nil {
 		return woc.requeueIfTransientErr(err, node.Name)
 	}
@@ -3719,29 +3723,21 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
 // Check if the output of this node is referenced elsewhere in the Workflow. If so, make sure to include it during
 // execution.
 func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (bool, error) {
-	if boundaryID != "" {
-		if boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID); err == nil {
-			tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
-			if err != nil {
-				return false, err
-			}
-			_, parentTemplate, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode)
-			if err != nil {
-				return false, err
-			}
-			// A new template was stored during resolution, persist it
-			if templateStored {
-				woc.updated = true
-			}
+	if boundaryID == "" {
+		return false, nil
+	}
 
-			name := getStepOrDAGTaskName(nodeName)
-			return hasOutputResultRef(name, parentTemplate), nil
-		} else {
-			woc.log.Errorf("was unable to obtain node for %s", boundaryID)
-		}
+	parentTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
+	if err != nil {
+		return false, err
+	}
+	// A new template was stored during resolution, persist it
+	if templateStored {
+		woc.updated = true
 	}
 
-	return false, nil
+	name := getStepOrDAGTaskName(nodeName)
+	return hasOutputResultRef(name, parentTemplate), nil
 }
 
 func (woc *wfOperationCtx) fetchWorkflowSpec() (wfv1.WorkflowSpecHolder, error) {
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index 0e3a431c8d73..da67b69934e4 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -2643,6 +2643,201 @@ func TestWorkflowSpecParam(t *testing.T) {
 	assert.Equal(t, "my-host", pod.Spec.NodeSelector["kubernetes.io/hostname"])
 }
 
+var workflowSchedulingConstraintsTemplateDAG = `
+apiVersion: argoproj.io/v1alpha1
+kind: WorkflowTemplate
+metadata:
+  name: benchmarks-dag
+  namespace: argo
+spec:
+  entrypoint: main
+  templates:
+  - dag:
+      tasks:
+      - arguments:
+          parameters:
+          - name: msg
+            value: 'hello'
+        name: benchmark1
+        template: benchmark
+      - arguments:
+          parameters:
+          - name: msg
+            value: 'hello'
+        name: benchmark2
+        template: benchmark
+    name: main
+    nodeSelector:
+      pool: workflows
+    tolerations:
+    - key: pool
+      operator: Equal
+      value: workflows
+    affinity:
+      nodeAffinity:
+        requiredDuringSchedulingIgnoredDuringExecution:
+          nodeSelectorTerms:
+            - matchExpressions:
+                - key: node_group
+                  operator: In
+                  values:
+                    - argo-workflow
+  - inputs:
+      parameters:
+      - name: msg
+    name: benchmark
+    script:
+      command:
+      - python
+      image: python:latest
+      source: |
+        print("{{inputs.parameters.msg}}")
+`
+
+var workflowSchedulingConstraintsTemplateSteps = `
+apiVersion: argoproj.io/v1alpha1
+kind: WorkflowTemplate
+metadata:
+  name: benchmarks-steps
+  namespace: argo
+spec:
+  entrypoint: main
+  templates:
+  - name: main
+    steps:
+    - - name: benchmark1
+        arguments:
+          parameters:
+          - name: msg
+            value: 'hello'
+        template: benchmark
+      - name: benchmark2
+        arguments:
+          parameters:
+          - name: msg
+            value: 'hello'
+        template: benchmark
+    nodeSelector:
+      pool: workflows
+    tolerations:
+    - key: pool
+      operator: Equal
+      value: workflows
+    affinity:
+      nodeAffinity:
+        requiredDuringSchedulingIgnoredDuringExecution:
+          nodeSelectorTerms:
+            - matchExpressions:
+                - key: node_group
+                  operator: In
+                  values:
+                    - argo-workflow
+  - inputs:
+      parameters:
+      - name: msg
+    name: benchmark
+    script:
+      command:
+      - python
+      image: python:latest
+      source: |
+        print("{{inputs.parameters.msg}}")
+`
+
+var workflowSchedulingConstraintsDAG = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: hello-world-wf-scheduling-constraints-dag-
+  namespace: argo
+spec:
+  entrypoint: hello
+  templates:
+    - name: hello
+      steps:
+        - - name: hello-world
+            templateRef:
+              name: benchmarks-dag
+              template: main
+`
+
+var workflowSchedulingConstraintsSteps = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: hello-world-wf-scheduling-constraints-steps-
+  namespace: argo
+spec:
+  entrypoint: hello
+  templates:
+    - name: hello
+      steps:
+        - - name: hello-world
+            templateRef:
+              name: benchmarks-steps
+              template: main
+`
+
+func TestWokflowSchedulingConstraintsDAG(t *testing.T) {
+	wftmpl := wfv1.MustUnmarshalWorkflowTemplate(workflowSchedulingConstraintsTemplateDAG)
+	wf := wfv1.MustUnmarshalWorkflow(workflowSchedulingConstraintsDAG)
+	cancel, controller := newController(wf, wftmpl)
+	defer cancel()
+
+	ctx := context.Background()
+	woc := newWorkflowOperationCtx(wf, controller)
+	woc.operate(ctx)
+	pods, err := listPods(woc)
+	assert.Nil(t, err)
+	assert.Equal(t, 2, len(pods.Items))
+	for _, pod := range pods.Items {
+		assert.Equal(t, "workflows", pod.Spec.NodeSelector["pool"])
+		found := false
+		value := ""
+		for _, toleration := range pod.Spec.Tolerations {
+			if toleration.Key == "pool" {
+				found = true
+				value = toleration.Value
+			}
+		}
+		assert.True(t, found)
+		assert.Equal(t, "workflows", value)
+		assert.NotNil(t, pod.Spec.Affinity)
+		assert.Equal(t, "node_group", pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key)
+		assert.Contains(t, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values, "argo-workflow")
+	}
+}
+
+func TestWokflowSchedulingConstraintsSteps(t *testing.T) {
+	wftmpl := wfv1.MustUnmarshalWorkflowTemplate(workflowSchedulingConstraintsTemplateSteps)
+	wf := wfv1.MustUnmarshalWorkflow(workflowSchedulingConstraintsSteps)
+	cancel, controller := newController(wf, wftmpl)
+	defer cancel()
+
+	ctx := context.Background()
+	woc := newWorkflowOperationCtx(wf, controller)
+	woc.operate(ctx)
+	pods, err := listPods(woc)
+	assert.Nil(t, err)
+	assert.Equal(t, 2, len(pods.Items))
+	for _, pod := range pods.Items {
+		assert.Equal(t, "workflows", pod.Spec.NodeSelector["pool"])
+		found := false
+		value := ""
+		for _, toleration := range pod.Spec.Tolerations {
+			if toleration.Key == "pool" {
+				found = true
+				value = toleration.Value
+			}
+		}
+		assert.True(t, found)
+		assert.Equal(t, "workflows", value)
+		assert.NotNil(t, pod.Spec.Affinity)
+		assert.Equal(t, "node_group", pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key)
+		assert.Contains(t, pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Values, "argo-workflow")
+	}
+}
+
 func TestAddGlobalParamToScope(t *testing.T) {
 	woc := newWoc()
 	woc.globalParams = make(map[string]string)
diff --git a/workflow/controller/operator_workflow_template_ref_test.go b/workflow/controller/operator_workflow_template_ref_test.go
index 32ba4b272270..756f7d172f0c 100644
--- a/workflow/controller/operator_workflow_template_ref_test.go
+++ b/workflow/controller/operator_workflow_template_ref_test.go
@@ -560,3 +560,77 @@ func TestSubmitWorkflowTemplateRefWithoutRBAC(t *testing.T) {
 	woc.operate(ctx)
 	assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
 }
+
+const wfTemplateHello = `
+apiVersion: argoproj.io/v1alpha1
+kind: WorkflowTemplate
+metadata:
+  name: hello-world-template-global-arg
+  namespace: default
+spec:
+  templates:
+    - name: hello-world
+      container:
+        image: docker/whalesay
+        command: [cowsay]
+        args: ["{{workflow.parameters.global-parameter}}"]
+`
+
+const wfWithDynamicRef = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: hello-world-wf-global-arg-
+  namespace: default
+spec:
+  entrypoint: whalesay
+  arguments:
+    parameters:
+      - name: global-parameter
+        value: hello
+  templates:
+    - name: whalesay
+      steps:
+        - - name: hello-world
+            templateRef:
+              name: '{{item.workflow-template}}'
+              template: '{{item.template-name}}'
+            withItems:
+                - { workflow-template: 'hello-world-template-global-arg', template-name: 'hello-world'}
+          - name: hello-world-dag
+            template: diamond
+
+    - name: diamond
+      dag:
+        tasks:
+        - name: A
+          templateRef:
+            name: '{{item.workflow-template}}'
+            template: '{{item.template-name}}'
+          withItems:
+              - { workflow-template: 'hello-world-template-global-arg', template-name: 'hello-world'}
+`
+
+func TestWorkflowTemplateWithDynamicRef(t *testing.T) {
+	cancel, controller := newController(wfv1.MustUnmarshalWorkflow(wfWithDynamicRef), wfv1.MustUnmarshalWorkflowTemplate(wfTemplateHello))
+	defer cancel()
+
+	ctx := context.Background()
+	woc := newWorkflowOperationCtx(wfv1.MustUnmarshalWorkflow(wfWithDynamicRef), controller)
+	woc.operate(ctx)
+	assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
+	pods, err := listPods(woc)
+	assert.NoError(t, err)
+	assert.True(t, len(pods.Items) > 0, "pod was not created successfully")
+	pod := pods.Items[0]
+	assert.Contains(t, pod.Name, "hello-world")
+	assert.Equal(t, "docker/whalesay", pod.Spec.Containers[1].Image)
+	assert.Contains(t, "hello", pod.Spec.Containers[1].Args[0])
+	pod = pods.Items[1]
+	assert.Contains(t, pod.Name, "hello-world")
+	assert.Equal(t, "docker/whalesay", pod.Spec.Containers[1].Image)
+	assert.Contains(t, "hello", pod.Spec.Containers[1].Args[0])
+	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
+	woc.operate(ctx)
+	assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
+}
diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go
index 510d112c29e9..837c0784c750 100644
--- a/workflow/controller/workflowpod.go
+++ b/workflow/controller/workflowpod.go
@@ -24,6 +24,7 @@ import (
 	"github.com/argoproj/argo-workflows/v3/workflow/controller/entrypoint"
 	"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
 	"github.com/argoproj/argo-workflows/v3/workflow/util"
+	"github.com/argoproj/argo-workflows/v3/workflow/validate"
 )
 
 var (
@@ -45,10 +46,6 @@ var (
 	}
 )
 
-func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool {
-	return woc.execWf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch()
-}
-
 // scheduleOnDifferentHost adds affinity to prevent retry on the same host when
 // retryStrategy.affinity.nodeAntiAffinity{} is specified
 func (woc *wfOperationCtx) scheduleOnDifferentHost(node *wfv1.NodeStatus, pod *apiv1.Pod) error {
@@ -77,7 +74,7 @@ type createWorkflowPodOpts struct {
 	executionDeadline   time.Time
 }
 
-func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtrs []apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts) (*apiv1.Pod, error) {
+func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtrs []apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts, localParams map[string]string) (*apiv1.Pod, error) {
 	nodeID := woc.wf.NodeID(nodeName)
 
 	// we must check to see if the pod exists rather than just optimistically creating the pod and see if we get
@@ -253,7 +250,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
 	initCtr := woc.newInitContainer(tmpl)
 	pod.Spec.InitContainers = []apiv1.Container{initCtr}
 
-	addSchedulingConstraints(pod, wfSpec, tmpl)
+	woc.addSchedulingConstraints(pod, wfSpec, tmpl, nodeName)
 	woc.addMetadata(pod, tmpl)
 
 	err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
@@ -338,7 +335,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
 					return nil, err
 				}
 				for _, obj := range []interface{}{tmpl.ArchiveLocation} {
-					err = verifyResolvedVariables(obj)
+					err = validate.VerifyResolvedVariables(obj)
 					if err != nil {
 						return nil, err
 					}
@@ -347,24 +344,23 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
 		}
 	}
 
-	// Apply the patch string from template
-	if woc.hasPodSpecPatch(tmpl) {
-		tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl)
-		if err != nil {
-			return nil, errors.Wrap(err, "", "Failed to merge the workflow PodSpecPatch with the template PodSpecPatch due to invalid format")
-		}
-
+	// Apply the patch string from workflow and template
+	var podSpecPatchs []string
+	if woc.execWf.Spec.HasPodSpecPatch() {
 		// Final substitution for workflow level PodSpecPatch
-		localParams := make(map[string]string)
-		if tmpl.IsPodType() {
-			localParams[common.LocalVarPodName] = pod.Name
-		}
-		tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
+		newTmpl := tmpl.DeepCopy()
+		newTmpl.PodSpecPatch = woc.execWf.Spec.PodSpecPatch
+		processedTmpl, err := common.ProcessArgs(newTmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
 		if err != nil {
 			return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables")
 		}
-
-		patchedPodSpec, err := util.ApplyPodSpecPatch(pod.Spec, tmpl.PodSpecPatch)
+		podSpecPatchs = append(podSpecPatchs, processedTmpl.PodSpecPatch)
+	}
+	if tmpl.HasPodSpecPatch() {
+		podSpecPatchs = append(podSpecPatchs, tmpl.PodSpecPatch)
+	}
+	if len(podSpecPatchs) > 0 {
+		patchedPodSpec, err := util.ApplyPodSpecPatch(pod.Spec, podSpecPatchs...)
 		if err != nil {
 			return nil, errors.Wrap(err, "", "Error applying PodSpecPatch")
 		}
@@ -674,22 +670,33 @@ func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template) {
 }
 
 // addSchedulingConstraints applies any node selectors or affinity rules to the pod, either set in the workflow or the template
-func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template) {
+func (woc *wfOperationCtx) addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, nodeName string) {
+	// Get boundaryNode Template (if specified)
+	boundaryTemplate, err := woc.GetBoundaryTemplate(nodeName)
+	if err != nil {
+		woc.log.Warnf("couldn't get boundaryTemplate through nodeName %s", nodeName)
+	}
 	// Set nodeSelector (if specified)
 	if len(tmpl.NodeSelector) > 0 {
 		pod.Spec.NodeSelector = tmpl.NodeSelector
+	} else if boundaryTemplate != nil && len(boundaryTemplate.NodeSelector) > 0 {
+		pod.Spec.NodeSelector = boundaryTemplate.NodeSelector
 	} else if len(wfSpec.NodeSelector) > 0 {
 		pod.Spec.NodeSelector = wfSpec.NodeSelector
 	}
 	// Set affinity (if specified)
 	if tmpl.Affinity != nil {
 		pod.Spec.Affinity = tmpl.Affinity
+	} else if boundaryTemplate != nil && boundaryTemplate.Affinity != nil {
+		pod.Spec.Affinity = boundaryTemplate.Affinity
 	} else if wfSpec.Affinity != nil {
 		pod.Spec.Affinity = wfSpec.Affinity
 	}
 	// Set tolerations (if specified)
 	if len(tmpl.Tolerations) > 0 {
 		pod.Spec.Tolerations = tmpl.Tolerations
+	} else if boundaryTemplate != nil && len(boundaryTemplate.Tolerations) > 0 {
+		pod.Spec.Tolerations = boundaryTemplate.Tolerations
 	} else if len(wfSpec.Tolerations) > 0 {
 		pod.Spec.Tolerations = wfSpec.Tolerations
 	}
@@ -725,6 +732,37 @@ func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *w
 	}
 }
 
+// GetBoundaryTemplate get a template through the nodeName
+func (woc *wfOperationCtx) GetBoundaryTemplate(nodeName string) (*wfv1.Template, error) {
+	node, err := woc.wf.GetNodeByName(nodeName)
+	if err != nil {
+		woc.log.Warnf("couldn't retrieve node for nodeName %s, will get nil templateDeadline", nodeName)
+		return nil, err
+	}
+	boundaryTmpl, _, err := woc.GetTemplateByBoundaryID(node.BoundaryID)
+	if err != nil {
+		return nil, err
+	}
+	return boundaryTmpl, nil
+}
+
+// GetTemplateByBoundaryID get a template through the node's BoundaryID.
+func (woc *wfOperationCtx) GetTemplateByBoundaryID(boundaryID string) (*wfv1.Template, bool, error) {
+	boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
+	if err != nil {
+		return nil, false, err
+	}
+	tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
+	if err != nil {
+		return nil, false, err
+	}
+	_, boundaryTmpl, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode)
+	if err != nil {
+		return nil, templateStored, err
+	}
+	return boundaryTmpl, templateStored, nil
+}
+
 // addVolumeReferences adds any volumeMounts that a container/sidecar is referencing, to the pod.spec.volumes
 // These are either specified in the workflow.spec.volumes or the workflow.spec.volumeClaimTemplate section
 func addVolumeReferences(pod *apiv1.Pod, vols []apiv1.Volume, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
@@ -1095,17 +1133,6 @@ func addSidecars(pod *apiv1.Pod, tmpl *wfv1.Template) {
 	}
 }
 
-// verifyResolvedVariables is a helper to ensure all {{variables}} have been resolved for a object
-func verifyResolvedVariables(obj interface{}) error {
-	str, err := json.Marshal(obj)
-	if err != nil {
-		return err
-	}
-	return template.Validate(string(str), func(tag string) error {
-		return errors.Errorf(errors.CodeBadRequest, "failed to resolve {{%s}}", tag)
-	})
-}
-
 // createSecretVolumesAndMounts will retrieve and create Volumes and Volumemount object for Pod
 func createSecretVolumesAndMounts(tmpl *wfv1.Template) ([]apiv1.Volume, []apiv1.VolumeMount) {
 	allVolumesMap := make(map[string]apiv1.Volume)
diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go
index d2ae3c044199..ac1d6ca753e2 100644
--- a/workflow/controller/workflowpod_test.go
+++ b/workflow/controller/workflowpod_test.go
@@ -19,6 +19,7 @@ import (
 	"k8s.io/utils/pointer"
 
 	"github.com/argoproj/argo-workflows/v3/config"
+	"github.com/argoproj/argo-workflows/v3/errors"
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	"github.com/argoproj/argo-workflows/v3/test/util"
 	armocks "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories/mocks"
@@ -87,7 +88,8 @@ func TestScriptTemplateWithVolume(t *testing.T) {
 	ctx := context.Background()
 	tmpl := unmarshalTemplate(scriptTemplateWithInputArtifact)
 	woc := newWoc()
-	_, err := woc.executeScript(ctx, tmpl.Name, "", tmpl, &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err := woc.executeScript(ctx, tmpl.Name, "", tmpl, &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 }
 
@@ -161,7 +163,8 @@ func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) {
 	mainCtr := tmpl.Script.Container
 	mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
 	ctx := context.Background()
-	pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{}, lp)
 	assert.NoError(t, err)
 	// Note: pod.Spec.Containers[0] is wait
 	assert.Contains(t, pod.Spec.Containers[1].VolumeMounts, volumeMount)
@@ -176,7 +179,7 @@ func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) {
 	woc = newWoc(*wf)
 	mainCtr = tmpl.Script.Container
 	mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
-	pod, err = woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{includeScriptOutput: true})
+	pod, err = woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{mainCtr}, tmpl, &createWorkflowPodOpts{includeScriptOutput: true}, lp)
 	assert.NoError(t, err)
 	assert.NotContains(t, pod.Spec.Containers[1].VolumeMounts, volumeMount)
 	assert.Contains(t, pod.Spec.Containers[1].VolumeMounts, customVolumeMount)
@@ -192,7 +195,8 @@ func TestWFLevelServiceAccount(t *testing.T) {
 	assert.NoError(t, err)
 
 	ctx := context.Background()
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -211,7 +215,8 @@ func TestTmplServiceAccount(t *testing.T) {
 	assert.NoError(t, err)
 
 	ctx := context.Background()
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 
 	pods, err := listPods(woc)
@@ -233,8 +238,8 @@ func TestWFLevelAutomountServiceAccountToken(t *testing.T) {
 	woc.execWf.Spec.Executor = &wfv1.ExecutorConfig{ServiceAccountName: "foo"}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -257,8 +262,8 @@ func TestTmplLevelAutomountServiceAccountToken(t *testing.T) {
 	woc.execWf.Spec.Templates[0].AutomountServiceAccountToken = &falseValue
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -287,8 +292,8 @@ func TestWFLevelExecutorServiceAccountName(t *testing.T) {
 	woc.execWf.Spec.Executor = &wfv1.ExecutorConfig{ServiceAccountName: "foo"}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -313,8 +318,8 @@ func TestTmplLevelExecutorServiceAccountName(t *testing.T) {
 	woc.execWf.Spec.Templates[0].Executor = &wfv1.ExecutorConfig{ServiceAccountName: "tmpl"}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := woc.controller.kubeclientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
 	assert.NoError(t, err)
@@ -340,7 +345,8 @@ func TestTmplLevelExecutorSecurityContext(t *testing.T) {
 	woc.execWf.Spec.Templates[0].Executor = &wfv1.ExecutorConfig{ServiceAccountName: "tmpl"}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := woc.controller.kubeclientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
 	assert.NoError(t, err)
@@ -367,9 +373,9 @@ func TestImagePullSecrets(t *testing.T) {
 	}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
+	lp := make(map[string]string)
 	ctx := context.Background()
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := woc.controller.kubeclientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
 	assert.NoError(t, err)
@@ -403,9 +409,9 @@ func TestAffinity(t *testing.T) {
 	}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
+	lp := make(map[string]string)
 	ctx := context.Background()
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -424,9 +430,9 @@ func TestTolerations(t *testing.T) {
 	}}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
+	lp := make(map[string]string)
 	ctx := context.Background()
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -441,9 +447,9 @@ func TestMetadata(t *testing.T) {
 	woc := newWoc()
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-
+	lp := make(map[string]string)
 	ctx := context.Background()
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -644,7 +650,8 @@ func Test_createWorkflowPod_rateLimited(t *testing.T) {
 
 func Test_createWorkflowPod_containerName(t *testing.T) {
 	woc := newWoc()
-	pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Name: "invalid", Command: []string{""}}}, &wfv1.Template{}, &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Name: "invalid", Command: []string{""}}}, &wfv1.Template{}, &createWorkflowPodOpts{}, lp)
 	assert.NoError(t, err)
 	assert.Equal(t, common.MainContainerName, pod.Spec.Containers[1].Name)
 }
@@ -652,12 +659,14 @@ func Test_createWorkflowPod_containerName(t *testing.T) {
 func Test_createWorkflowPod_emissary(t *testing.T) {
 	t.Run("NoCommand", func(t *testing.T) {
 		woc := newWoc()
-		_, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "docker/whalesay:nope"}}, &wfv1.Template{Name: "my-tmpl"}, &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		_, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "docker/whalesay:nope"}}, &wfv1.Template{Name: "my-tmpl"}, &createWorkflowPodOpts{}, lp)
 		assert.EqualError(t, err, "failed to look-up entrypoint/cmd for image \"docker/whalesay:nope\", you must either explicitly specify the command, or list the image's command in the index: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary: GET https://index.docker.io/v2/docker/whalesay/manifests/nope: MANIFEST_UNKNOWN: manifest unknown; unknown tag=nope")
 	})
 	t.Run("CommandNoArgs", func(t *testing.T) {
 		woc := newWoc()
-		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Command: []string{"foo"}}}, &wfv1.Template{}, &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Command: []string{"foo"}}}, &wfv1.Template{}, &createWorkflowPodOpts{}, lp)
 		assert.NoError(t, err)
 		assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary",
 			"--loglevel", getExecutorLogLevel(), "--log-format", woc.controller.cliExecutorLogFormat,
@@ -665,7 +674,8 @@ func Test_createWorkflowPod_emissary(t *testing.T) {
 	})
 	t.Run("NoCommandWithImageIndex", func(t *testing.T) {
 		woc := newWoc()
-		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "my-image"}}, &wfv1.Template{}, &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "my-image"}}, &wfv1.Template{}, &createWorkflowPodOpts{}, lp)
 		if assert.NoError(t, err) {
 			assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary",
 				"--loglevel", getExecutorLogLevel(), "--log-format", woc.controller.cliExecutorLogFormat,
@@ -675,7 +685,8 @@ func Test_createWorkflowPod_emissary(t *testing.T) {
 	})
 	t.Run("NoCommandWithArgsWithImageIndex", func(t *testing.T) {
 		woc := newWoc()
-		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "my-image", Args: []string{"foo"}}}, &wfv1.Template{}, &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Image: "my-image", Args: []string{"foo"}}}, &wfv1.Template{}, &createWorkflowPodOpts{}, lp)
 		if assert.NoError(t, err) {
 			assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary",
 				"--loglevel", getExecutorLogLevel(), "--log-format", woc.controller.cliExecutorLogFormat,
@@ -692,7 +703,8 @@ func Test_createWorkflowPod_emissary(t *testing.T) {
 		}}
 		podSpecPatch, err := json.Marshal(podSpec)
 		assert.NoError(t, err)
-		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Command: []string{"foo"}}}, &wfv1.Template{PodSpecPatch: string(podSpecPatch)}, &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, err := woc.createWorkflowPod(context.Background(), "", []apiv1.Container{{Command: []string{"foo"}}}, &wfv1.Template{PodSpecPatch: string(podSpecPatch)}, &createWorkflowPodOpts{}, lp)
 		assert.NoError(t, err)
 		assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary",
 			"--loglevel", getExecutorLogLevel(), "--log-format", woc.controller.cliExecutorLogFormat,
@@ -726,7 +738,8 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
 
 		tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 		assert.NoError(t, err)
-		_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+		lp := make(map[string]string)
+		_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 		assert.NoError(t, err)
 		pods, err := listPods(woc)
 		assert.NoError(t, err)
@@ -795,7 +808,8 @@ func TestVolumesPodSubstitution(t *testing.T) {
 
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -833,7 +847,8 @@ func TestOutOfCluster(t *testing.T) {
 
 		tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 		assert.NoError(t, err)
-		_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+		lp := make(map[string]string)
+		_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 		assert.NoError(t, err)
 		pods, err := listPods(woc)
 		assert.NoError(t, err)
@@ -859,7 +874,8 @@ func TestOutOfCluster(t *testing.T) {
 
 		tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 		assert.NoError(t, err)
-		_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+		lp := make(map[string]string)
+		_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 		assert.NoError(t, err)
 		pods, err := listPods(woc)
 		assert.NoError(t, err)
@@ -883,7 +899,8 @@ func TestPriority(t *testing.T) {
 	woc.execWf.Spec.Templates[0].Priority = &priority
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -900,7 +917,8 @@ func TestSchedulerName(t *testing.T) {
 	woc.execWf.Spec.Templates[0].SchedulerName = "foo"
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -955,7 +973,8 @@ func TestInitContainers(t *testing.T) {
 
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1020,7 +1039,8 @@ func TestSidecars(t *testing.T) {
 
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1074,7 +1094,8 @@ func TestTemplateLocalVolumes(t *testing.T) {
 
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1098,7 +1119,8 @@ func TestWFLevelHostAliases(t *testing.T) {
 	}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1117,7 +1139,8 @@ func TestTmplLevelHostAliases(t *testing.T) {
 	}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1136,7 +1159,8 @@ func TestWFLevelSecurityContext(t *testing.T) {
 	}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1156,7 +1180,8 @@ func TestTmplLevelSecurityContext(t *testing.T) {
 	}
 	tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "")
 	assert.NoError(t, err)
-	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{})
+	lp := make(map[string]string)
+	_, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}, lp)
 	assert.NoError(t, err)
 	pods, err := listPods(woc)
 	assert.NoError(t, err)
@@ -1247,7 +1272,8 @@ func Test_createSecretVolumesFromArtifactLocations_SSECUsed(t *testing.T) {
 
 	mainCtr := woc.execWf.Spec.Templates[0].Container
 	for i := 1; i < 5; i++ {
-		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 		if pod != nil {
 			assert.Contains(t, pod.Spec.Volumes, wantVolume)
 			assert.Len(t, pod.Spec.InitContainers, 1)
@@ -1258,6 +1284,30 @@ func Test_createSecretVolumesFromArtifactLocations_SSECUsed(t *testing.T) {
 
 }
 
+var helloWorldWfWithTmplAndWFPatch = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  name: hello-world
+spec:
+  entrypoint: whalesay
+  podSpecPatch: |
+    containers:
+      - name: main
+        securityContext:
+          runAsNonRoot: true
+          capabilities:
+            drop:
+              - ALL
+  templates:
+  - name: whalesay
+    podSpecPatch: '{"containers":[{"name":"main", "securityContext":{"capabilities":{"add":["ALL"],"drop":null}}}]}'
+    container:
+      image: docker/whalesay:latest
+      command: [cowsay]
+      args: ["hello world"]
+`
+
 var helloWorldWfWithPatch = `
 apiVersion: argoproj.io/v1alpha1
 kind: Workflow
@@ -1337,28 +1387,38 @@ func TestPodSpecPatch(t *testing.T) {
 	ctx := context.Background()
 	woc := newWoc(*wf)
 	mainCtr := woc.execWf.Spec.Templates[0].Container
-	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
 
 	wf = wfv1.MustUnmarshalWorkflow(helloWorldWfWithWFPatch)
 	woc = newWoc(*wf)
 	mainCtr = woc.execWf.Spec.Templates[0].Container
-	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
 
 	wf = wfv1.MustUnmarshalWorkflow(helloWorldWfWithWFYAMLPatch)
 	woc = newWoc(*wf)
 	mainCtr = woc.execWf.Spec.Templates[0].Container
-	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 
 	assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
 	assert.Equal(t, "104857600", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String())
 
+	wf = wfv1.MustUnmarshalWorkflow(helloWorldWfWithTmplAndWFPatch)
+	woc = newWoc(*wf)
+	mainCtr = woc.execWf.Spec.Templates[0].Container
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
+	assert.Equal(t, pointer.Bool(true), pod.Spec.Containers[1].SecurityContext.RunAsNonRoot)
+	assert.Equal(t, apiv1.Capability("ALL"), pod.Spec.Containers[1].SecurityContext.Capabilities.Add[0])
+	assert.Equal(t, []apiv1.Capability(nil), pod.Spec.Containers[1].SecurityContext.Capabilities.Drop)
+
 	wf = wfv1.MustUnmarshalWorkflow(helloWorldWfWithInvalidPatchFormat)
 	woc = newWoc(*wf)
 	mainCtr = woc.execWf.Spec.Templates[0].Container
-	_, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
-	assert.EqualError(t, err, "Failed to merge the workflow PodSpecPatch with the template PodSpecPatch due to invalid format")
+	_, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
+	assert.EqualError(t, err, "Error applying PodSpecPatch")
+	assert.EqualError(t, errors.Cause(err), "invalid character '}' after object key")
 }
 
 var helloWorldStepWfWithPatch = `
@@ -1433,7 +1493,8 @@ func TestMainContainerCustomization(t *testing.T) {
 		woc := newWoc(*wf)
 		woc.controller.Config.MainContainer = mainCtrSpec
 		mainCtr := woc.execWf.Spec.Templates[0].Container
-		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 		assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
 	})
 	// The main container's resources should be changed since the existing
@@ -1444,7 +1505,8 @@ func TestMainContainerCustomization(t *testing.T) {
 		woc.controller.Config.MainContainer = mainCtrSpec
 		mainCtr := woc.execWf.Spec.Templates[0].Container
 		mainCtr.Resources = apiv1.ResourceRequirements{Limits: apiv1.ResourceList{}}
-		pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 		if assert.NoError(t, err) {
 			ctr := pod.Spec.Containers[1]
 			assert.NotNil(t, ctr.SecurityContext)
@@ -1468,7 +1530,8 @@ func TestMainContainerCustomization(t *testing.T) {
 				apiv1.ResourceMemory: resource.MustParse("512Mi"),
 			},
 		}
-		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 		assert.Equal(t, "0.900", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
 
 	})
@@ -1484,7 +1547,8 @@ func TestMainContainerCustomization(t *testing.T) {
 				apiv1.ResourceMemory: resource.MustParse("123Mi"),
 			},
 		}
-		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 		assert.Equal(t, "1", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
 		assert.Equal(t, "128974848", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String())
 	})
@@ -1537,7 +1601,8 @@ func TestWindowsUNCPathsAreRemoved(t *testing.T) {
 
 	ctx := context.Background()
 	mainCtr := woc.execWf.Spec.Templates[0].Container
-	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	waitCtrIdx, err := wfutil.FindWaitCtrIndex(pod)
 
 	if err != nil {
@@ -1576,7 +1641,8 @@ func TestPropagateMaxDuration(t *testing.T) {
 	woc := newWoc()
 	deadline := time.Time{}.Add(time.Second)
 	ctx := context.Background()
-	pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{*tmpl.Container}, tmpl, &createWorkflowPodOpts{executionDeadline: deadline})
+	lp := make(map[string]string)
+	pod, err := woc.createWorkflowPod(ctx, tmpl.Name, []apiv1.Container{*tmpl.Container}, tmpl, &createWorkflowPodOpts{executionDeadline: deadline}, lp)
 	assert.NoError(t, err)
 	v, err := getPodDeadline(pod)
 	assert.NoError(t, err)
@@ -1635,14 +1701,15 @@ func TestPodMetadata(t *testing.T) {
 	ctx := context.Background()
 	woc := newWoc(*wf)
 	mainCtr := woc.execWf.Spec.Templates[0].Container
-	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "foo", pod.ObjectMeta.Annotations["workflow-level-pod-annotation"])
 	assert.Equal(t, "bar", pod.ObjectMeta.Labels["workflow-level-pod-label"])
 
 	wf = wfv1.MustUnmarshalWorkflow(wfWithPodMetadataAndTemplateMetadata)
 	woc = newWoc(*wf)
 	mainCtr = woc.execWf.Spec.Templates[0].Container
-	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "fizz", pod.ObjectMeta.Annotations["workflow-level-pod-annotation"])
 	assert.Equal(t, "buzz", pod.ObjectMeta.Labels["workflow-level-pod-label"])
 	assert.Equal(t, "hello", pod.ObjectMeta.Annotations["template-level-pod-annotation"])
@@ -1677,13 +1744,14 @@ func TestPodDefaultContainer(t *testing.T) {
 	wf.Spec.Templates[0].ContainerSet.Containers[0].Name = common.MainContainerName
 	woc := newWoc(*wf)
 	template := woc.execWf.Spec.Templates[0]
-	pod, _ := woc.createWorkflowPod(ctx, wf.Name, template.ContainerSet.GetContainers(), &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, _ := woc.createWorkflowPod(ctx, wf.Name, template.ContainerSet.GetContainers(), &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, common.MainContainerName, pod.ObjectMeta.Annotations[common.AnnotationKeyDefaultContainer])
 
 	wf = wfv1.MustUnmarshalWorkflow(wfWithContainerSet)
 	woc = newWoc(*wf)
 	template = woc.execWf.Spec.Templates[0]
-	pod, _ = woc.createWorkflowPod(ctx, wf.Name, template.ContainerSet.GetContainers(), &template, &createWorkflowPodOpts{})
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, template.ContainerSet.GetContainers(), &template, &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "b", pod.ObjectMeta.Annotations[common.AnnotationKeyDefaultContainer])
 }
 
@@ -1692,7 +1760,8 @@ func TestGetDeadline(t *testing.T) {
 	ctx := context.Background()
 	woc := newWoc(*wf)
 	mainCtr := woc.execWf.Spec.Templates[0].Container
-	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	deadline, _ := getPodDeadline(pod)
 	assert.Equal(t, time.Time{}, deadline)
 
@@ -1701,7 +1770,7 @@ func TestGetDeadline(t *testing.T) {
 	ctx = context.Background()
 	woc = newWoc(*wf)
 	mainCtr = woc.execWf.Spec.Templates[0].Container
-	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{executionDeadline: executionDeadline})
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{executionDeadline: executionDeadline}, lp)
 	deadline, _ = getPodDeadline(pod)
 	assert.Equal(t, executionDeadline.Format(time.RFC3339), deadline.Format(time.RFC3339))
 }
@@ -1731,7 +1800,8 @@ func TestPodMetadataWithWorkflowDefaults(t *testing.T) {
 	err := woc.setExecWorkflow(ctx)
 	assert.NoError(t, err)
 	mainCtr := woc.execWf.Spec.Templates[0].Container
-	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "annotation-value", pod.ObjectMeta.Annotations["controller-level-pod-annotation"])
 	assert.Equal(t, "set-by-controller", pod.ObjectMeta.Annotations["workflow-level-pod-annotation"])
 	assert.Equal(t, "label-value", pod.ObjectMeta.Labels["controller-level-pod-label"])
@@ -1754,7 +1824,7 @@ func TestPodMetadataWithWorkflowDefaults(t *testing.T) {
 	err = woc.setExecWorkflow(ctx)
 	assert.NoError(t, err)
 	mainCtr = woc.execWf.Spec.Templates[0].Container
-	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	pod, _ = woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.Equal(t, "foo", pod.ObjectMeta.Annotations["workflow-level-pod-annotation"])
 	assert.Equal(t, "bar", pod.ObjectMeta.Labels["workflow-level-pod-label"])
 	assert.Equal(t, "annotation-value", pod.ObjectMeta.Annotations["controller-level-pod-annotation"])
@@ -1772,7 +1842,8 @@ func TestPodExists(t *testing.T) {
 	err := woc.setExecWorkflow(ctx)
 	assert.NoError(t, err)
 	mainCtr := woc.execWf.Spec.Templates[0].Container
-	pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+	lp := make(map[string]string)
+	pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 	assert.NoError(t, err)
 	assert.NotNil(t, pod)
 
@@ -1799,7 +1870,8 @@ func TestProgressEnvVars(t *testing.T) {
 		err := woc.setExecWorkflow(ctx)
 		require.NoError(t, err)
 		mainCtr := woc.execWf.Spec.Templates[0].Container
-		pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{})
+		lp := make(map[string]string)
+		pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}, lp)
 		require.NoError(t, err)
 		assert.NotNil(t, pod)
 		return cancel, pod
diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go
index eaa126f7bef2..ec8817c24e05 100644
--- a/workflow/executor/executor.go
+++ b/workflow/executor/executor.go
@@ -11,6 +11,7 @@ import (
 	"fmt"
 	"io"
 	"io/fs"
+	"math"
 	"os"
 	"path"
 	"path/filepath"
@@ -162,7 +163,6 @@ func (we *WorkflowExecutor) HandleError(ctx context.Context) {
 func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
 	log.Infof("Start loading input artifacts...")
 	for _, art := range we.Template.Inputs.Artifacts {
-
 		log.Infof("Downloading artifact: %s", art.Name)
 
 		if !art.HasLocationOrKey() {
@@ -177,14 +177,6 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
 		if err != nil {
 			return err
 		}
-		driverArt, err := we.newDriverArt(&art)
-		if err != nil {
-			return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
-		}
-		artDriver, err := we.InitDriver(ctx, driverArt)
-		if err != nil {
-			return err
-		}
 		// Determine the file path of where to load the artifact
 		var artPath string
 		mnt := common.FindOverlappingVolume(&we.Template, art.Path)
@@ -204,13 +196,78 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
 		// the file is a tarball or not. If it is, it is first extracted then renamed to
 		// the desired location. If not, it is simply renamed to the location.
 		tempArtPath := artPath + ".tmp"
-		err = artDriver.Load(driverArt, tempArtPath)
-		if err != nil {
-			if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
-				log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
-				continue
+
+		proceed := true
+		gitLoopCount := 0
+		if art.Git != nil {
+			// if git artifact, try s3 first
+			for {
+				if gitLoopCount >= 3 || !proceed {
+					break
+				}
+				proceed = true
+				branch := "master"
+				if art.Git.Branch != "" {
+					branch = art.Git.Branch
+				}
+				repoString := art.Git.Repo[strings.LastIndex(art.Git.Repo, ":")+1:]
+				repoStringArray := strings.Split(strings.Replace(repoString, ".git", "", -1), "/")
+				repoString = repoStringArray[len(repoStringArray)-2] + "/" + repoStringArray[len(repoStringArray)-1]
+				s3Key := "git-artifacts/workflow/" + we.workflow + "/" + repoString + "/" + branch
+
+				artS3 := wfv1.Artifact{
+					ArtifactLocation: wfv1.ArtifactLocation{
+						S3: &wfv1.S3Artifact{
+							Key: s3Key,
+						},
+					},
+				}
+				log.Info(artS3)
+				driverArt, err := we.newDriverArt(&artS3)
+				if err != nil {
+					log.Warn(err)
+				} else {
+					artDriver, err := we.InitDriver(ctx, driverArt)
+					if err != nil {
+						log.Warn(err)
+					} else {
+						err = artDriver.Load(driverArt, tempArtPath)
+						if err != nil {
+							if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
+								log.Infof("Skipping optional input artifact that was not found: %s", artS3.Name)
+								continue
+							}
+							log.Warn(err)
+						} else {
+							proceed = false
+						}
+					}
+				}
+				baseDelay := 1 * time.Second
+				secRetry := math.Pow(2, float64(gitLoopCount))
+				delay := time.Duration(secRetry) * baseDelay
+				time.Sleep(delay)
+				gitLoopCount++
+			}
+		}
+		if proceed {
+			// other artifact
+			driverArt, err := we.newDriverArt(&art)
+			if err != nil {
+				return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
+			}
+			artDriver, err := we.InitDriver(ctx, driverArt)
+			if err != nil {
+				return err
+			}
+			err = artDriver.Load(driverArt, tempArtPath)
+			if err != nil {
+				if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
+					log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
+					continue
+				}
+				return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
 			}
-			return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
 		}
 
 		isTar := false
diff --git a/workflow/templateresolution/context.go b/workflow/templateresolution/context.go
index c57987bd798a..7bac07d74c87 100644
--- a/workflow/templateresolution/context.go
+++ b/workflow/templateresolution/context.go
@@ -107,6 +107,24 @@ func (ctx *Context) GetTemplateByName(name string) (*wfv1.Template, error) {
 	if tmpl == nil {
 		return nil, errors.Errorf(errors.CodeNotFound, "template %s not found", name)
 	}
+
+	// add workflow template level pod annotations and labels to template
+	podMetadata := ctx.tmplBase.GetPodMetadata()
+	if podMetadata != nil {
+		if tmpl.Metadata.Annotations == nil {
+			tmpl.Metadata.Annotations = make(map[string]string)
+		}
+		for k, v := range podMetadata.Annotations {
+			tmpl.Metadata.Annotations[k] = v
+		}
+		if tmpl.Metadata.Labels == nil {
+			tmpl.Metadata.Labels = make(map[string]string)
+		}
+		for k, v := range podMetadata.Labels {
+			tmpl.Metadata.Labels[k] = v
+		}
+	}
+
 	return tmpl.DeepCopy(), nil
 }
 
@@ -141,6 +159,24 @@ func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Templat
 	if template == nil {
 		return nil, errors.Errorf(errors.CodeNotFound, "template %s not found in workflow template %s", tmplRef.Template, tmplRef.Name)
 	}
+
+	// add workflow template level pod annotations and labels to template
+	podMetadata := wftmpl.GetPodMetadata()
+	if podMetadata != nil {
+		if template.Metadata.Annotations == nil {
+			template.Metadata.Annotations = make(map[string]string)
+		}
+		for k, v := range podMetadata.Annotations {
+			template.Metadata.Annotations[k] = v
+		}
+		if template.Metadata.Labels == nil {
+			template.Metadata.Labels = make(map[string]string)
+		}
+		for k, v := range podMetadata.Labels {
+			template.Metadata.Labels[k] = v
+		}
+	}
+
 	return template.DeepCopy(), nil
 }
 
diff --git a/workflow/util/util.go b/workflow/util/util.go
index bb539f8ee8c8..7f1125c03f8e 100644
--- a/workflow/util/util.go
+++ b/workflow/util/util.go
@@ -1237,44 +1237,32 @@ func ConvertYAMLToJSON(str string) (string, error) {
 	return str, nil
 }
 
-// PodSpecPatchMerge will do strategic merge the workflow level PodSpecPatch and template level PodSpecPatch
-func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) (string, error) {
-	wfPatch, err := ConvertYAMLToJSON(wf.Spec.PodSpecPatch)
-	if err != nil {
-		return "", err
-	}
-	tmplPatch, err := ConvertYAMLToJSON(tmpl.PodSpecPatch)
-	if err != nil {
-		return "", err
-	}
-	data, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{})
-	return string(data), err
-}
-
-func ApplyPodSpecPatch(podSpec apiv1.PodSpec, podSpecPatchYaml string) (*apiv1.PodSpec, error) {
+func ApplyPodSpecPatch(podSpec apiv1.PodSpec, podSpecPatchYamls ...string) (*apiv1.PodSpec, error) {
 	podSpecJson, err := json.Marshal(podSpec)
 	if err != nil {
 		return nil, errors.Wrap(err, "", "Failed to marshal the Pod spec")
 	}
 
-	// must convert to json because PodSpec has only json tags
-	podSpecPatchJson, err := ConvertYAMLToJSON(podSpecPatchYaml)
-	if err != nil {
-		return nil, errors.Wrap(err, "", "Failed to convert the PodSpecPatch yaml to json")
-	}
+	for _, podSpecPatchYaml := range podSpecPatchYamls {
+		// must convert to json because PodSpec has only json tags
+		podSpecPatchJson, err := ConvertYAMLToJSON(podSpecPatchYaml)
+		if err != nil {
+			return nil, errors.Wrap(err, "", "Failed to convert the PodSpecPatch yaml to json")
+		}
 
-	// validate the patch to be a PodSpec
-	if err := json.Unmarshal([]byte(podSpecPatchJson), &apiv1.PodSpec{}); err != nil {
-		return nil, fmt.Errorf("invalid podSpecPatch %q: %w", podSpecPatchYaml, err)
-	}
+		// validate the patch to be a PodSpec
+		if err := json.Unmarshal([]byte(podSpecPatchJson), &apiv1.PodSpec{}); err != nil {
+			return nil, fmt.Errorf("invalid podSpecPatch %q: %w", podSpecPatchYaml, err)
+		}
 
-	modJson, err := strategicpatch.StrategicMergePatch(podSpecJson, []byte(podSpecPatchJson), apiv1.PodSpec{})
-	if err != nil {
-		return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
+		podSpecJson, err = strategicpatch.StrategicMergePatch(podSpecJson, []byte(podSpecPatchJson), apiv1.PodSpec{})
+		if err != nil {
+			return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
+		}
 	}
 
 	var newPodSpec apiv1.PodSpec
-	err = json.Unmarshal(modJson, &newPodSpec)
+	err = json.Unmarshal(podSpecJson, &newPodSpec)
 	if err != nil {
 		return nil, errors.Wrap(err, "", "Error in Unmarshalling after merge the patch")
 	}
diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go
index d392b84eef20..59aa7c49c1c9 100644
--- a/workflow/util/util_test.go
+++ b/workflow/util/util_test.go
@@ -167,32 +167,6 @@ func TestReadFromSingleorMultiplePathErrorHandling(t *testing.T) {
 	}
 }
 
-var yamlStr = `
-containers:
-  - name: main
-    resources:
-      limits:
-        cpu: 1000m
-`
-
-func TestPodSpecPatchMerge(t *testing.T) {
-	tmpl := wfv1.Template{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"cpu\": \"1000m\"}}}]}"}
-	wf := wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}}
-	merged, err := PodSpecPatchMerge(&wf, &tmpl)
-	assert.NoError(t, err)
-	var spec v1.PodSpec
-	wfv1.MustUnmarshal([]byte(merged), &spec)
-	assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String())
-	assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String())
-
-	tmpl = wfv1.Template{PodSpecPatch: yamlStr}
-	wf = wfv1.Workflow{Spec: wfv1.WorkflowSpec{PodSpecPatch: "{\"containers\":[{\"name\":\"main\", \"resources\":{\"limits\":{\"memory\": \"100Mi\"}}}]}"}}
-	merged, err = PodSpecPatchMerge(&wf, &tmpl)
-	assert.NoError(t, err)
-	wfv1.MustUnmarshal([]byte(merged), &spec)
-	assert.Equal(t, "1.000", spec.Containers[0].Resources.Limits.Cpu().AsDec().String())
-	assert.Equal(t, "104857600", spec.Containers[0].Resources.Limits.Memory().AsDec().String())
-}
 
 var suspendedWf = `
 apiVersion: argoproj.io/v1alpha1
diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go
index 35acaab2ed9d..16a8e08796d6 100644
--- a/workflow/validate/validate.go
+++ b/workflow/validate/validate.go
@@ -468,14 +468,14 @@ func (ctx *templateValidationCtx) validateTemplate(tmpl *wfv1.Template, tmplCtx
 		}
 
 	}
-
+	templateScope := tmplCtx.GetTemplateScope()
 	tmplID := getTemplateID(tmpl)
-	_, ok := ctx.results[tmplID]
+	_, ok := ctx.results[templateScope+tmplID]
 	if ok {
 		// we can skip the rest since it has been validated.
 		return nil
 	}
-	ctx.results[tmplID] = true
+	ctx.results[templateScope+tmplID] = true
 
 	for globalVar, val := range ctx.globalParams {
 		scope[globalVar] = val
@@ -489,12 +489,13 @@ func (ctx *templateValidationCtx) validateTemplate(tmpl *wfv1.Template, tmplCtx
 		err = ctx.validateLeaf(scope, tmplCtx, newTmpl, workflowTemplateValidation)
 	}
 	if err != nil {
+		logrus.Error(err)
 		return err
 	}
-	err = validateOutputs(scope, ctx.globalParams, newTmpl, workflowTemplateValidation)
-	if err != nil {
-		return err
-	}
+	// err = validateOutputs(scope, ctx.globalParams, newTmpl, workflowTemplateValidation)
+	// if err != nil {
+	// 	return err
+	// }
 	if newTmpl.ArchiveLocation != nil {
 		errPrefix := fmt.Sprintf("templates.%s.archiveLocation", newTmpl.Name)
 		err = validateArtifactLocation(errPrefix, *newTmpl.ArchiveLocation)
@@ -521,6 +522,17 @@ func (ctx *templateValidationCtx) validateTemplate(tmpl *wfv1.Template, tmplCtx
 	return nil
 }
 
+// VerifyResolvedVariables is a helper to ensure all {{variables}} have been resolved for a object
+func VerifyResolvedVariables(obj interface{}) error {
+	str, err := json.Marshal(obj)
+	if err != nil {
+		return err
+	}
+	return template.Validate(string(str), func(tag string) error {
+		return errors.Errorf(errors.CodeBadRequest, "failed to resolve {{%s}}", tag)
+	})
+}
+
 // validateTemplateHolder validates a template holder and returns the validated template.
 func (ctx *templateValidationCtx) validateTemplateHolder(tmplHolder wfv1.TemplateReferenceHolder, tmplCtx *templateresolution.Context, args wfv1.ArgumentsProvider, workflowTemplateValidation bool) (*wfv1.Template, error) {
 	tmplRef := tmplHolder.GetTemplateRef()
@@ -535,6 +547,13 @@ func (ctx *templateValidationCtx) validateTemplateHolder(tmplHolder wfv1.Templat
 		if tmplRef.Template == "" {
 			return nil, errors.New(errors.CodeBadRequest, "template name is required")
 		}
+		if err := VerifyResolvedVariables(tmplRef); err != nil {
+			logrus.Warnf("template reference need resolution: %v", err)
+			return nil, nil
+		}
+		if placeholderGenerator.IsPlaceholder(tmplRef.Template) {
+			return nil, nil
+		}
 	} else if tmplName != "" {
 		_, err := tmplCtx.GetTemplateByName(tmplName)
 		if err != nil {
@@ -615,13 +634,13 @@ func validateInputs(tmpl *wfv1.Template) (map[string]interface{}, error) {
 				return nil, errors.Errorf(errors.CodeBadRequest, "error in templates.%s.%s: %s", tmpl.Name, artRef, err.Error())
 			}
 			scope[fmt.Sprintf("inputs.artifacts.%s.path", art.Name)] = true
-		} else {
-			if art.Path != "" {
-				return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path only valid in container/script templates", tmpl.Name, artRef)
-			}
-		}
-		if art.From != "" {
-			return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.from not valid in inputs", tmpl.Name, artRef)
+		// } else {
+		// 	if art.Path != "" {
+		// 		return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.path only valid in container/script templates", tmpl.Name, artRef)
+		// 	}
+		// }
+		// if art.From != "" {
+		// 	return nil, errors.Errorf(errors.CodeBadRequest, "templates.%s.%s.from not valid in inputs", tmpl.Name, artRef)
 		}
 		errPrefix := fmt.Sprintf("templates.%s.%s", tmpl.Name, artRef)
 		err = validateArtifactLocation(errPrefix, art.ArtifactLocation)
@@ -666,11 +685,13 @@ func resolveAllVariables(scope map[string]interface{}, globalParams map[string]s
 			if (trimmedTag == "item" || strings.HasPrefix(trimmedTag, "item.")) && allowAllItemRefs {
 				// we are *probably* referencing a undetermined item using withParam
 				// NOTE: this is far from foolproof.
+				// Allow runtime resolution of workflow output parameter names
 			} else if strings.HasPrefix(trimmedTag, "workflow.outputs.parameters.") && allowAllWorkflowOutputParameterRefs {
 				// Allow runtime resolution of workflow output parameter names
 			} else if strings.HasPrefix(trimmedTag, "workflow.outputs.artifacts.") && allowAllWorkflowOutputArtifactRefs {
 				// Allow runtime resolution of workflow output artifact names
 			} else if strings.HasPrefix(trimmedTag, "outputs.") {
+			} else if strings.HasPrefix(trimmedTag, "inputs.") {
 				// We are self referencing for metric emission, allow it.
 			} else if strings.HasPrefix(trimmedTag, common.GlobalVarWorkflowCreationTimestamp) {
 			} else if strings.HasPrefix(trimmedTag, common.GlobalVarWorkflowCronScheduleTime) {
@@ -678,6 +699,9 @@ func resolveAllVariables(scope map[string]interface{}, globalParams map[string]s
 			} else if strings.HasPrefix(trimmedTag, common.GlobalVarWorkflowDuration) {
 			} else if strings.HasPrefix(trimmedTag, "tasks.name") {
 			} else if strings.HasPrefix(trimmedTag, "steps.name") {
+			} else if strings.HasPrefix(trimmedTag, "steps.standardize") {
+			} else if strings.HasPrefix(trimmedTag, "workflow.labels") {
+			} else if strings.HasPrefix(trimmedTag, "pod.name") {
 			} else if strings.HasPrefix(trimmedTag, "node.name") {
 			} else if strings.HasPrefix(trimmedTag, "workflow.parameters") && workflowTemplateValidation {
 				// If we are simply validating a WorkflowTemplate in isolation, some of the parameters may come from the Workflow that uses it
@@ -871,16 +895,16 @@ func validateArgumentsValues(prefix string, arguments wfv1.Arguments, allowEmpty
 				}
 				return errors.Errorf(errors.CodeBadRequest, "%s%s.value is required", prefix, param.Name)
 			}
-			valueSpecifiedInEnumList := false
-			for _, enum := range param.Enum {
-				if enum == *param.Value {
-					valueSpecifiedInEnumList = true
-					break
-				}
-			}
-			if !valueSpecifiedInEnumList {
-				return errors.Errorf(errors.CodeBadRequest, "%s%s.value should be present in %s%s.enum list", prefix, param.Name, prefix, param.Name)
-			}
+			// valueSpecifiedInEnumList := false
+			// for _, enum := range param.Enum {
+			// 	if enum == *param.Value {
+			// 		valueSpecifiedInEnumList = true
+			// 		break
+			// 	}
+			// }
+			// if !valueSpecifiedInEnumList {
+			// 	return errors.Errorf(errors.CodeBadRequest, "%s%s.value should be present in %s%s.enum list", prefix, param.Name, prefix, param.Name)
+			// }
 		}
 	}
 	for _, art := range arguments.Artifacts {
@@ -906,10 +930,10 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm
 			if step.Name == "" {
 				return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name is required", tmpl.Name, i)
 			}
-			_, ok := stepNames[step.Name]
-			if ok {
-				return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
-			}
+			// _, ok := stepNames[step.Name]
+			// if ok {
+			// 	return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name '%s' is not unique", tmpl.Name, i, step.Name)
+			// }
 			if errs := isValidWorkflowFieldName(step.Name); len(errs) != 0 {
 				return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].name '%s' is invalid: %s", tmpl.Name, i, step.Name, strings.Join(errs, ";"))
 			}
@@ -1025,6 +1049,9 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
 	scope[fmt.Sprintf("%s.startedAt", prefix)] = true
 	scope[fmt.Sprintf("%s.finishedAt", prefix)] = true
 	scope[fmt.Sprintf("%s.hostNodeName", prefix)] = true
+	if tmpl == nil {
+		return
+	}
 	if tmpl.Daemon != nil && *tmpl.Daemon {
 		scope[fmt.Sprintf("%s.ip", prefix)] = true
 	}
@@ -1222,10 +1249,10 @@ func validateWorkflowFieldNames(slice interface{}) error {
 		if len(errs) != 0 {
 			return errors.Errorf(errors.CodeBadRequest, "[%d].name: '%s' is invalid: %s", i, name, strings.Join(errs, ";"))
 		}
-		_, ok := names[name]
-		if ok {
-			return errors.Errorf(errors.CodeBadRequest, "[%d].name '%s' is not unique", i, name)
-		}
+		// _, ok := names[name]
+		// if ok {
+		// 	return errors.Errorf(errors.CodeBadRequest, "[%d].name '%s' is not unique", i, name)
+		// }
 		names[name] = true
 	}
 	return nil
@@ -1301,14 +1328,13 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
 
 	// Verify dependencies for all tasks can be resolved as well as template names
 	for _, task := range tmpl.DAG.Tasks {
-
 		if (usingDepends || len(task.Dependencies) > 0) && '0' <= task.Name[0] && task.Name[0] <= '9' {
 			return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s name cannot begin with a digit when using either 'depends' or 'dependencies'", tmpl.Name, task.Name)
 		}
 
-		if usingDepends && len(task.Dependencies) > 0 {
-			return errors.Errorf(errors.CodeBadRequest, "templates.%s cannot use both 'depends' and 'dependencies' in the same DAG template", tmpl.Name)
-		}
+		// if usingDepends && len(task.Dependencies) > 0 {
+		// 	return errors.Errorf(errors.CodeBadRequest, "templates.%s cannot use both 'depends' and 'dependencies' in the same DAG template", tmpl.Name)
+		// }
 
 		if usingDepends && task.ContinueOn != nil {
 			return errors.Errorf(errors.CodeBadRequest, "templates.%s cannot use 'continueOn' when using 'depends'. Instead use 'dep-task.Failed'/'dep-task.Errored'", tmpl.Name)
@@ -1402,10 +1428,10 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
 		if err != nil {
 			return err
 		}
-		err = validateDAGTaskArgumentDependency(task.Arguments, ancestry)
-		if err != nil {
-			return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
-		}
+		// err = validateDAGTaskArgumentDependency(task.Arguments, ancestry)
+		// if err != nil {
+		// 	return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
+		// }
 		// Validate the template again with actual arguments.
 		_, err = ctx.validateTemplateHolder(&task, tmplCtx, &task.Arguments, workflowTemplateValidation)
 		if err != nil {