diff --git a/api/deployment/v1/message.pb.go b/api/deployment/v1/message.pb.go index a34c50f417..49a9ca24b7 100644 --- a/api/deployment/v1/message.pb.go +++ b/api/deployment/v1/message.pb.go @@ -305,8 +305,13 @@ type VersionLocalState struct { // executions and remains "drained". DrainageInfo *v11.VersionDrainageInfo `protobuf:"bytes,7,opt,name=drainage_info,json=drainageInfo,proto3" json:"drainage_info,omitempty"` // Arbitrary user-provided metadata attached to this version. - Metadata *v11.VersionMetadata `protobuf:"bytes,8,opt,name=metadata,proto3" json:"metadata,omitempty"` - StartedDeploymentWorkflow bool `protobuf:"varint,9,opt,name=started_deployment_workflow,json=startedDeploymentWorkflow,proto3" json:"started_deployment_workflow,omitempty"` + Metadata *v11.VersionMetadata `protobuf:"bytes,8,opt,name=metadata,proto3" json:"metadata,omitempty"` + // Deployment workflow should always be running before starting the version workflow. + // We should not start the deployment workflow. If we cannot find the deployment workflow when signaling, it means a bug and we should fix it. + // Deprecated. + // + // Deprecated: Marked as deprecated in temporal/server/api/deployment/v1/message.proto. + StartedDeploymentWorkflow bool `protobuf:"varint,9,opt,name=started_deployment_workflow,json=startedDeploymentWorkflow,proto3" json:"started_deployment_workflow,omitempty"` // Key: Task Queue Name TaskQueueFamilies map[string]*VersionLocalState_TaskQueueFamilyData `protobuf:"bytes,10,rep,name=task_queue_families,json=taskQueueFamilies,proto3" json:"task_queue_families,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // Number of task queues which will be synced in a single batch. @@ -420,6 +425,7 @@ func (x *VersionLocalState) GetMetadata() *v11.VersionMetadata { return nil } +// Deprecated: Marked as deprecated in temporal/server/api/deployment/v1/message.proto. 
func (x *VersionLocalState) GetStartedDeploymentWorkflow() bool { if x != nil { return x.StartedDeploymentWorkflow @@ -3312,7 +3318,7 @@ const file_temporal_server_api_deployment_v1_message_proto_rawDesc = "" + "\vupdate_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\n" + "updateTime\x12\x18\n" + "\adeleted\x18\x03 \x01(\bR\adeleted\x12L\n" + - "\x06status\x18\x06 \x01(\x0e24.temporal.api.enums.v1.WorkerDeploymentVersionStatusR\x06status\"\xe8\v\n" + + "\x06status\x18\x06 \x01(\x0e24.temporal.api.enums.v1.WorkerDeploymentVersionStatusR\x06status\"\xec\v\n" + "\x11VersionLocalState\x12T\n" + "\aversion\x18\x01 \x01(\v2:.temporal.server.api.deployment.v1.WorkerDeploymentVersionR\aversion\x12;\n" + "\vcreate_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\n" + @@ -3324,8 +3330,8 @@ const file_temporal_server_api_deployment_v1_message_proto_rawDesc = "" + "\x15first_activation_time\x18\f \x01(\v2\x1a.google.protobuf.TimestampR\x13firstActivationTime\x12P\n" + "\x16last_deactivation_time\x18\r \x01(\v2\x1a.google.protobuf.TimestampR\x14lastDeactivationTime\x12T\n" + "\rdrainage_info\x18\a \x01(\v2/.temporal.api.deployment.v1.VersionDrainageInfoR\fdrainageInfo\x12G\n" + - "\bmetadata\x18\b \x01(\v2+.temporal.api.deployment.v1.VersionMetadataR\bmetadata\x12>\n" + - "\x1bstarted_deployment_workflow\x18\t \x01(\bR\x19startedDeploymentWorkflow\x12{\n" + + "\bmetadata\x18\b \x01(\v2+.temporal.api.deployment.v1.VersionMetadataR\bmetadata\x12B\n" + + "\x1bstarted_deployment_workflow\x18\t \x01(\bB\x02\x18\x01R\x19startedDeploymentWorkflow\x12{\n" + "\x13task_queue_families\x18\n" + " \x03(\v2K.temporal.server.api.deployment.v1.VersionLocalState.TaskQueueFamiliesEntryR\x11taskQueueFamilies\x12&\n" + "\x0fsync_batch_size\x18\v \x01(\x05R\rsyncBatchSize\x12L\n" + diff --git a/config/dynamicconfig/development-sql.yaml b/config/dynamicconfig/development-sql.yaml index ea01750910..7861e96772 100644 --- a/config/dynamicconfig/development-sql.yaml +++ b/config/dynamicconfig/development-sql.yaml @@ -31,14 +31,14 @@ ### Worker Versioning Replay Test configs ### uncomment and set the right deploymentWorkflowVersion # -#matching.deploymentWorkflowVersion: -# - value: 2 # put the integer value of the workflow version you want to target (enum DeploymentWorkflowVersion) -#matching.PollerHistoryTTL: -# - value: 1s -#matching.wv.VersionDrainageStatusVisibilityGracePeriod: -# - value: 5s -#matching.wv.VersionDrainageStatusRefreshInterval: -# - value: 5s +matching.deploymentWorkflowVersion: + - value: 2 # put the integer value of the workflow version you want to target (enum DeploymentWorkflowVersion) +matching.PollerHistoryTTL: + - value: 1s +matching.wv.VersionDrainageStatusVisibilityGracePeriod: + - value: 5s +matching.wv.VersionDrainageStatusRefreshInterval: + - value: 5s # ### END of Worker Versioning Replay Test configs diff --git a/proto/internal/temporal/server/api/deployment/v1/message.proto b/proto/internal/temporal/server/api/deployment/v1/message.proto index d60111ba52..876db42434 100644 --- a/proto/internal/temporal/server/api/deployment/v1/message.proto +++ b/proto/internal/temporal/server/api/deployment/v1/message.proto @@ -112,7 +112,10 @@ message VersionLocalState { // Arbitrary user-provided metadata attached to this version. temporal.api.deployment.v1.VersionMetadata metadata = 8; - bool started_deployment_workflow = 9; + // Deployment workflow should always be running before starting the version workflow. + // We should not start the deployment workflow. 
If we cannot find the deployment workflow when signaling, it means a bug and we should fix it. + // Deprecated. + bool started_deployment_workflow = 9 [deprecated = true]; // Key: Task Queue Name map task_queue_families = 10; diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 4c9b0b5ad4..3b43f17a70 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -98,12 +98,6 @@ var ( ) const ( - errTooManySetCurrentVersionRequests = "Too many SetWorkerDeploymentCurrentVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits." - errTooManySetRampingVersionRequests = "Too many SetWorkerDeploymentRampingVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits." - errTooManyDeleteDeploymentRequests = "Too many DeleteWorkerDeployment requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits." - errTooManyDeleteVersionRequests = "Too many DeleteWorkerDeploymentVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits." - errTooManyVersionMetadataRequests = "Too many UpdateWorkerDeploymentVersionMetadata requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits." - maxReasonLength = 1000 // Maximum length for the reason field in RateLimitUpdate configurations. defaultUserTerminateReason = "terminated by user via frontend" defaultUserTerminateIdentity = "frontend-service" @@ -3451,9 +3445,6 @@ func (wh *WorkflowHandler) SetWorkerDeploymentCurrentVersion(ctx context.Context resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, versionStr, request.Identity, request.IgnoreMissingTaskQueues, request.GetConflictToken(), request.GetAllowNoPollers()) if err != nil { - if common.IsResourceExhausted(err) { - return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManySetCurrentVersionRequests) - } return nil, err } @@ -3515,9 +3506,6 @@ func (wh *WorkflowHandler) SetWorkerDeploymentRampingVersion(ctx context.Context resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, versionStr, request.GetPercentage(), request.GetIdentity(), request.IgnoreMissingTaskQueues, request.GetConflictToken(), request.GetAllowNoPollers()) if err != nil { - if common.IsResourceExhausted(err) { - return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManySetRampingVersionRequests) - } return nil, err } @@ -3641,9 +3629,6 @@ func (wh *WorkflowHandler) DeleteWorkerDeployment(ctx context.Context, request * err = wh.workerDeploymentClient.DeleteWorkerDeployment(ctx, namespaceEntry, request.DeploymentName, request.Identity) if err != nil { - if common.IsResourceExhausted(err) { - return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyDeleteDeploymentRequests) - } return nil, err } @@ -3670,9 +3655,6 @@ func (wh *WorkflowHandler) DeleteWorkerDeploymentVersion(ctx context.Context, re err = wh.workerDeploymentClient.DeleteWorkerDeploymentVersion(ctx, namespaceEntry, versionStr, request.SkipDrainage, request.Identity) if err != nil { - if 
common.IsResourceExhausted(err) { - return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyDeleteVersionRequests) - } return nil, err } @@ -3704,9 +3686,6 @@ func (wh *WorkflowHandler) UpdateWorkerDeploymentVersionMetadata(ctx context.Con identity := uuid.NewString() updatedMetadata, err := wh.workerDeploymentClient.UpdateVersionMetadata(ctx, namespaceEntry, versionStr, request.UpsertEntries, request.RemoveEntries, identity) if err != nil { - if common.IsResourceExhausted(err) { - return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyVersionMetadataRequests) - } return nil, err } diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index 6511e52759..44001d2207 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -738,8 +738,9 @@ func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeploymentVersion( return err } var errResourceExhausted *serviceerror.ResourceExhausted - if !errors.As(err, &errResourceExhausted) { + if !errors.As(err, &errResourceExhausted) || errResourceExhausted.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_WORKER_DEPLOYMENT_LIMITS { // Do not surface low level error to user + // Also, do not surface resource exhausted errors that are not about deployment limits; they are caused by our workflow-based implementation. c.logger.Error("error while registering version", tag.Error(err)) err = errDeploymentVersionNotReady } diff --git a/service/worker/workerdeployment/activities.go b/service/worker/workerdeployment/activities.go index 19b4cfff9d..0b494349c9 100644 --- a/service/worker/workerdeployment/activities.go +++ b/service/worker/workerdeployment/activities.go @@ -3,14 +3,18 @@ package workerdeployment import ( "cmp" "context" "sync" enumspb "go.temporal.io/api/enums/v1" + updatepb "go.temporal.io/api/update/v1" "go.temporal.io/sdk/activity" deploymentspb "go.temporal.io/server/api/deployment/v1" + "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/sdk" + "go.temporal.io/server/common/worker_versioning" ) type ( @@ -18,6 +22,7 @@ type ( namespace *namespace.Namespace deploymentClient Client matchingClient resource.MatchingClient + historyClient historyservice.HistoryServiceClient } ) @@ -141,20 +146,41 @@ func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deplo } func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error { identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID - err := a.deploymentClient.DeleteVersionFromWorkerDeployment( + versionObj, err := worker_versioning.WorkerDeploymentVersionFromStringV31(args.Version) + if err != nil { + return err + } + + workflowID := GenerateVersionWorkflowID(args.DeploymentName, versionObj.GetBuildId()) + updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.DeleteVersionArgs{ + Identity: identity, + Version: args.Version, + SkipDrainage: args.SkipDrainage, + AsyncPropagation: args.AsyncPropagation, + }) + if err != nil { + return err + } + + outcome, err := updateWorkflow( ctx, + a.historyClient,
a.namespace, - args.DeploymentName, - args.Version, - identity, - args.RequestId, - args.SkipDrainage, - args.AsyncPropagation, + workflowID, + &updatepb.Request{ + Input: &updatepb.Input{Name: DeleteVersion, Args: updatePayload}, + Meta: &updatepb.Meta{UpdateId: args.RequestId, Identity: identity}, + }, ) if err != nil { return err } + + err = extractApplicationErrorOrInternal(outcome.GetFailure()) + if err != nil { + return err + } return nil } diff --git a/service/worker/workerdeployment/client.go b/service/worker/workerdeployment/client.go index fc7129ea7d..9084398a18 100644 --- a/service/worker/workerdeployment/client.go +++ b/service/worker/workerdeployment/client.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "sort" - "strings" "time" "github.com/dgryski/go-farm" @@ -22,21 +21,19 @@ import ( deploymentspb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence/visibility/manager" - "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/sdk" - "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/testhooks" "go.temporal.io/server/common/worker_versioning" - "go.temporal.io/server/service/history/consts" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -124,6 +121,7 @@ type Client interface { ) (*workflowservice.SetWorkerDeploymentManagerResponse, error) // Used internally by the Worker Deployment Version workflow in its StartWorkerDeployment Activity + // Deprecated. 
StartWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, @@ -151,17 +149,6 @@ type Client interface { requestID string, ) (*deploymentspb.SyncVersionStateResponse, error) - // Used internally by the Worker Deployment workflow in its DeleteVersion Activity - DeleteVersionFromWorkerDeployment( - ctx context.Context, - namespaceEntry *namespace.Namespace, - deploymentName, version string, - identity string, - requestID string, - skipDrainage bool, - asyncPropagation bool, - ) error - // Used internally by the Drainage workflow (child of Worker Deployment Version workflow) // in its GetVersionDrainageStatus Activity GetVersionDrainageStatus( @@ -189,6 +176,8 @@ type Client interface { type ErrRegister struct{ error } +var retryPolicy = backoff.NewExponentialRetryPolicy(100 * time.Millisecond).WithExpirationInterval(1 * time.Minute) + // ClientImpl implements Client type ClientImpl struct { logger log.Logger @@ -200,6 +189,7 @@ type ClientImpl struct { maxTaskQueuesInDeploymentVersion dynamicconfig.IntPropertyFnWithNamespaceFilter maxDeployments dynamicconfig.IntPropertyFnWithNamespaceFilter testHooks testhooks.TestHooks + metricsHandler metrics.Handler } func (d *ClientImpl) SetManager( @@ -214,7 +204,7 @@ func (d *ClientImpl) SetManager( newManagerID = request.GetManagerIdentity() } //revive:disable-next-line:defer - defer d.record("SetManager", &retErr, newManagerID, request.GetIdentity())() + defer d.convertAndRecordError("SetManager", &retErr, newManagerID, request.GetIdentity())() // validating params err := validateVersionWfParams(worker_versioning.WorkerDeploymentNameFieldName, request.GetDeploymentName(), d.maxIDLengthLimit()) @@ -232,8 +222,9 @@ func (d *ClientImpl) SetManager( return nil, err } - outcome, err := d.update( + outcome, err := updateWorkflow( ctx, + d.historyClient, namespaceEntry, GenerateDeploymentWorkflowID(request.GetDeploymentName()), &updatepb.Request{ @@ -264,10 +255,6 @@ func (d *ClientImpl) SetManager( } success := outcome.GetSuccess() - if success == nil { - return nil, serviceerror.NewInternal("outcome missing success and failure") - } - if err := sdk.PreferProtoDataConverter.FromPayloads(success, &res); err != nil { return nil, err } @@ -279,7 +266,8 @@ func (d *ClientImpl) SetManager( var _ Client = (*ClientImpl)(nil) -var errRetry = errors.New("retry update") +var errUpdateInProgress = errors.New("update in progress") +var errWorkflowHistoryTooLong = errors.New("workflow history too long") func (d *ClientImpl) RegisterTaskQueueWorker( ctx context.Context, @@ -290,7 +278,7 @@ func (d *ClientImpl) RegisterTaskQueueWorker( identity string, ) (retErr error) { //revive:disable-next-line:defer - defer d.record("RegisterTaskQueueWorker", &retErr, taskQueueName, taskQueueType, identity)() + defer d.convertAndRecordError("RegisterTaskQueueWorker", &retErr, taskQueueName, taskQueueType, identity)() // Creating request ID out of build ID + TQ name + TQ type. Many updates may come from multiple // matching partitions, we do not want them to create new update requests. 
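Note on the request-ID comment that closes the hunk above: a minimal sketch of how a deterministic request ID for RegisterTaskQueueWorker could be derived from build ID + task queue name + task queue type, so that the same registration arriving from several matching partitions collapses into a single update. The helper name and encoding here are assumptions for illustration, not the server's actual implementation; go-farm is used only because client.go already imports it.

package main

import (
	"fmt"

	farm "github.com/dgryski/go-farm"
)

// registrationRequestID returns the same ID for identical inputs, so repeated
// registrations share an update ID and the workflow update machinery deduplicates them.
func registrationRequestID(buildID, taskQueueName string, taskQueueType int32) string {
	key := fmt.Sprintf("%s|%s|%d", buildID, taskQueueName, taskQueueType)
	return fmt.Sprintf("register-tq-%x", farm.Fingerprint64([]byte(key)))
}

func main() {
	// Identical inputs produce identical IDs.
	fmt.Println(registrationRequestID("build-1", "orders-tq", 1))
	fmt.Println(registrationRequestID("build-1", "orders-tq", 1))
}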
@@ -369,7 +357,7 @@ func (d *ClientImpl) DescribeVersion( buildID := v.GetBuildId() //revive:disable-next-line:defer - defer d.record("DescribeVersion", &retErr, deploymentName, buildID)() + defer d.convertAndRecordError("DescribeVersion", &retErr, deploymentName, buildID)() // validate deployment name if err = validateVersionWfParams(worker_versioning.WorkerDeploymentNameFieldName, deploymentName, d.maxIDLengthLimit()); err != nil { @@ -394,7 +382,8 @@ func (d *ClientImpl) DescribeVersion( }, } - res, err := d.historyClient.QueryWorkflow(ctx, req) + res, err := d.queryWorkflowWithRetry(ctx, req) + if err != nil { var notFound *serviceerror.NotFound if errors.As(err, ¬Found) { @@ -431,6 +420,16 @@ func (d *ClientImpl) DescribeVersion( return versionInfo, tqInfos, nil } +func (d *ClientImpl) queryWorkflowWithRetry(ctx context.Context, req *historyservice.QueryWorkflowRequest) (*historyservice.QueryWorkflowResponse, error) { + var res *historyservice.QueryWorkflowResponse + var err error + err = backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error { + res, err = d.historyClient.QueryWorkflow(ctx, req) + return err + }, retryPolicy, isRetryableQueryError) + return res, err +} + func (d *ClientImpl) UpdateVersionMetadata( ctx context.Context, namespaceEntry *namespace.Namespace, @@ -440,7 +439,7 @@ func (d *ClientImpl) UpdateVersionMetadata( identity string, ) (_ *deploymentpb.VersionMetadata, retErr error) { //revive:disable-next-line:defer - defer d.record("UpdateVersionMetadata", &retErr, namespaceEntry.Name(), version, upsertEntries, removeEntries, identity)() + defer d.convertAndRecordError("UpdateVersionMetadata", &retErr, namespaceEntry.Name(), version, upsertEntries, removeEntries, identity)() requestID := uuid.NewString() versionObj, err := worker_versioning.WorkerDeploymentVersionFromStringV31(version) @@ -458,7 +457,7 @@ func (d *ClientImpl) UpdateVersionMetadata( } workflowID := GenerateVersionWorkflowID(versionObj.GetDeploymentName(), versionObj.GetBuildId()) - outcome, err := d.update(ctx, namespaceEntry, workflowID, &updatepb.Request{ + outcome, err := updateWorkflow(ctx, d.historyClient, namespaceEntry, workflowID, &updatepb.Request{ Input: &updatepb.Input{Name: UpdateVersionMetadata, Args: updatePayload}, Meta: &updatepb.Meta{UpdateId: requestID, Identity: identity}, }) @@ -467,15 +466,8 @@ func (d *ClientImpl) UpdateVersionMetadata( } if failure := outcome.GetFailure(); failure != nil { - if failure.GetApplicationFailureInfo().GetType() == errVersionDeleted { - return nil, serviceerror.NewNotFoundf(ErrWorkerDeploymentVersionNotFound, versionObj.GetBuildId(), versionObj.GetDeploymentName()) - } return nil, serviceerror.NewInternal(failure.Message) } - success := outcome.GetSuccess() - if success == nil { - return nil, serviceerror.NewInternal("outcome missing success and failure") - } var res deploymentspb.UpdateVersionMetadataResponse if err := sdk.PreferProtoDataConverter.FromPayloads(outcome.GetSuccess(), &res); err != nil { @@ -491,7 +483,7 @@ func (d *ClientImpl) DescribeWorkerDeployment( deploymentName string, ) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error) { //revive:disable-next-line:defer - defer d.record("DescribeWorkerDeployment", &retErr, deploymentName)() + defer d.convertAndRecordError("DescribeWorkerDeployment", &retErr, deploymentName)() // validating params err := validateVersionWfParams(worker_versioning.WorkerDeploymentNameFieldName, deploymentName, d.maxIDLengthLimit()) @@ -512,7 +504,7 @@ func (d *ClientImpl) 
DescribeWorkerDeployment( }, } - res, err := d.historyClient.QueryWorkflow(ctx, req) + res, err := d.queryWorkflowWithRetry(ctx, req) if err != nil { var notFound *serviceerror.NotFound if errors.As(err, ¬Found) { @@ -582,7 +574,7 @@ func (d *ClientImpl) ListWorkerDeployments( nextPageToken []byte, ) (_ []*deploymentspb.WorkerDeploymentSummary, _ []byte, retError error) { //revive:disable-next-line:defer - defer d.record("ListWorkerDeployments", &retError)() + defer d.convertAndRecordError("ListWorkerDeployments", &retError)() query := WorkerDeploymentVisibilityBaseListQuery @@ -643,7 +635,7 @@ func (d *ClientImpl) SetCurrentVersion( allowNoPollers bool, ) (_ *deploymentspb.SetCurrentVersionResponse, retErr error) { //revive:disable-next-line:defer - defer d.record("SetCurrentVersion", &retErr, namespaceEntry.Name(), version, identity)() + defer d.convertAndRecordError("SetCurrentVersion", &retErr, namespaceEntry.Name(), version, identity)() versionObj, err := worker_versioning.WorkerDeploymentVersionFromStringV31(version) if err != nil { @@ -694,8 +686,9 @@ func (d *ClientImpl) SetCurrentVersion( } } else { // we *don't* want to start the Worker Deployment workflow; it should be started by a poller - outcome, err = d.update( + outcome, err = updateWorkflow( ctx, + d.historyClient, namespaceEntry, GenerateDeploymentWorkflowID(deploymentName), &updatepb.Request{ @@ -728,10 +721,6 @@ func (d *ClientImpl) SetCurrentVersion( } success := outcome.GetSuccess() - if success == nil { - return nil, serviceerror.NewInternal("outcome missing success and failure") - } - if err := sdk.PreferProtoDataConverter.FromPayloads(success, &res); err != nil { return nil, err } @@ -751,7 +740,7 @@ func (d *ClientImpl) SetRampingVersion( allowNoPollers bool, ) (_ *deploymentspb.SetRampingVersionResponse, retErr error) { //revive:disable-next-line:defer - defer d.record("SetRampingVersion", &retErr, namespaceEntry.Name(), version, percentage, identity)() + defer d.convertAndRecordError("SetRampingVersion", &retErr, namespaceEntry.Name(), version, percentage, identity)() var err error var versionObj *deploymentspb.WorkerDeploymentVersion @@ -808,8 +797,9 @@ func (d *ClientImpl) SetRampingVersion( return nil, err } } else { - outcome, err = d.update( + outcome, err = updateWorkflow( ctx, + d.historyClient, namespaceEntry, workflowID, &updatepb.Request{ @@ -845,10 +835,6 @@ func (d *ClientImpl) SetRampingVersion( } success := outcome.GetSuccess() - if success == nil { - return nil, serviceerror.NewInternal("outcome missing success and failure") - } - if err := sdk.PreferProtoDataConverter.FromPayloads(success, &res); err != nil { return nil, err } @@ -870,7 +856,7 @@ func (d *ClientImpl) DeleteWorkerDeploymentVersion( buildId := v.GetBuildId() //revive:disable-next-line:defer - defer d.record("DeleteWorkerDeploymentVersion", &retErr, namespaceEntry.Name(), deploymentName, buildId)() + defer d.convertAndRecordError("DeleteWorkerDeploymentVersion", &retErr, namespaceEntry.Name(), deploymentName, buildId)() requestID := uuid.NewString() if identity == "" { @@ -896,8 +882,9 @@ func (d *ClientImpl) DeleteWorkerDeploymentVersion( workflowID := GenerateDeploymentWorkflowID(deploymentName) - outcome, err := d.update( + outcome, err := updateWorkflow( ctx, + d.historyClient, namespaceEntry, workflowID, &updatepb.Request{ @@ -919,11 +906,6 @@ func (d *ClientImpl) DeleteWorkerDeploymentVersion( } return serviceerror.NewInternal(failure.Message) } - - success := outcome.GetSuccess() - if success == nil { - return 
serviceerror.NewInternal("outcome missing success and failure") - } return nil } @@ -934,7 +916,7 @@ func (d *ClientImpl) DeleteWorkerDeployment( identity string, ) (retErr error) { //revive:disable-next-line:defer - defer d.record("DeleteWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)() + defer d.convertAndRecordError("DeleteWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)() // validating params err := validateVersionWfParams(worker_versioning.WorkerDeploymentNameFieldName, deploymentName, d.maxIDLengthLimit()) @@ -956,8 +938,9 @@ func (d *ClientImpl) DeleteWorkerDeployment( } workflowID := GenerateDeploymentWorkflowID(deploymentName) - outcome, err := d.update( + outcome, err := updateWorkflow( ctx, + d.historyClient, namespaceEntry, workflowID, &updatepb.Request{ @@ -976,12 +959,6 @@ func (d *ClientImpl) DeleteWorkerDeployment( if failure := outcome.GetFailure(); failure != nil { return serviceerror.NewInternal(failure.Message) } - - success := outcome.GetSuccess() - if success == nil { - return serviceerror.NewInternal("outcome missing success and failure") - } - return nil } @@ -993,7 +970,7 @@ func (d *ClientImpl) StartWorkerDeployment( requestID string, ) (retErr error) { //revive:disable-next-line:defer - defer d.record("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)() + defer d.convertAndRecordError("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)() workflowID := GenerateDeploymentWorkflowID(deploymentName) @@ -1006,7 +983,7 @@ func (d *ClientImpl) StartWorkerDeployment( return err } - startReq := d.makeStartRequest(requestID, workflowID, identity, WorkerDeploymentWorkflowType, namespaceEntry, nil, input) + startReq := makeStartRequest(requestID, workflowID, identity, WorkerDeploymentWorkflowType, namespaceEntry, nil, input) historyStartReq := &historyservice.StartWorkflowExecutionRequest{ NamespaceId: namespaceEntry.ID().String(), @@ -1025,7 +1002,7 @@ func (d *ClientImpl) StartWorkerDeploymentVersion( requestID string, ) (retErr error) { //revive:disable-next-line:defer - defer d.record("StartWorkerDeploymentVersion", &retErr, namespaceEntry.Name(), deploymentName, identity)() + defer d.convertAndRecordError("StartWorkerDeploymentVersion", &retErr, namespaceEntry.Name(), deploymentName, identity)() err := validateVersionWfParams(worker_versioning.WorkerDeploymentNameFieldName, deploymentName, d.maxIDLengthLimit()) if err != nil { @@ -1041,7 +1018,7 @@ func (d *ClientImpl) StartWorkerDeploymentVersion( if err != nil { return err } - startReq := d.makeStartRequest(requestID, workflowID, identity, WorkerDeploymentVersionWorkflowType, namespaceEntry, nil, input) + startReq := makeStartRequest(requestID, workflowID, identity, WorkerDeploymentVersionWorkflowType, namespaceEntry, nil, input) historyStartReq := &historyservice.StartWorkflowExecutionRequest{ NamespaceId: namespaceEntry.ID().String(), @@ -1061,7 +1038,7 @@ func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment( requestID string, ) (_ *deploymentspb.SyncVersionStateResponse, retErr error) { //revive:disable-next-line:defer - defer d.record("SyncVersionWorkflowFromWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, version, args, identity)() + defer d.convertAndRecordError("SyncVersionWorkflowFromWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, version, args, identity)() versionObj, err := worker_versioning.WorkerDeploymentVersionFromStringV31(version) if err != 
nil { @@ -1076,8 +1053,9 @@ func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment( workflowID := GenerateVersionWorkflowID(deploymentName, versionObj.GetBuildId()) // updates an already existing deployment version workflow. - outcome, err := d.update( + outcome, err := updateWorkflow( ctx, + d.historyClient, namespaceEntry, workflowID, &updatepb.Request{ @@ -1102,10 +1080,6 @@ func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment( } success := outcome.GetSuccess() - if success == nil { - return nil, serviceerror.NewInternal("outcome missing success and failure") - } - var res deploymentspb.SyncVersionStateResponse if err := sdk.PreferProtoDataConverter.FromPayloads(success, &res); err != nil { return nil, err @@ -1113,111 +1087,6 @@ func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment( return &res, nil } -func (d *ClientImpl) DeleteVersionFromWorkerDeployment( - ctx context.Context, - namespaceEntry *namespace.Namespace, - deploymentName, version string, - identity string, - requestID string, - skipDrainage bool, - asyncPropagation bool, -) (retErr error) { - //revive:disable-next-line:defer - defer d.record("DeleteVersionFromWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, version, identity, skipDrainage)() - - versionObj, err := worker_versioning.WorkerDeploymentVersionFromStringV31(version) - if err != nil { - return err - } - - workflowID := GenerateVersionWorkflowID(deploymentName, versionObj.GetBuildId()) - updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.DeleteVersionArgs{ - Identity: identity, - Version: version, - SkipDrainage: skipDrainage, - AsyncPropagation: asyncPropagation, - }) - if err != nil { - return err - } - - outcome, err := d.update( - ctx, - namespaceEntry, - workflowID, - &updatepb.Request{ - Input: &updatepb.Input{Name: DeleteVersion, Args: updatePayload}, - Meta: &updatepb.Meta{UpdateId: requestID, Identity: identity}, - }, - ) - if err != nil { - return err - } - - if failure := outcome.GetFailure(); failure != nil { - if strings.Contains(failure.Message, errVersionIsDrainingSuffix) { - return temporal.NewNonRetryableApplicationError(fmt.Sprintf(ErrVersionIsDraining, worker_versioning.WorkerDeploymentVersionToStringV32(versionObj)), errFailedPrecondition, nil) // non-retryable error to stop multiple activity attempts - } else if strings.Contains(failure.Message, errVersionHasPollersSuffix) { - return temporal.NewNonRetryableApplicationError(fmt.Sprintf(ErrVersionHasPollers, worker_versioning.WorkerDeploymentVersionToStringV32(versionObj)), errFailedPrecondition, nil) // non-retryable error to stop multiple activity attempts - } - return serviceerror.NewInternal(failure.Message) - } - - success := outcome.GetSuccess() - if success == nil { - return serviceerror.NewInternal("outcome missing success and failure") - } - return nil -} - -// update updates an already existing deployment version/deployment workflow. 
-func (d *ClientImpl) update( - ctx context.Context, - namespaceEntry *namespace.Namespace, - workflowID string, - updateRequest *updatepb.Request, -) (*updatepb.Outcome, error) { - - updateReq := &historyservice.UpdateWorkflowExecutionRequest{ - NamespaceId: namespaceEntry.ID().String(), - Request: &workflowservice.UpdateWorkflowExecutionRequest{ - Namespace: namespaceEntry.Name().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - }, - Request: updateRequest, - WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED}, - }, - } - - policy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond) - - var outcome *updatepb.Outcome - err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error { - // historyClient retries internally on retryable rpc errors, we just have to retry on - // successful but un-completed responses. - res, err := d.historyClient.UpdateWorkflowExecution(ctx, updateReq) - if err != nil { - return err - } - - if res.GetResponse() == nil { - return serviceerror.NewInternal("failed to update workflow with workflowID: " + workflowID) - } - - stage := res.GetResponse().GetStage() - if stage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { - // update not completed, try again - return errRetry - } - - outcome = res.GetResponse().GetOutcome() - return nil - }, policy, isRetryableUpdateError) - - return outcome, err -} - func (d *ClientImpl) updateWithStartWorkerDeployment( ctx context.Context, namespaceEntry *namespace.Namespace, @@ -1266,8 +1135,9 @@ func (d *ClientImpl) updateWithStartWorkerDeployment( return nil, err } - return d.updateWithStart( + return updateWorkflowWithStart( ctx, + d.historyClient, namespaceEntry, WorkerDeploymentWorkflowType, workflowID, @@ -1293,6 +1163,7 @@ func (d *ClientImpl) countWorkerDeployments( Query: query, }, ) + metrics.WorkerDeploymentVersionVisibilityQueryCount.With(d.metricsHandler).Record(1, metrics.OperationTag("countWorkerDeployments")) if err != nil { return 0, err } @@ -1322,8 +1193,9 @@ func (d *ClientImpl) updateWithStartWorkerDeploymentVersion( return nil, err } - return d.updateWithStart( + return updateWorkflowWithStart( ctx, + d.historyClient, namespaceEntry, WorkerDeploymentVersionWorkflowType, workflowID, @@ -1335,117 +1207,7 @@ func (d *ClientImpl) updateWithStartWorkerDeploymentVersion( ) } -func (d *ClientImpl) updateWithStart( - ctx context.Context, - namespaceEntry *namespace.Namespace, - workflowType string, - workflowID string, - memo *commonpb.Memo, - input *commonpb.Payloads, - updateRequest *updatepb.Request, - identity string, - requestID string, -) (*updatepb.Outcome, error) { - // Start workflow execution, if it hasn't already - startReq := d.makeStartRequest(requestID, workflowID, identity, workflowType, namespaceEntry, memo, input) - - updateReq := &workflowservice.UpdateWorkflowExecutionRequest{ - Namespace: namespaceEntry.Name().String(), - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - }, - Request: updateRequest, - WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED}, - } - - // This is an atomic operation; if one operation fails, both will. 
- multiOpReq := &historyservice.ExecuteMultiOperationRequest{ - NamespaceId: namespaceEntry.ID().String(), - WorkflowId: workflowID, - Operations: []*historyservice.ExecuteMultiOperationRequest_Operation{ - { - Operation: &historyservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ - StartWorkflow: &historyservice.StartWorkflowExecutionRequest{ - NamespaceId: namespaceEntry.ID().String(), - StartRequest: startReq, - }, - }, - }, - { - Operation: &historyservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ - UpdateWorkflow: &historyservice.UpdateWorkflowExecutionRequest{ - NamespaceId: namespaceEntry.ID().String(), - Request: updateReq, - }, - }, - }, - }, - } - - policy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond) - var outcome *updatepb.Outcome - - err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error { - // historyClient retries internally on retryable rpc errors, we just have to retry on - // successful but un-completed responses. - res, err := d.historyClient.ExecuteMultiOperation(ctx, multiOpReq) - if err != nil { - return err - } - - // we should get exactly one of each of these - var startRes *historyservice.StartWorkflowExecutionResponse - var updateRes *workflowservice.UpdateWorkflowExecutionResponse - for _, response := range res.Responses { - if sr := response.GetStartWorkflow(); sr != nil { - startRes = sr - } else if ur := response.GetUpdateWorkflow().GetResponse(); ur != nil { - updateRes = ur - } - } - if startRes == nil { - return serviceerror.NewInternal("failed to start deployment workflow") - } else if updateRes == nil { - return serviceerror.NewInternal("failed to update deployment workflow") - } - - if updateRes.Stage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { - // update not completed, try again - return errRetry - } - - outcome = updateRes.GetOutcome() - return nil - }, policy, isRetryableUpdateError) - - return outcome, err -} - -func isRetryableUpdateError(err error) bool { - if errors.Is(err, errRetry) || err.Error() == consts.ErrWorkflowClosing.Error() { - return true - } - - // All updates that are admitted as the workflow is closing due to CaN are considered retryable. - // The ErrWorkflowClosing could be nested. 
- var errMultiOps *serviceerror.MultiOperationExecution - if errors.As(err, &errMultiOps) { - for _, e := range errMultiOps.OperationErrors() { - if e.Error() == consts.ErrWorkflowClosing.Error() { - return true - } - } - } - return false -} - -func (d *ClientImpl) buildSearchAttributes() *commonpb.SearchAttributes { - sa := &commonpb.SearchAttributes{} - searchattribute.AddSearchAttribute(&sa, sadefs.TemporalNamespaceDivision, payload.EncodeString(WorkerDeploymentNamespaceDivision)) - return sa -} - -func (d *ClientImpl) record(operation string, retErr *error, args ...any) func() { +func (d *ClientImpl) convertAndRecordError(operation string, retErr *error, args ...any) func() { start := time.Now() return func() { elapsed := time.Since(start) @@ -1453,20 +1215,50 @@ func (d *ClientImpl) record(operation string, retErr *error, args ...any) func() // TODO: add metrics recording here if *retErr != nil { - if isFailedPrecondition(*retErr) { - d.logger.Debug("deployment client failure due to a failed precondition", + if isFailedPreconditionOrNotFound(*retErr) { + d.logger.Debug("deployment client failure due to a failed precondition or not found error", tag.Error(*retErr), tag.Operation(operation), tag.NewDurationTag("elapsed", elapsed), tag.NewAnyTag("args", args), ) } else { - d.logger.Error("deployment client error", - tag.Error(*retErr), - tag.Operation(operation), - tag.NewDurationTag("elapsed", elapsed), - tag.NewAnyTag("args", args), - ) + if isRetryableUpdateError(*retErr) || isRetryableQueryError(*retErr) { + d.logger.Debug("deployment client throttling due to retriable error", + tag.Error(*retErr), + tag.Operation(operation), + tag.NewDurationTag("elapsed", elapsed), + tag.NewAnyTag("args", args), + ) + var errResourceExhausted *serviceerror.ResourceExhausted + if !errors.As(*retErr, &errResourceExhausted) || errResourceExhausted.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_WORKER_DEPLOYMENT_LIMITS { + // if it's not a deployment limits error, we don't want to expose the underlying cause to the user + *retErr = &serviceerror.ResourceExhausted{ + Message: ErrTooManyRequests, + Scope: enumspb.RESOURCE_EXHAUSTED_SCOPE_NAMESPACE, + // These errors are caused by workflow throughput limits, so BUSY_WORKFLOW is the most appropriate cause. + // This cause is not sent back to the user. 
+ Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, + } + } + } else if errors.Is(*retErr, context.DeadlineExceeded) || + errors.Is(*retErr, context.Canceled) || + common.IsContextDeadlineExceededErr(*retErr) || + common.IsContextCanceledErr(*retErr) { + d.logger.Debug("deployment client timeout or cancellation", + tag.Error(*retErr), + tag.Operation(operation), + tag.NewDurationTag("elapsed", elapsed), + tag.NewAnyTag("args", args), + ) + } else { + d.logger.Error("deployment client unexpected error", + tag.Error(*retErr), + tag.Operation(operation), + tag.NewDurationTag("elapsed", elapsed), + tag.NewAnyTag("args", args), + ) + } } } else { d.logger.Debug("deployment client success", @@ -1645,6 +1437,7 @@ func (d *ClientImpl) GetVersionDrainageStatus( Query: makeDeploymentQuery(worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(version))), } countResponse, err := d.visibilityManager.CountWorkflowExecutions(ctx, &countRequest) + metrics.WorkerDeploymentVersionVisibilityQueryCount.With(d.metricsHandler).Record(1, metrics.OperationTag("GetVersionDrainageStatus")) if err != nil { return enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED, err } @@ -1858,24 +1651,3 @@ func (d *ClientImpl) makeVersionWorkflowArgs( }, } } - -func (d *ClientImpl) makeStartRequest( - requestID, workflowID, identity, workflowType string, - namespaceEntry *namespace.Namespace, - memo *commonpb.Memo, - input *commonpb.Payloads, -) *workflowservice.StartWorkflowExecutionRequest { - return &workflowservice.StartWorkflowExecutionRequest{ - RequestId: requestID, - Namespace: namespaceEntry.Name().String(), - WorkflowId: workflowID, - WorkflowType: &commonpb.WorkflowType{Name: workflowType}, - TaskQueue: &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue}, - Input: input, - WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, - WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, - SearchAttributes: d.buildSearchAttributes(), - Memo: memo, - Identity: identity, - } -} diff --git a/service/worker/workerdeployment/fx.go b/service/worker/workerdeployment/fx.go index b4e1736271..57d4d5e8bf 100644 --- a/service/worker/workerdeployment/fx.go +++ b/service/worker/workerdeployment/fx.go @@ -44,6 +44,7 @@ type ( Logger log.Logger ClientFactory sdk.ClientFactory MatchingClient resource.MatchingClient + HistoryClient resource.HistoryClient WorkerDeploymentClient Client } @@ -65,6 +66,7 @@ func ClientProvider( visibilityManager manager.VisibilityManager, dc *dynamicconfig.Collection, testHooks testhooks.TestHooks, + metricsHandler metrics.Handler, ) Client { return &ClientImpl{ logger: logger, @@ -76,6 +78,7 @@ func ClientProvider( maxTaskQueuesInDeploymentVersion: dynamicconfig.MatchingMaxTaskQueuesInDeploymentVersion.Get(dc), maxDeployments: dynamicconfig.MatchingMaxDeployments.Get(dc), testHooks: testHooks, + metricsHandler: metricsHandler, } } @@ -133,6 +136,7 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na namespace: ns, deploymentClient: s.activityDeps.WorkerDeploymentClient, matchingClient: s.activityDeps.MatchingClient, + historyClient: s.activityDeps.HistoryClient, } registry.RegisterActivity(activities) return nil diff --git a/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_deployment-workflow_initial.json.gz b/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_deployment-workflow_initial.json.gz deleted file 
mode 100644 index 46a35d324f..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_deployment-workflow_initial.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_deployment-workflow_maprange.json.gz b/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_deployment-workflow_maprange.json.gz deleted file mode 100644 index dd3436a0b3..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_deployment-workflow_maprange.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_worker_deployment_wf_1748123053.json.gz b/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_worker_deployment_wf_1748123053.json.gz deleted file mode 100644 index ff1aba3e42..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v0/run_1753913370/replay_worker_deployment_wf_1748123053.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_019b0571-2ff6-7e21-bde0-068c74babed3.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_019b0571-2ff6-7e21-bde0-068c74babed3.json.gz deleted file mode 100644 index 70d74d3740..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_019b0571-2ff6-7e21-bde0-068c74babed3.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_3144c171-8e69-435b-9665-a8fbe2c654b7.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_3144c171-8e69-435b-9665-a8fbe2c654b7.json.gz deleted file mode 100644 index f6fc8d312c..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_3144c171-8e69-435b-9665-a8fbe2c654b7.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_5065e55c-b84a-483d-aad5-bccf85c1e61c.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_5065e55c-b84a-483d-aad5-bccf85c1e61c.json.gz deleted file mode 100644 index 94e6697e0c..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_5065e55c-b84a-483d-aad5-bccf85c1e61c.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_56b48846-7f6d-448d-97d5-83a7eb9dde36.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_56b48846-7f6d-448d-97d5-83a7eb9dde36.json.gz deleted file mode 100644 index 6d04bad9f4..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_56b48846-7f6d-448d-97d5-83a7eb9dde36.json.gz and /dev/null differ diff --git 
a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_763a5a49-1aca-4041-8b20-0f2dd0d20111.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_763a5a49-1aca-4041-8b20-0f2dd0d20111.json.gz deleted file mode 100644 index 7a0f8cde15..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_763a5a49-1aca-4041-8b20-0f2dd0d20111.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_bcfd0a93-3fe3-4d51-b14e-43884c2edd59.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_bcfd0a93-3fe3-4d51-b14e-43884c2edd59.json.gz deleted file mode 100644 index 0cdf76b49a..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_bcfd0a93-3fe3-4d51-b14e-43884c2edd59.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_d22a9fc3-bac3-4fb6-8288-70337844d695.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_d22a9fc3-bac3-4fb6-8288-70337844d695.json.gz deleted file mode 100644 index 18af278765..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_d22a9fc3-bac3-4fb6-8288-70337844d695.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_d37eb2e7-09f8-40d4-816a-a2670fcb6c3d.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_d37eb2e7-09f8-40d4-816a-a2670fcb6c3d.json.gz deleted file mode 100644 index c6261f4249..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_d37eb2e7-09f8-40d4-816a-a2670fcb6c3d.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_f2ed4344-bbd3-4f0e-9fb6-a802a777e89b.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_f2ed4344-bbd3-4f0e-9fb6-a802a777e89b.json.gz deleted file mode 100644 index 6c70e46b26..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_f2ed4344-bbd3-4f0e-9fb6-a802a777e89b.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_f4a0028c-eb47-4bc6-9142-6d2ac5397261.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_f4a0028c-eb47-4bc6-9142-6d2ac5397261.json.gz deleted file mode 100644 index 966a959e63..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_f4a0028c-eb47-4bc6-9142-6d2ac5397261.json.gz and /dev/null differ diff --git 
a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_fd3ecdd7-3900-4c47-8ea4-519dc09ae352.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_fd3ecdd7-3900-4c47-8ea4-519dc09ae352.json.gz deleted file mode 100644 index 629151c1b5..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_version_wf_run_fd3ecdd7-3900-4c47-8ea4-519dc09ae352.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_019b0571-2b9f-718a-9977-8d7671b071ab.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_019b0571-2b9f-718a-9977-8d7671b071ab.json.gz deleted file mode 100644 index 69af833e70..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_019b0571-2b9f-718a-9977-8d7671b071ab.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_0ea6fc1a-b364-4b35-92e3-f2ea041e72bf.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_0ea6fc1a-b364-4b35-92e3-f2ea041e72bf.json.gz deleted file mode 100644 index 20ea0f4325..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_0ea6fc1a-b364-4b35-92e3-f2ea041e72bf.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_10c3e0a8-3ce8-4ddb-ba6e-adf4931e881f.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_10c3e0a8-3ce8-4ddb-ba6e-adf4931e881f.json.gz deleted file mode 100644 index a77fbca380..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_10c3e0a8-3ce8-4ddb-ba6e-adf4931e881f.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_28ed8fba-920e-45c4-8759-8ee9599109d4.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_28ed8fba-920e-45c4-8759-8ee9599109d4.json.gz deleted file mode 100644 index 08d4bcd9e3..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_28ed8fba-920e-45c4-8759-8ee9599109d4.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_37fecbcd-5d15-44a2-8c6d-7cbb4187b276.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_37fecbcd-5d15-44a2-8c6d-7cbb4187b276.json.gz deleted file mode 100644 index 3a02daf432..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_37fecbcd-5d15-44a2-8c6d-7cbb4187b276.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_3f62c950-252c-446d-b460-14cbcea80857.json.gz 
b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_3f62c950-252c-446d-b460-14cbcea80857.json.gz deleted file mode 100644 index 34fbddbe3f..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_3f62c950-252c-446d-b460-14cbcea80857.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_42672cd2-eecd-442f-9fcd-e6206768926a.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_42672cd2-eecd-442f-9fcd-e6206768926a.json.gz deleted file mode 100644 index 6fa0ea017e..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_42672cd2-eecd-442f-9fcd-e6206768926a.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_4bda9742-adb0-4797-9fd7-b5cb7e37b5fb.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_4bda9742-adb0-4797-9fd7-b5cb7e37b5fb.json.gz deleted file mode 100644 index b767f7373e..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_4bda9742-adb0-4797-9fd7-b5cb7e37b5fb.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_6e6e7a31-c5bd-4460-b7a0-05169c048ac0.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_6e6e7a31-c5bd-4460-b7a0-05169c048ac0.json.gz deleted file mode 100644 index 331edf312c..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_6e6e7a31-c5bd-4460-b7a0-05169c048ac0.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7809581b-0521-4754-a29b-e3a135b807f2.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7809581b-0521-4754-a29b-e3a135b807f2.json.gz deleted file mode 100644 index a3b65a3734..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7809581b-0521-4754-a29b-e3a135b807f2.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7ac46524-1c7b-4463-8213-39f6f4dd31e4.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7ac46524-1c7b-4463-8213-39f6f4dd31e4.json.gz deleted file mode 100644 index 42876e0b91..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7ac46524-1c7b-4463-8213-39f6f4dd31e4.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7b3e54fa-f4f0-43eb-9bf4-c9563419d263.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7b3e54fa-f4f0-43eb-9bf4-c9563419d263.json.gz deleted file mode 100644 index d1eeec88fd..0000000000 Binary files 
a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_7b3e54fa-f4f0-43eb-9bf4-c9563419d263.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_8e820cde-82e1-4b72-931b-88bc479991ec.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_8e820cde-82e1-4b72-931b-88bc479991ec.json.gz deleted file mode 100644 index 3dd80319c6..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_8e820cde-82e1-4b72-931b-88bc479991ec.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_a6aac9f0-7797-4456-9ba6-29bf6c7ee38d.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_a6aac9f0-7797-4456-9ba6-29bf6c7ee38d.json.gz deleted file mode 100644 index 43536f3ba6..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_a6aac9f0-7797-4456-9ba6-29bf6c7ee38d.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c740e74e-739f-4aa0-a986-bc03051d84e2.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c740e74e-739f-4aa0-a986-bc03051d84e2.json.gz deleted file mode 100644 index bb747e5698..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c740e74e-739f-4aa0-a986-bc03051d84e2.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c793628e-97da-4130-890b-5154ea122e85.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c793628e-97da-4130-890b-5154ea122e85.json.gz deleted file mode 100644 index b1a9ca4ebb..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c793628e-97da-4130-890b-5154ea122e85.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c8603c68-f433-4554-8cf1-1a2ceb090f87.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c8603c68-f433-4554-8cf1-1a2ceb090f87.json.gz deleted file mode 100644 index f58080a263..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c8603c68-f433-4554-8cf1-1a2ceb090f87.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c86768a2-ab3a-428e-8159-33cbefcc7ec1.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c86768a2-ab3a-428e-8159-33cbefcc7ec1.json.gz deleted file mode 100644 index b27b344472..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_c86768a2-ab3a-428e-8159-33cbefcc7ec1.json.gz and /dev/null differ diff --git 
a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_e4e589f1-064d-492f-9bf4-634ff49e1489.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_e4e589f1-064d-492f-9bf4-634ff49e1489.json.gz deleted file mode 100644 index da31d14c9f..0000000000 Binary files a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/replay_worker_deployment_wf_run_e4e589f1-064d-492f-9bf4-634ff49e1489.json.gz and /dev/null differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/expected_counts.txt b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/expected_counts.txt similarity index 72% rename from service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/expected_counts.txt rename to service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/expected_counts.txt index f42c106632..4b7fa8ba2b 100644 --- a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765322898/expected_counts.txt +++ b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/expected_counts.txt @@ -1,5 +1,5 @@ # Expected workflow counts for replay testing -# Generated by generate_history.sh on Tue Dec 9 15:28:20 PST 2025 +# Generated by generate_history.sh on Fri Dec 12 17:31:46 PST 2025 EXPECTED_DEPLOYMENT_WORKFLOWS=19 EXPECTED_VERSION_WORKFLOWS=11 ACTUAL_DEPLOYMENT_WORKFLOWS=19 diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_019b1555-4606-7ed4-96f0-2378b646d14d.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_019b1555-4606-7ed4-96f0-2378b646d14d.json.gz new file mode 100644 index 0000000000..f4522f7d37 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_019b1555-4606-7ed4-96f0-2378b646d14d.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_10ab002f-0d3a-4ee5-88dd-0e79ec0d1d67.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_10ab002f-0d3a-4ee5-88dd-0e79ec0d1d67.json.gz new file mode 100644 index 0000000000..eeee62c897 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_10ab002f-0d3a-4ee5-88dd-0e79ec0d1d67.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_14ea3e44-70f4-4e35-9c01-11772eb39a33.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_14ea3e44-70f4-4e35-9c01-11772eb39a33.json.gz new file mode 100644 index 0000000000..339e773292 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_14ea3e44-70f4-4e35-9c01-11772eb39a33.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_1a27e6e0-e78c-4bec-86f1-b251cf2eccaf.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_1a27e6e0-e78c-4bec-86f1-b251cf2eccaf.json.gz new file mode 
100644 index 0000000000..b89130048b Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_1a27e6e0-e78c-4bec-86f1-b251cf2eccaf.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_1c4892fb-9615-4c84-9300-0677505ff9e5.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_1c4892fb-9615-4c84-9300-0677505ff9e5.json.gz new file mode 100644 index 0000000000..04fe652e65 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_1c4892fb-9615-4c84-9300-0677505ff9e5.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_27d2427e-f09f-4f93-9618-31af002ae6a9.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_27d2427e-f09f-4f93-9618-31af002ae6a9.json.gz new file mode 100644 index 0000000000..093a484eaa Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_27d2427e-f09f-4f93-9618-31af002ae6a9.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_8139ca24-abf1-4146-9e1d-ca49d2785d54.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_8139ca24-abf1-4146-9e1d-ca49d2785d54.json.gz new file mode 100644 index 0000000000..ed8e891226 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_8139ca24-abf1-4146-9e1d-ca49d2785d54.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_a85e9798-5fa4-4f2e-af4a-f91036064b0b.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_a85e9798-5fa4-4f2e-af4a-f91036064b0b.json.gz new file mode 100644 index 0000000000..a99e423789 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_a85e9798-5fa4-4f2e-af4a-f91036064b0b.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_d9a7f46a-6ccd-4f54-a5e1-536b9447ce0f.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_d9a7f46a-6ccd-4f54-a5e1-536b9447ce0f.json.gz new file mode 100644 index 0000000000..b130110b46 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_d9a7f46a-6ccd-4f54-a5e1-536b9447ce0f.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_dfffa73b-f0be-4059-bb6b-bdca1e4a9553.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_dfffa73b-f0be-4059-bb6b-bdca1e4a9553.json.gz new file mode 100644 index 0000000000..5a4ac20863 Binary files /dev/null and 
b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_dfffa73b-f0be-4059-bb6b-bdca1e4a9553.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_e23aef46-0d38-49ad-8eaf-6eefb1683095.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_e23aef46-0d38-49ad-8eaf-6eefb1683095.json.gz new file mode 100644 index 0000000000..718af761d2 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_version_wf_run_e23aef46-0d38-49ad-8eaf-6eefb1683095.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_019b1555-41fb-7af4-984a-48b0b548e052.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_019b1555-41fb-7af4-984a-48b0b548e052.json.gz new file mode 100644 index 0000000000..059b18c951 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_019b1555-41fb-7af4-984a-48b0b548e052.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_0641674c-b31c-4646-adba-bf2821320152.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_0641674c-b31c-4646-adba-bf2821320152.json.gz new file mode 100644 index 0000000000..ccd6760771 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_0641674c-b31c-4646-adba-bf2821320152.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_08178650-bbf2-4ccf-ac3c-3d92c90c4c36.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_08178650-bbf2-4ccf-ac3c-3d92c90c4c36.json.gz new file mode 100644 index 0000000000..29ef83fc0c Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_08178650-bbf2-4ccf-ac3c-3d92c90c4c36.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_22ba0b7b-c009-4e24-8501-32abb53408f8.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_22ba0b7b-c009-4e24-8501-32abb53408f8.json.gz new file mode 100644 index 0000000000..034d88c7a9 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_22ba0b7b-c009-4e24-8501-32abb53408f8.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_23f2aa05-91c7-4741-8bd0-c50aabf0ec1a.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_23f2aa05-91c7-4741-8bd0-c50aabf0ec1a.json.gz new file mode 100644 index 0000000000..d97f9c6531 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_23f2aa05-91c7-4741-8bd0-c50aabf0ec1a.json.gz differ diff --git 
a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_2899e468-6c15-45ce-a9bb-1eabdc07b11d.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_2899e468-6c15-45ce-a9bb-1eabdc07b11d.json.gz new file mode 100644 index 0000000000..0c3b576678 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_2899e468-6c15-45ce-a9bb-1eabdc07b11d.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_33862798-4968-4a9e-867c-588181be3cfc.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_33862798-4968-4a9e-867c-588181be3cfc.json.gz new file mode 100644 index 0000000000..fd5fe179f9 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_33862798-4968-4a9e-867c-588181be3cfc.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_42ee4ea7-f980-404c-98e6-75d4417ccef6.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_42ee4ea7-f980-404c-98e6-75d4417ccef6.json.gz new file mode 100644 index 0000000000..00d24ef279 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_42ee4ea7-f980-404c-98e6-75d4417ccef6.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_55eeeb0d-15f1-48f9-9d9c-f4bb20403f31.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_55eeeb0d-15f1-48f9-9d9c-f4bb20403f31.json.gz new file mode 100644 index 0000000000..440eda0296 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_55eeeb0d-15f1-48f9-9d9c-f4bb20403f31.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_58116cfa-ca67-40a8-a89d-02934b8fc4c0.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_58116cfa-ca67-40a8-a89d-02934b8fc4c0.json.gz new file mode 100644 index 0000000000..efd3aeccf7 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_58116cfa-ca67-40a8-a89d-02934b8fc4c0.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_608bc2bb-589b-4eb9-8571-1d83d71b77d8.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_608bc2bb-589b-4eb9-8571-1d83d71b77d8.json.gz new file mode 100644 index 0000000000..94a5089c76 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_608bc2bb-589b-4eb9-8571-1d83d71b77d8.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_6a1e16da-9268-44c7-9b3d-d4aebb2f6f67.json.gz 
b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_6a1e16da-9268-44c7-9b3d-d4aebb2f6f67.json.gz new file mode 100644 index 0000000000..abdd31730e Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_6a1e16da-9268-44c7-9b3d-d4aebb2f6f67.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_71d86583-748b-4f5c-b757-78760bde1614.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_71d86583-748b-4f5c-b757-78760bde1614.json.gz new file mode 100644 index 0000000000..65b32aeaa4 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_71d86583-748b-4f5c-b757-78760bde1614.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_ca534f99-cd6d-4a03-816d-b7c754e60ebe.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_ca534f99-cd6d-4a03-816d-b7c754e60ebe.json.gz new file mode 100644 index 0000000000..9928be3ea9 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_ca534f99-cd6d-4a03-816d-b7c754e60ebe.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_deaef8bd-a07d-4191-a30c-5781321e1b79.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_deaef8bd-a07d-4191-a30c-5781321e1b79.json.gz new file mode 100644 index 0000000000..92c5d33a52 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_deaef8bd-a07d-4191-a30c-5781321e1b79.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_e9390f52-eddb-4a3f-b8c8-8cc599c03acb.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_e9390f52-eddb-4a3f-b8c8-8cc599c03acb.json.gz new file mode 100644 index 0000000000..5b2b7a629f Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_e9390f52-eddb-4a3f-b8c8-8cc599c03acb.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f18f813b-0c5c-4c7f-874f-900e4ccac3f9.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f18f813b-0c5c-4c7f-874f-900e4ccac3f9.json.gz new file mode 100644 index 0000000000..c6c0bf954d Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f18f813b-0c5c-4c7f-874f-900e4ccac3f9.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f41f1a81-884c-4952-92ba-7518dfa90739.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f41f1a81-884c-4952-92ba-7518dfa90739.json.gz new file mode 100644 index 0000000000..aeb67550c6 Binary files /dev/null and 
b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f41f1a81-884c-4952-92ba-7518dfa90739.json.gz differ diff --git a/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f9449ab5-e1c8-48ee-b192-c123bcba85d2.json.gz b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f9449ab5-e1c8-48ee-b192-c123bcba85d2.json.gz new file mode 100644 index 0000000000..791c1a3cd9 Binary files /dev/null and b/service/worker/workerdeployment/replaytester/testdata/v2/run_1765589504/replay_worker_deployment_wf_run_f9449ab5-e1c8-48ee-b192-c123bcba85d2.json.gz differ diff --git a/service/worker/workerdeployment/util.go b/service/worker/workerdeployment/util.go index 8ea02de30d..953cd58a95 100644 --- a/service/worker/workerdeployment/util.go +++ b/service/worker/workerdeployment/util.go @@ -1,6 +1,7 @@ package workerdeployment import ( + "context" "errors" "fmt" "strings" @@ -9,13 +10,26 @@ import ( commonpb "go.temporal.io/api/common/v1" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" + failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + updatepb "go.temporal.io/api/update/v1" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" deploymentspb "go.temporal.io/server/api/deployment/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/sdk" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/worker_versioning" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/consts" + update2 "go.temporal.io/server/service/history/workflow/update" ) const ( @@ -27,17 +41,16 @@ const ( WorkerDeploymentNamespaceDivision = "TemporalWorkerDeployment" // Updates - RegisterWorkerInDeploymentVersion = "register-task-queue-worker" // for Worker Deployment Version wf - SyncVersionState = "sync-version-state" // for Worker Deployment Version wfs - UpdateVersionMetadata = "update-version-metadata" // for Worker Deployment Version wfs - RegisterWorkerInWorkerDeployment = "register-worker-in-deployment" // for Worker Deployment wfs - SetCurrentVersion = "set-current-version" // for Worker Deployment wfs - SetRampingVersion = "set-ramping-version" // for Worker Deployment wfs - AddVersionToWorkerDeployment = "add-version-to-worker-deployment" // for Worker Deployment wfs - DeleteVersion = "delete-version" // for WorkerDeployment wfs - DeleteDeployment = "delete-deployment" // for WorkerDeployment wfs - SetManagerIdentity = "set-manager-identity" // for WorkerDeployment wfs - serverDeleteVersionIdentity = "try-delete-for-add-version" // identity of the worker-deployment workflow when it tries to delete a version on the event that the addition + RegisterWorkerInDeploymentVersion = "register-task-queue-worker" // for Worker Deployment Version wf + SyncVersionState = "sync-version-state" // for Worker Deployment Version wfs + UpdateVersionMetadata = "update-version-metadata" // for Worker Deployment Version wfs + RegisterWorkerInWorkerDeployment = "register-worker-in-deployment" // for Worker Deployment wfs + SetCurrentVersion = 
"set-current-version" // for Worker Deployment wfs + SetRampingVersion = "set-ramping-version" // for Worker Deployment wfs + DeleteVersion = "delete-version" // for WorkerDeployment wfs + DeleteDeployment = "delete-deployment" // for WorkerDeployment wfs + SetManagerIdentity = "set-manager-identity" // for WorkerDeployment wfs + serverDeleteVersionIdentity = "try-delete-for-add-version" // identity of the worker-deployment workflow when it tries to delete a version on the event that the addition // of a version exceeds the max number of versions allowed in a worker-deployment (defaultMaxVersions) // Signals @@ -63,13 +76,14 @@ const ( errVersionNotFound = "Version not found in deployment" errDeploymentDeleted = "worker deployment deleted" // returned in the race condition that the deployment is deleted but the workflow is not yet closed. errVersionDeleted = "worker deployment version deleted" // returned in the race condition that the deployment version is deleted but the workflow is not yet closed. + errLongHistory = "errLongHistory" // update is not accepted until CaN happens. client should retry + errVersionIsDraining = "errVersionIsDraining" + errVersionHasPollers = "errVersionHasPollersSuffix" errFailedPrecondition = "FailedPrecondition" - errVersionIsDrainingSuffix = "cannot be deleted since it is draining" - ErrVersionIsDraining = "version '%s' " + errVersionIsDrainingSuffix - errVersionHasPollersSuffix = "cannot be deleted since it has active pollers" - ErrVersionHasPollers = "version '%s' " + errVersionHasPollersSuffix + ErrVersionIsDraining = "version '%s' cannot be deleted since it is draining" + ErrVersionHasPollers = "version '%s' cannot be deleted since it has active pollers" ErrVersionIsCurrentOrRamping = "version '%s' cannot be deleted since it is current or ramping" ErrRampingVersionDoesNotHaveAllTaskQueues = "proposed ramping version '%s' is missing active task queues from the current version; these would become unversioned if it is set as the ramping version" @@ -77,6 +91,7 @@ const ( ErrManagerIdentityMismatch = "ManagerIdentity '%s' is set and does not match user identity '%s'; to proceed, set your own identity as the ManagerIdentity, remove the ManagerIdentity, or wait for the other client to do so" ErrWorkerDeploymentNotFound = "no Worker Deployment found with name '%s'; does your Worker Deployment have pollers?" ErrWorkerDeploymentVersionNotFound = "build ID '%s' not found in Worker Deployment '%s'" + ErrTooManyRequests = "too many requests issued to the same Worker Deployment. Please try again later" ) var ( @@ -159,9 +174,248 @@ func durationEq(a, b any) bool { return a == b } -// isFailedPrecondition checks if the error is a FailedPrecondition error. It also checks if the FailedPrecondition error is wrapped in an ApplicationError. -func isFailedPrecondition(err error) bool { +func isFailedPreconditionOrNotFound(err error) bool { var failedPreconditionError *serviceerror.FailedPrecondition - var applicationError *temporal.ApplicationError - return errors.As(err, &failedPreconditionError) || (errors.As(err, &applicationError) && applicationError.Type() == errFailedPrecondition) + var notFound *serviceerror.NotFound + return errors.As(err, &failedPreconditionError) || errors.As(err, ¬Found) +} + +// update updates an already existing deployment version/deployment workflow. 
+func updateWorkflow( + ctx context.Context, + historyClient historyservice.HistoryServiceClient, + namespaceEntry *namespace.Namespace, + workflowID string, + updateRequest *updatepb.Request, +) (*updatepb.Outcome, error) { + updateReq := &historyservice.UpdateWorkflowExecutionRequest{ + NamespaceId: namespaceEntry.ID().String(), + Request: &workflowservice.UpdateWorkflowExecutionRequest{ + Namespace: namespaceEntry.Name().String(), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + }, + Request: updateRequest, + WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED}, + }, + } + + var outcome *updatepb.Outcome + err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error { + // historyClient retries internally on retryable rpc errors, we just have to retry on + // successful but un-completed responses. + res, err := historyClient.UpdateWorkflowExecution(ctx, updateReq) + if err != nil { + return err + } + + if err := convertUpdateFailure(res.GetResponse()); err != nil { + return err + } + + outcome = res.GetResponse().GetOutcome() + return nil + }, retryPolicy, isRetryableUpdateError) + + return outcome, err +} + +// extractApplicationErrorOrInternal extracts the application error from an update failure, preserving error type and retriability. +// If the failure is non-nil but not an application error, it returns an internal error. +func extractApplicationErrorOrInternal(failure *failurepb.Failure) error { + if failure != nil { + if af := failure.GetApplicationFailureInfo(); af != nil { + if af.GetNonRetryable() { + return temporal.NewNonRetryableApplicationError(failure.GetMessage(), af.GetType(), nil) + } + return temporal.NewApplicationError(failure.GetMessage(), af.GetType(), nil) + } + return serviceerror.NewInternal(failure.Message) + } + return nil +} + +func updateWorkflowWithStart( + ctx context.Context, + historyClient historyservice.HistoryServiceClient, + namespaceEntry *namespace.Namespace, + workflowType string, + workflowID string, + memo *commonpb.Memo, + input *commonpb.Payloads, + updateRequest *updatepb.Request, + identity string, + requestID string, +) (*updatepb.Outcome, error) { + // Start the workflow execution, if it hasn't already been started + startReq := makeStartRequest(requestID, workflowID, identity, workflowType, namespaceEntry, memo, input) + + updateReq := &workflowservice.UpdateWorkflowExecutionRequest{ + Namespace: namespaceEntry.Name().String(), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + }, + Request: updateRequest, + WaitPolicy: &updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED}, + } + + // This is an atomic operation; if one operation fails, both will.
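+ // If the workflow is already running, the start is a no-op (makeStartRequest sets WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) and the update is applied to the existing run.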
+ multiOpReq := &historyservice.ExecuteMultiOperationRequest{ + NamespaceId: namespaceEntry.ID().String(), + WorkflowId: workflowID, + Operations: []*historyservice.ExecuteMultiOperationRequest_Operation{ + { + Operation: &historyservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ + StartWorkflow: &historyservice.StartWorkflowExecutionRequest{ + NamespaceId: namespaceEntry.ID().String(), + StartRequest: startReq, + }, + }, + }, + { + Operation: &historyservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow{ + UpdateWorkflow: &historyservice.UpdateWorkflowExecutionRequest{ + NamespaceId: namespaceEntry.ID().String(), + Request: updateReq, + }, + }, + }, + }, + } + + var outcome *updatepb.Outcome + + err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error { + // historyClient retries internally on retryable rpc errors, we just have to retry on + // successful but un-completed responses. + res, err := historyClient.ExecuteMultiOperation(ctx, multiOpReq) + if err != nil { + return err + } + + // we should get exactly one of each of these + var startRes *historyservice.StartWorkflowExecutionResponse + var updateRes *workflowservice.UpdateWorkflowExecutionResponse + for _, response := range res.Responses { + if sr := response.GetStartWorkflow(); sr != nil { + startRes = sr + } else if ur := response.GetUpdateWorkflow().GetResponse(); ur != nil { + updateRes = ur + } + } + if startRes == nil { + return serviceerror.NewInternal("failed to start deployment workflow") + } + + if err := convertUpdateFailure(updateRes); err != nil { + return err + } + + outcome = updateRes.GetOutcome() + return nil + }, retryPolicy, isRetryableUpdateError) + + return outcome, err +} + +func convertUpdateFailure(updateRes *workflowservice.UpdateWorkflowExecutionResponse) error { + if updateRes == nil { + return serviceerror.NewInternal("failed to update deployment workflow") + } + + if updateRes.Stage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { + // update not completed, try again + return errUpdateInProgress + } + + if failure := updateRes.GetOutcome().GetFailure(); failure != nil { + if failure.GetApplicationFailureInfo().GetType() == errLongHistory { + // Retriable + return errWorkflowHistoryTooLong + } else if failure.GetApplicationFailureInfo().GetType() == errVersionDeleted { + // Non-retriable + return serviceerror.NewNotFoundf("Worker Deployment Version not found") + } else if failure.GetApplicationFailureInfo().GetType() == errDeploymentDeleted { + // Non-retriable + return serviceerror.NewNotFoundf("Worker Deployment not found") + } else if failure.GetApplicationFailureInfo().GetType() == errFailedPrecondition { + return serviceerror.NewFailedPrecondition(failure.GetMessage()) + } + + // we let caller handle other update failures + } else if updateRes.GetOutcome().GetSuccess() == nil { + return serviceerror.NewInternal("outcome missing success and failure") + } + return nil +} + +func isRetryableQueryError(err error) bool { + var internalErr *serviceerror.Internal + return api.IsRetryableError(err) && !errors.As(err, &internalErr) +} + +func isRetryableUpdateError(err error) bool { + if errors.Is(err, errUpdateInProgress) || errors.Is(err, errWorkflowHistoryTooLong) || + err.Error() == consts.ErrWorkflowClosing.Error() || err.Error() == update2.AbortedByServerErr.Error() { + return true + } + + var errResourceExhausted *serviceerror.ResourceExhausted + if errors.As(err, &errResourceExhausted) && + (errResourceExhausted.Cause == 
enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT || + errResourceExhausted.Cause == enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW) { + // We're hitting the max concurrent update limit for the wf. Retrying will eventually succeed. + return true + } + + var errWfNotReady *serviceerror.WorkflowNotReady + if errors.As(err, &errWfNotReady) { + // Update edge cases, can retry. + return true + } + + // All updates that are admitted as the workflow is closing due to CaN are considered retryable. + // The ErrWorkflowClosing and ResourceExhausted could be nested. + var errMultiOps *serviceerror.MultiOperationExecution + if errors.As(err, &errMultiOps) { + for _, e := range errMultiOps.OperationErrors() { + if errors.As(e, &errResourceExhausted) && + (errResourceExhausted.Cause == enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT || + errResourceExhausted.Cause == enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW) { + // We're hitting the max concurrent update limit for the wf. Retrying will eventually succeed. + return true + } + if e.Error() == consts.ErrWorkflowClosing.Error() || e.Error() == update2.AbortedByServerErr.Error() { + return true + } + } + } + return false +} + +func makeStartRequest( + requestID, workflowID, identity, workflowType string, + namespaceEntry *namespace.Namespace, + memo *commonpb.Memo, + input *commonpb.Payloads, +) *workflowservice.StartWorkflowExecutionRequest { + return &workflowservice.StartWorkflowExecutionRequest{ + RequestId: requestID, + Namespace: namespaceEntry.Name().String(), + WorkflowId: workflowID, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue}, + Input: input, + WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, + SearchAttributes: buildSearchAttributes(), + Memo: memo, + Identity: identity, + } +} + +func buildSearchAttributes() *commonpb.SearchAttributes { + sa := &commonpb.SearchAttributes{} + searchattribute.AddSearchAttribute(&sa, sadefs.TemporalNamespaceDivision, payload.EncodeString(WorkerDeploymentNamespaceDivision)) + return sa } diff --git a/service/worker/workerdeployment/version_workflow.go b/service/worker/workerdeployment/version_workflow.go index 79de1f95d7..163db1f47b 100644 --- a/service/worker/workerdeployment/version_workflow.go +++ b/service/worker/workerdeployment/version_workflow.go @@ -204,17 +204,23 @@ func (d *VersionWorkflowRunner) run(ctx workflow.Context) error { return err } - // First ensure deployment workflow is running - if !d.VersionState.StartedDeploymentWorkflow { - activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions) - err := workflow.ExecuteActivity(activityCtx, d.a.StartWorkerDeploymentWorkflow, &deploymentspb.StartWorkerDeploymentRequest{ - DeploymentName: d.VersionState.Version.DeploymentName, - RequestId: d.newUUID(ctx), - }).Get(ctx, nil) - if err != nil { - return err + // Deployment workflow should always be running before starting the version workflow. + // We should not start the deployment workflow. If we cannot find the deployment workflow when signaling, it means a bug and we should fix it. 
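+ // Kept only for workflows below VersionDataRevisionNumber: histories recorded with the old behavior still start the deployment workflow themselves so they replay correctly.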
+ if !d.hasMinVersion(VersionDataRevisionNumber) { + // First ensure deployment workflow is running + //nolint:staticcheck // SA1019 + if !d.VersionState.StartedDeploymentWorkflow { + activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions) + err := workflow.ExecuteActivity(activityCtx, d.a.StartWorkerDeploymentWorkflow, &deploymentspb.StartWorkerDeploymentRequest{ + DeploymentName: d.VersionState.Version.DeploymentName, + RequestId: d.newUUID(ctx), + }).Get(ctx, nil) + if err != nil { + return err + } + //nolint:staticcheck // SA1019 + d.VersionState.StartedDeploymentWorkflow = true } - d.VersionState.StartedDeploymentWorkflow = true } // Listen to signals in a different goroutine to make business logic clearer @@ -244,14 +250,22 @@ func (d *VersionWorkflowRunner) run(ctx workflow.Context) error { } func (d *VersionWorkflowRunner) validateUpdateVersionMetadata(args *deploymentspb.UpdateVersionMetadataArgs) error { + return d.ensureNotDeleted() +} + +func (d *VersionWorkflowRunner) ensureNotDeleted() error { if d.deleteVersion { - // Deployment workflow should not call this function if version is marked for deletion, but still checking for safety. + // Deployment workflow should not call updates if version is marked for deletion, but still checking for safety. return temporal.NewNonRetryableApplicationError(errVersionDeleted, errVersionDeleted, nil) } return nil } func (d *VersionWorkflowRunner) handleUpdateVersionMetadata(ctx workflow.Context, args *deploymentspb.UpdateVersionMetadataArgs) (*deploymentspb.UpdateVersionMetadataResponse, error) { + if err := d.preUpdateChecks(ctx); err != nil { + return nil, err + } + if d.VersionState.Metadata == nil && args.UpsertEntries != nil { d.VersionState.Metadata = &deploymentpb.VersionMetadata{} d.VersionState.Metadata.Entries = make(map[string]*commonpb.Payload) @@ -311,6 +325,10 @@ func (d *VersionWorkflowRunner) validateDeleteVersion(args *deploymentspb.Delete } func (d *VersionWorkflowRunner) handleDeleteVersion(ctx workflow.Context, args *deploymentspb.DeleteVersionArgs) error { + if err := d.preUpdateChecks(ctx); err != nil { + return err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -324,11 +342,19 @@ func (d *VersionWorkflowRunner) handleDeleteVersion(ctx workflow.Context, args * d.lock.Unlock() }() - // wait until deployment workflow started - err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow }) - if err != nil { - d.logger.Error("Update canceled before worker deployment workflow started") - return serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started") + if d.deleteVersion { + // already deleted, returning success so it is idempotent + return nil + } + + if !d.hasMinVersion(VersionDataRevisionNumber) { + // wait until deployment workflow started + //nolint:staticcheck // SA1019 + err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow }) + if err != nil { + d.logger.Error("Update canceled before worker deployment workflow started") + return serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started") + } } activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions) @@ -342,7 +368,7 @@ func (d *VersionWorkflowRunner) handleDeleteVersion(ctx workflow.Context, args * if !args.SkipDrainage { if d.GetVersionState().GetDrainageInfo().GetStatus() == enumspb.VERSION_DRAINAGE_STATUS_DRAINING { // activity won't retry on this 
error since version not eligible for deletion - return serviceerror.NewFailedPrecondition(ErrVersionIsDraining) + return temporal.NewNonRetryableApplicationError(fmt.Sprintf(ErrVersionIsDraining, worker_versioning.WorkerDeploymentVersionToStringV32(d.VersionState.GetVersion())), errVersionIsDraining, nil) } } @@ -350,7 +376,7 @@ func (d *VersionWorkflowRunner) handleDeleteVersion(ctx workflow.Context, args * hasPollers, err := d.doesVersionHaveActivePollers(ctx) if hasPollers { // activity won't retry on this error since version not eligible for deletion - return serviceerror.NewFailedPrecondition(ErrVersionHasPollers) + return temporal.NewNonRetryableApplicationError(fmt.Sprintf(ErrVersionHasPollers, worker_versioning.WorkerDeploymentVersionToStringV32(d.VersionState.GetVersion())), errVersionHasPollers, nil) } if err != nil { // some other error allowing activity retries @@ -458,6 +484,8 @@ func (d *VersionWorkflowRunner) doesVersionHaveActivePollers(ctx workflow.Contex } func (d *VersionWorkflowRunner) validateRegisterWorker(args *deploymentspb.RegisterWorkerInVersionArgs) error { + // Should not ensure not deleted, instead the version would revive if deleted. + if _, ok := d.VersionState.TaskQueueFamilies[args.TaskQueueName].GetTaskQueues()[int32(args.TaskQueueType)]; ok { return temporal.NewApplicationError("task queue already exists in deployment version", errNoChangeType) } @@ -471,6 +499,13 @@ func (d *VersionWorkflowRunner) validateRegisterWorker(args *deploymentspb.Regis } func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args *deploymentspb.RegisterWorkerInVersionArgs) error { + // Should not ensure not deleted, instead the version would revive if deleted. + if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + // History is too large, do not accept new updates until wf CaNs. + // Since this needs workflow context we cannot do it in validators. + return temporal.NewApplicationError(errLongHistory, errLongHistory) + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -514,6 +549,12 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args if d.VersionState.TaskQueueFamilies[args.TaskQueueName].TaskQueues == nil { d.VersionState.TaskQueueFamilies[args.TaskQueueName].TaskQueues = make(map[int32]*deploymentspb.TaskQueueVersionData) } + + if _, ok := d.VersionState.TaskQueueFamilies[args.TaskQueueName].TaskQueues[int32(args.TaskQueueType)]; ok { + // already registered, returning success so it is idempotent + return nil + } + d.VersionState.TaskQueueFamilies[args.TaskQueueName].TaskQueues[int32(args.TaskQueueType)] = &deploymentspb.TaskQueueVersionData{} if withRevisionNumbers && args.GetRoutingConfig() != nil { @@ -583,10 +624,10 @@ func (d *VersionWorkflowRunner) versionDataToSync() *deploymentspb.WorkerDeploym // If routing update time has changed then we want to let the update through. func (d *VersionWorkflowRunner) validateSyncState(args *deploymentspb.SyncVersionStateUpdateArgs) error { - if d.deleteVersion { - // Deployment workflow should not call this function if version is marked for deletion, but still checking for safety. 
- return temporal.NewNonRetryableApplicationError(errVersionDeleted, errVersionDeleted, nil) + if err := d.ensureNotDeleted(); err != nil { + return err } + res := &deploymentspb.SyncVersionStateResponse{VersionState: d.VersionState} if args.GetRoutingUpdateTime().AsTime().Equal(d.GetVersionState().GetRoutingUpdateTime().AsTime()) { return temporal.NewApplicationError("no change", errNoChangeType, res) @@ -596,6 +637,10 @@ func (d *VersionWorkflowRunner) validateSyncState(args *deploymentspb.SyncVersio //nolint:staticcheck // SA1019 func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *deploymentspb.SyncVersionStateUpdateArgs) (*deploymentspb.SyncVersionStateResponse, error) { + if err := d.preUpdateChecks(ctx); err != nil { + return nil, err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -613,11 +658,13 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl return nil, err } - // wait until deployment workflow started - err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow }) - if err != nil { - d.logger.Error("Update canceled before worker deployment workflow started") - return nil, serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started") + if !d.hasMinVersion(VersionDataRevisionNumber) { + // wait until deployment workflow started + err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow }) + if err != nil { + d.logger.Error("Update canceled before worker deployment workflow started") + return nil, serviceerror.NewDeadlineExceeded("Update canceled before worker deployment workflow started") + } } state := d.GetVersionState() @@ -681,6 +728,20 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl }, nil } +func (d *VersionWorkflowRunner) preUpdateChecks(ctx workflow.Context) error { + err := d.ensureNotDeleted() + if err != nil { + return err + } + + if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + // History is too large, do not accept new updates until wf CaNs. + // Since this needs workflow context we cannot do it in validators. + return temporal.NewApplicationError(errLongHistory, errLongHistory) + } + return nil +} + // updateStateFromRoutingConfig updates the version state based on routing config received from Deployment. // returns true if any information that affects version data to be synced in TQs was changed. 
func (d *VersionWorkflowRunner) updateStateFromRoutingConfig( @@ -812,8 +873,6 @@ func (d *VersionWorkflowRunner) refreshDrainageInfo(ctx workflow.Context) { return } - d.metrics.Counter(metrics.WorkerDeploymentVersionVisibilityQueryCount.Name()).Inc(1) - if d.hasMinVersion(VersionDataRevisionNumber) { // Need to lock so there is no race condition with setCurrent/Ramping if err = d.lock.Lock(ctx); err != nil { diff --git a/service/worker/workerdeployment/version_workflow_test.go b/service/worker/workerdeployment/version_workflow_test.go index f58ad7cbee..b59d5783e8 100644 --- a/service/worker/workerdeployment/version_workflow_test.go +++ b/service/worker/workerdeployment/version_workflow_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/suite" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" "go.temporal.io/sdk/workflow" deploymentspb "go.temporal.io/server/api/deployment/v1" @@ -552,7 +553,9 @@ func (s *VersionWorkflowSuite) Test_DeleteVersion_FailsWhenDraining() { OnAccept: func() {}, OnComplete: func(result interface{}, err error) { s.Require().Error(err, "delete version should fail when version is draining") - s.Contains(err.Error(), ErrVersionIsDraining) + var applicationError *temporal.ApplicationError + s.Require().ErrorAs(err, &applicationError) + s.Equal(errVersionIsDraining, applicationError.Type()) }, }, deleteArgs) }, 1*time.Millisecond) @@ -683,7 +686,9 @@ func (s *VersionWorkflowSuite) Test_DeleteVersion_FailsWithActivePollers() { OnAccept: func() {}, OnComplete: func(result interface{}, err error) { s.Require().Error(err, "delete version should fail when version has active pollers") - s.Contains(err.Error(), ErrVersionHasPollers) + var applicationError *temporal.ApplicationError + s.Require().ErrorAs(err, &applicationError) + s.Equal(errVersionHasPollers, applicationError.Type()) }, }, deleteArgs) }, 1*time.Millisecond) diff --git a/service/worker/workerdeployment/workflow.go b/service/worker/workerdeployment/workflow.go index c44109a52e..06a7cb03cc 100644 --- a/service/worker/workerdeployment/workflow.go +++ b/service/worker/workerdeployment/workflow.go @@ -148,6 +148,7 @@ func (d *WorkflowRunner) handlePropagationComplete(completion *deploymentspb.Pro d.logger.Error("Received propagation complete signal, but no in-progress propagations found", "revision", revisionNumber, "build_id", buildID) + return } // Remove this revision from in-progress tracking for this build @@ -304,18 +305,6 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error { return err } - // to-be-deprecated - if err := workflow.SetUpdateHandlerWithOptions( - ctx, - AddVersionToWorkerDeployment, - d.handleAddVersionToWorkerDeployment, - workflow.UpdateHandlerOptions{ - Validator: d.validateAddVersionToWorkerDeployment, - }, - ); err != nil { - return err - } - if err := workflow.SetUpdateHandlerWithOptions( ctx, DeleteVersion, @@ -393,6 +382,27 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error { return workflow.NewContinueAsNewError(ctx, WorkerDeploymentWorkflowType, d.WorkerDeploymentWorkflowArgs) } +func (d *WorkflowRunner) preUpdateChecks(ctx workflow.Context) error { + err := d.ensureNotDeleted() + if err != nil { + return err + } + + if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + // History is too large, do not accept new updates until wf CaNs. + // Since this needs workflow context we cannot do it in validators. 
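+ // convertUpdateFailure in util.go treats errLongHistory as retryable, so the caller keeps retrying and succeeds once the workflow has continued-as-new.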
+ return temporal.NewApplicationError(errLongHistory, errLongHistory) + } + return nil +} + +func (d *WorkflowRunner) ensureNotDeleted() error { + if d.deleteDeployment { + return temporal.NewNonRetryableApplicationError(errDeploymentDeleted, errDeploymentDeleted, nil) + } + return nil +} + func (d *WorkflowRunner) addVersionToWorkerDeployment(ctx workflow.Context, args *deploymentspb.AddVersionUpdateArgs) error { if d.State.Versions == nil { return nil @@ -424,6 +434,15 @@ } func (d *WorkflowRunner) handleRegisterWorker(ctx workflow.Context, args *deploymentspb.RegisterWorkerInWorkerDeploymentArgs) error { + // TODO: there is a small race condition where the deployment has just been deleted and receives a register update before closing itself. + // In that case, we should ideally not reject the update, but revive the workflow so that the caller does not need to retry. + // In practice this should be fine because the polls will retry and Deployment workflows are short-lived. + // Same principle applies for Version workflows, but they can be slightly more long-lived if they are handling long propagations. + // Hence, the revive logic is implemented in the Version workflow but not here yet. + if err := d.ensureNotDeleted(); err != nil { + return err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -483,8 +502,17 @@ func (d *WorkflowRunner) validateDeleteDeployment() error { } func (d *WorkflowRunner) handleDeleteDeployment(ctx workflow.Context) error { - // Even if the update doesn't change the state we mark it as dirty because of created history events. - defer d.setStateChanged() + // use lock to enforce only one update at a time + err := d.lock.Lock(ctx) + if err != nil { + d.logger.Error("Could not acquire workflow lock") + return serviceerror.NewDeadlineExceeded("Could not acquire workflow lock") + } + defer func() { + // Even if the update doesn't change the state we mark it as dirty because of created history events. + d.setStateChanged() + d.lock.Unlock() + }() if len(d.State.Versions) == 0 { d.deleteDeployment = true @@ -530,12 +558,19 @@ } func (d *WorkflowRunner) validateStateBeforeAcceptingRampingUpdate(args *deploym } func (d *WorkflowRunner) validateSetRampingVersion(args *deploymentspb.SetRampingVersionArgs) error { + if err := d.ensureNotDeleted(); err != nil { + return err + } return d.validateStateBeforeAcceptingRampingUpdate(args) } //revive:disable-next-line:cognitive-complexity //nolint:staticcheck // deprecated stuff will be cleaned func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *deploymentspb.SetRampingVersionArgs) (*deploymentspb.SetRampingVersionResponse, error) { + if err := d.preUpdateChecks(ctx); err != nil { + return nil, err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -746,9 +781,10 @@ ) } } - - // Erase summary drainage status immediately, so it is not draining/drained. - d.setDrainageStatus(newRampingVersion, enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED, routingUpdateTime) + if !asyncMode { + // Erase summary drainage status immediately, so it is not draining/drained.
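+ // In async mode this immediate status change is skipped and the existing summary is left untouched here.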
+ d.setDrainageStatus(newRampingVersion, enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED, routingUpdateTime) + } } setRampUpdateArgs := &deploymentspb.SyncVersionStateUpdateArgs{ @@ -811,9 +847,11 @@ func (d *WorkflowRunner) unsetPreviousRamp( return err } } - // Set summary drainage status immediately to draining. - // We know prevRampingVersion cannot have been current, so it must now be draining - d.setDrainageStatus(prevRampingVersion, enumspb.VERSION_DRAINAGE_STATUS_DRAINING, routingUpdateTime) + if !asyncMode { + // Set summary drainage status immediately to draining. + // We know prevRampingVersion cannot have been current, so it must now be draining + d.setDrainageStatus(prevRampingVersion, enumspb.VERSION_DRAINAGE_STATUS_DRAINING, routingUpdateTime) + } return nil } @@ -829,6 +867,10 @@ func (d *WorkflowRunner) setDrainageStatus(version string, status enumspb.Versio } func (d *WorkflowRunner) validateDeleteVersion(args *deploymentspb.DeleteVersionArgs) error { + if err := d.ensureNotDeleted(); err != nil { + return err + } + if _, ok := d.State.Versions[args.Version]; !ok { return temporal.NewApplicationError("version not found in deployment", errVersionNotFound) } @@ -862,6 +904,12 @@ func (d *WorkflowRunner) deleteVersion(ctx workflow.Context, args *deploymentspb AsyncPropagation: d.hasMinVersion(AsyncSetCurrentAndRamping), }).Get(ctx, &res) if err != nil { + var activityError *temporal.ActivityError + var applicationError *temporal.ApplicationError + if errors.As(err, &activityError) && errors.As(activityError.Unwrap(), &applicationError) && + (applicationError.Type() == errVersionHasPollers || applicationError.Type() == errVersionIsDraining) { + return serviceerror.NewFailedPrecondition(applicationError.Message()) + } return err } // update local state @@ -876,6 +924,10 @@ func (d *WorkflowRunner) deleteVersion(ctx workflow.Context, args *deploymentspb } func (d *WorkflowRunner) handleDeleteVersion(ctx workflow.Context, args *deploymentspb.DeleteVersionArgs) error { + if err := d.preUpdateChecks(ctx); err != nil { + return err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -912,10 +964,17 @@ func (d *WorkflowRunner) validateStateBeforeAcceptingSetManager(args *deployment } func (d *WorkflowRunner) validateSetManager(args *deploymentspb.SetManagerIdentityArgs) error { + if err := d.ensureNotDeleted(); err != nil { + return err + } return d.validateStateBeforeAcceptingSetManager(args) } func (d *WorkflowRunner) handleSetManager(ctx workflow.Context, args *deploymentspb.SetManagerIdentityArgs) (*deploymentspb.SetManagerIdentityResponse, error) { + if err := d.preUpdateChecks(ctx); err != nil { + return nil, err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -968,11 +1027,19 @@ func (d *WorkflowRunner) validateStateBeforeAcceptingSetCurrent(args *deployment } func (d *WorkflowRunner) validateSetCurrent(args *deploymentspb.SetCurrentVersionArgs) error { + if err := d.ensureNotDeleted(); err != nil { + return err + } + return d.validateStateBeforeAcceptingSetCurrent(args) } //nolint:staticcheck // deprecated stuff will be cleaned func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deploymentspb.SetCurrentVersionArgs) (*deploymentspb.SetCurrentVersionResponse, error) { + if err := d.preUpdateChecks(ctx); err != nil { + return nil, err + } + // use lock to enforce only one update at a time err := d.lock.Lock(ctx) if err != nil { @@ -1118,9 +1185,11 @@ func (d 
*WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment if _, err := d.syncVersion(ctx, prevCurrentVersion, prevUpdateArgs); err != nil { return nil, err } - // Set summary drainage status immediately to draining. - // We know prevCurrentVersion cannot have been ramping, so it must now be draining - d.setDrainageStatus(prevCurrentVersion, enumspb.VERSION_DRAINAGE_STATUS_DRAINING, updateTime) + if !asyncMode { + // Set summary drainage status immediately to draining. + // We know prevCurrentVersion cannot have been ramping, so it must now be draining + d.setDrainageStatus(prevCurrentVersion, enumspb.VERSION_DRAINAGE_STATUS_DRAINING, updateTime) + } } //nolint:staticcheck // deprecated stuff will be cleaned @@ -1192,28 +1261,6 @@ func (d *WorkflowRunner) getMaxVersions(ctx workflow.Context) int { return maxVersions } -// to-be-deprecated -func (d *WorkflowRunner) handleAddVersionToWorkerDeployment(ctx workflow.Context, args *deploymentspb.AddVersionUpdateArgs) error { - // Even if the update doesn't change the state we mark it as dirty because of created history events. - defer d.setStateChanged() - - maxVersions := d.getMaxVersions(ctx) - - if len(d.State.Versions) >= maxVersions { - err := d.tryDeleteVersion(ctx) - if err != nil { - return temporal.NewApplicationError(fmt.Sprintf("cannot add version, already at max versions %d", maxVersions), errTooManyVersions) - } - } - - d.State.Versions[args.Version] = &deploymentspb.WorkerDeploymentVersionSummary{ - Version: args.Version, - CreateTime: args.CreateTime, - } - - return nil -} - func (d *WorkflowRunner) tryDeleteVersion(ctx workflow.Context) error { sortedSummaries := d.sortedSummaries() for _, v := range sortedSummaries { diff --git a/tests/worker_deployment_test.go b/tests/worker_deployment_test.go index 05a2205eee..be637f0440 100644 --- a/tests/worker_deployment_test.go +++ b/tests/worker_deployment_test.go @@ -19,6 +19,8 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" sdkworker "go.temporal.io/sdk/worker" + "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/testing/testhooks" "go.temporal.io/server/common/testing/testvars" @@ -71,7 +73,6 @@ func (s *WorkerDeploymentSuite) SetupSuite() { dynamicconfig.MatchingMaxTaskQueuesInDeploymentVersion.Key(): 1000, dynamicconfig.VisibilityPersistenceSlowQueryThreshold.Key(): 60 * time.Second, - dynamicconfig.WorkflowExecutionMaxInFlightUpdates.Key(): 1000, })) } @@ -1872,8 +1873,8 @@ func (s *WorkerDeploymentSuite) TestConcurrentPollers_DifferentTaskQueues_SameVe // start 10 different pollers each polling on a different task queue but belonging to the same version tv := testvars.New(s) - versions := 10 - for i := 0; i < versions; i++ { + tqs := 10 + for i := 0; i < tqs; i++ { go s.startVersionWorkflow(ctx, tv.WithTaskQueueNumber(i)) } @@ -1881,7 +1882,7 @@ func (s *WorkerDeploymentSuite) TestConcurrentPollers_DifferentTaskQueues_SameVe s.setCurrentVersion(ctx, tv, false, "") // verify that the task queues, eventually, have this version as the current version in their versioning info - for i := 0; i < versions; i++ { + for i := 0; i < tqs; i++ { s.verifyTaskQueueVersioningInfo(ctx, tv.WithTaskQueueNumber(i).TaskQueue(), tv.DeploymentVersionString(), "", 0) } } @@ -1987,6 +1988,164 @@ func (s *WorkerDeploymentSuite) TestSetRampingVersion_Concurrent_SameVersion_NoU s.Equal(tv.DeploymentVersionString(), 
 	s.Equal(tv.DeploymentVersionString(), resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetRampingVersion())
 }

+// TODO: this test reliably produces a rare error seemingly about speculative tasks. Use it for debugging the error.
+// ex: "history_events: premature end of stream, expectedLastEventID=13 but no more events after eventID=11"
+// (the error does not seem to be related to versioning)
+func (s *WorkerDeploymentSuite) TestConcurrentPollers_ManyTaskQueues_RapidRoutingUpdates_RevisionConsistency() {
+	// This test should work for InitialVersion, but it takes much longer (4m vs 1m15s vs 55s, in order, for 50 TQs).
+	// Also skipping for AsyncSetCurrentAndRampingVersion to reduce flake chance.
+	s.skipBeforeVersion(workerdeployment.VersionDataRevisionNumber)
+
+	// Start pollers on many task queues across 3 versions
+	numTaskQueues := 50
+	numVersions := 3
+	syncBatchSize := 2 // reducing batch size to cause more delay
+	numOperations := 20
+
+	s.OverrideDynamicConfig(dynamicconfig.MatchingMaxTaskQueuesInDeploymentVersion, numTaskQueues)
+	s.InjectHook(testhooks.TaskQueuesInDeploymentSyncBatchSize, syncBatchSize)
+	s.InjectHook(testhooks.MatchingDeploymentRegisterErrorBackoff, time.Millisecond*500)
+
+	// Need to increase the max pending activities limit because it is set to only 10 for functional tests; it's 2000 by default.
+	s.OverrideDynamicConfig(dynamicconfig.NumPendingActivitiesLimitError, numOperations)
+
+	tv := testvars.New(s)
+	dn := tv.DeploymentVersion().GetDeploymentName()
+	start := time.Now()
+
+	// For each version, send pollers regularly until all TQs are registered from the DescribeVersion POV
+	for i := 0; i < numVersions; i++ {
+		pollCtx, cancelPollers := context.WithTimeout(context.Background(), 5*time.Minute)
+
+		sendPollers := func() {
+			for j := 0; j < numTaskQueues; j++ {
+				go s.pollFromDeployment(pollCtx, tv.WithBuildIDNumber(i).WithTaskQueueNumber(j))
+			}
+		}
+
+		sendPollers()
+
+		// Send new pollers regularly; this is needed because the registration might take more time than the initial pollers
+		t := time.NewTicker(10 * time.Second)
+		go func() {
+			for {
+				select {
+				case <-t.C:
+					sendPollers()
+				case <-pollCtx.Done():
+					return
+				}
+			}
+		}()
+
+		// Wait for all task queues to be added to versions
+
+		// Wait for the versions to be created
+		s.EventuallyWithT(func(t *assert.CollectT) {
+			a := require.New(t)
+			resp, err := s.FrontendClient().DescribeWorkerDeployment(pollCtx, &workflowservice.DescribeWorkerDeploymentRequest{
+				Namespace:      s.Namespace().String(),
+				DeploymentName: dn,
+			})
+			a.NoError(err)
+			a.NotNil(resp.GetWorkerDeploymentInfo())
+			a.Len(resp.GetWorkerDeploymentInfo().GetVersionSummaries(), i+1)
+		}, 3*time.Minute, 500*time.Millisecond)
+
+		fmt.Printf(">>> Time taken version %d added: %v\n", i, time.Since(start))
+
+		s.EventuallyWithT(func(t *assert.CollectT) {
+			a := require.New(t)
+			resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(pollCtx, &workflowservice.DescribeWorkerDeploymentVersionRequest{
+				Namespace:         s.Namespace().String(),
+				DeploymentVersion: tv.WithBuildIDNumber(i).ExternalDeploymentVersion(),
+			})
+			a.NoError(err)
+			a.Len(resp.GetVersionTaskQueues(), numTaskQueues)
+		}, 5*time.Minute, 1000*time.Millisecond)
+
+		t.Stop()
+		cancelPollers()
+
+		fmt.Printf(">>> Time taken registration for version %d: %v\n", i, time.Since(start))
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	// Rapidly perform 20 setCurrent and setRamping operations, each targeting one of the 3 versions
+	for i := 0; i < numOperations; i++ {
+		// Alternate between setCurrent and setRamping
+		targetVersion := i % numVersions
+		versionTV := tv.WithBuildIDNumber(targetVersion)
+		for {
+			var err error
+			if i%2 == 0 {
+				// setCurrent operation
+				_, err = s.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, &workflowservice.SetWorkerDeploymentCurrentVersionRequest{
+					Namespace:               s.Namespace().String(),
+					DeploymentName:          dn,
+					BuildId:                 versionTV.DeploymentVersion().GetBuildId(),
+					IgnoreMissingTaskQueues: true,
+					Identity:                tv.ClientIdentity(),
+				})
+			} else {
+				// setRamping operation
+				_, err = s.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, &workflowservice.SetWorkerDeploymentRampingVersionRequest{
+					Namespace:               s.Namespace().String(),
+					DeploymentName:          dn,
+					BuildId:                 versionTV.DeploymentVersion().GetBuildId(),
+					IgnoreMissingTaskQueues: true,
+					Identity:                tv.ClientIdentity(),
+					Percentage:              float32(50 + (i % 50)),
+				})
+			}
+			if common.IsResourceExhausted(err) {
+				fmt.Printf("ResourceExhausted error, retrying operation %d\n", i)
+				time.Sleep(100 * time.Millisecond) //nolint:forbidigo // throttling requests
+				continue
+			}
+			s.NoError(err)
+			break
+		}
+		fmt.Printf(">>> Time taken operation %d: %v\n", i, time.Since(start))
+	}
+
+	fmt.Printf(">>> Time taken operations: %v\n", time.Since(start))
+
+	// Wait for the routing update status to be completed
+	var latestRoutingConfig *deploymentpb.RoutingConfig
+	s.EventuallyWithT(func(t *assert.CollectT) {
+		a := require.New(t)
+		resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
+			Namespace:      s.Namespace().String(),
+			DeploymentName: dn,
+		})
+		a.NoError(err)
+		a.NotNil(resp.GetWorkerDeploymentInfo())
+		a.Equal(enumspb.ROUTING_CONFIG_UPDATE_STATE_COMPLETED, resp.GetWorkerDeploymentInfo().GetRoutingConfigUpdateState())
+		latestRoutingConfig = resp.GetWorkerDeploymentInfo().GetRoutingConfig()
+	}, 120*time.Second, 1*time.Second)
+
+	fmt.Printf(">>> Time taken propagation: %v\n", time.Since(start))
+
+	// Verify that the routing info revision number in each of the task queues matches the latest revision number.
+	// Note: The public API doesn't expose revision numbers at the task queue level, so we verify that the
+	// versioning info has been propagated correctly by comparing the routing config stored in each task queue's user data.
+	for j := 0; j < numTaskQueues; j++ {
+		tqTV := tv.WithTaskQueueNumber(j)
+		tqUD, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{
+			NamespaceId:   s.NamespaceID().String(),
+			TaskQueueType: tqTypeWf,
+			TaskQueue:     tqTV.TaskQueue().GetName(),
+		})
+		s.NoError(err)
+		s.Len(tqUD.GetUserData().GetData().GetPerType(), 1)
+		s.Len(tqUD.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData().GetDeploymentsData(), 1)
+		s.ProtoEqual(latestRoutingConfig, tqUD.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData().GetDeploymentsData()[dn].GetRoutingConfig())
+	}
+}
+
 func (s *WorkerDeploymentSuite) TestResourceExhaustedErrors_Converted_To_ReadableMessage() {
 	s.OverrideDynamicConfig(dynamicconfig.WorkflowExecutionMaxInFlightUpdates, 2) // Lowering the limit to encounter ResourceExhausted errors

@@ -3365,3 +3524,15 @@ func (s *WorkerDeploymentSuite) Name() string {
 	)
 	return strings.Replace(short, ".", "|", -1)
 }
+
+func (s *WorkerDeploymentSuite) skipBeforeVersion(version workerdeployment.DeploymentWorkflowVersion) {
+	if s.workflowVersion < version {
+		s.T().Skipf("test supports version %v and newer", version)
+	}
+}
+
+func (s *WorkerDeploymentSuite) skipFromVersion(version workerdeployment.DeploymentWorkflowVersion) {
+	if s.workflowVersion >= version {
+		s.T().Skipf("test supports version older than %v", version)
+	}
+}
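
A minimal usage sketch of the two gating helpers added above, included for context only: the test name below is hypothetical, while skipBeforeVersion, skipFromVersion, and workerdeployment.VersionDataRevisionNumber are the identifiers introduced or referenced in this diff.

	// Gate a suite test on the deployment workflow version it exercises.
	func (s *WorkerDeploymentSuite) TestSomethingRevisionAware() {
		// Skip on workflow versions older than the one that introduced revision numbers.
		s.skipBeforeVersion(workerdeployment.VersionDataRevisionNumber)

		// Conversely, a legacy-only test would gate with:
		//   s.skipFromVersion(workerdeployment.VersionDataRevisionNumber)

		// ... test body ...
	}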