Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions api/deployment/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions config/dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
### Worker Versioning Replay Test configs
### uncomment and set the right deploymentWorkflowVersion
#
#matching.deploymentWorkflowVersion:
# - value: 2 # put the integer value of the workflow version you want to target (enum DeploymentWorkflowVersion)
#matching.PollerHistoryTTL:
# - value: 1s
#matching.wv.VersionDrainageStatusVisibilityGracePeriod:
# - value: 5s
#matching.wv.VersionDrainageStatusRefreshInterval:
# - value: 5s
matching.deploymentWorkflowVersion:
- value: 2 # put the integer value of the workflow version you want to target (enum DeploymentWorkflowVersion)
matching.PollerHistoryTTL:
- value: 1s
matching.wv.VersionDrainageStatusVisibilityGracePeriod:
- value: 5s
matching.wv.VersionDrainageStatusRefreshInterval:
- value: 5s
#
### END of Worker Versioning Replay Test configs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ message VersionLocalState {
// Arbitrary user-provided metadata attached to this version.
temporal.api.deployment.v1.VersionMetadata metadata = 8;

bool started_deployment_workflow = 9;
// Deployment workflow should always be running before starting the version workflow.
// We should not start the deployment workflow. If we cannot find the deployment workflow when signaling, it means a bug and we should fix it.
// Deprecated.
bool started_deployment_workflow = 9 [deprecated = true];

// Key: Task Queue Name
map<string, TaskQueueFamilyData> task_queue_families = 10;
Expand Down
21 changes: 0 additions & 21 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,6 @@ var (
)

const (
errTooManySetCurrentVersionRequests = "Too many SetWorkerDeploymentCurrentVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
errTooManySetRampingVersionRequests = "Too many SetWorkerDeploymentRampingVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
errTooManyDeleteDeploymentRequests = "Too many DeleteWorkerDeployment requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
errTooManyDeleteVersionRequests = "Too many DeleteWorkerDeploymentVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
errTooManyVersionMetadataRequests = "Too many UpdateWorkerDeploymentVersionMetadata requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."

maxReasonLength = 1000 // Maximum length for the reason field in RateLimitUpdate configurations.
defaultUserTerminateReason = "terminated by user via frontend"
defaultUserTerminateIdentity = "frontend-service"
Expand Down Expand Up @@ -3444,9 +3438,6 @@ func (wh *WorkflowHandler) SetWorkerDeploymentCurrentVersion(ctx context.Context

resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, versionStr, request.Identity, request.IgnoreMissingTaskQueues, request.GetConflictToken(), request.GetAllowNoPollers())
if err != nil {
if common.IsResourceExhausted(err) {
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManySetCurrentVersionRequests)
}
return nil, err
}

Expand Down Expand Up @@ -3508,9 +3499,6 @@ func (wh *WorkflowHandler) SetWorkerDeploymentRampingVersion(ctx context.Context

resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, versionStr, request.GetPercentage(), request.GetIdentity(), request.IgnoreMissingTaskQueues, request.GetConflictToken(), request.GetAllowNoPollers())
if err != nil {
if common.IsResourceExhausted(err) {
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManySetRampingVersionRequests)
}
return nil, err
}

Expand Down Expand Up @@ -3634,9 +3622,6 @@ func (wh *WorkflowHandler) DeleteWorkerDeployment(ctx context.Context, request *

err = wh.workerDeploymentClient.DeleteWorkerDeployment(ctx, namespaceEntry, request.DeploymentName, request.Identity)
if err != nil {
if common.IsResourceExhausted(err) {
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyDeleteDeploymentRequests)
}
return nil, err
}

Expand All @@ -3663,9 +3648,6 @@ func (wh *WorkflowHandler) DeleteWorkerDeploymentVersion(ctx context.Context, re

err = wh.workerDeploymentClient.DeleteWorkerDeploymentVersion(ctx, namespaceEntry, versionStr, request.SkipDrainage, request.Identity)
if err != nil {
if common.IsResourceExhausted(err) {
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyDeleteVersionRequests)
}
return nil, err
}

Expand Down Expand Up @@ -3697,9 +3679,6 @@ func (wh *WorkflowHandler) UpdateWorkerDeploymentVersionMetadata(ctx context.Con
identity := uuid.NewString()
updatedMetadata, err := wh.workerDeploymentClient.UpdateVersionMetadata(ctx, namespaceEntry, versionStr, request.UpsertEntries, request.RemoveEntries, identity)
if err != nil {
if common.IsResourceExhausted(err) {
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyVersionMetadataRequests)
}
return nil, err
}

Expand Down
16 changes: 9 additions & 7 deletions service/matching/physical_task_queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"time"

"github.com/nexus-rpc/sdk-go/nexus"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common"
backoff2 "go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/contextutil"
Expand Down Expand Up @@ -712,8 +714,11 @@ func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeploymentVersion(
return nil
}

// we need to update the deployment workflow to tell it about this task queue
// TODO: add some backoff here if we got an error last time
backoff, ok := testhooks.Get[time.Duration](c.partitionMgr.engine.testHooks, testhooks.MatchingDeploymentRegisterErrorBackoff)
if !ok {
backoff = deploymentRegisterErrorBackoff
}
backoff = backoff2.Jitter(backoff, .5)

err = c.partitionMgr.engine.workerDeploymentClient.RegisterTaskQueueWorker(
ctx, namespaceEntry, workerDeployment.SeriesName, workerDeployment.BuildId, c.queue.TaskQueueFamily().Name(), c.queue.TaskType(),
Expand All @@ -724,17 +729,14 @@ func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeploymentVersion(
return err
}
var errResourceExhausted *serviceerror.ResourceExhausted
if !errors.As(err, &errResourceExhausted) {
if !errors.As(err, &errResourceExhausted) || errResourceExhausted.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_WORKER_DEPLOYMENT_LIMITS {
// Do not surface low level error to user
// Also, we don't surface resource exhausted errors that are not about deployment limits as they are caused by our workflow-based implementation.
c.logger.Error("error while registering version", tag.Error(err))
err = errDeploymentVersionNotReady
}
// Before retrying the error, hold the poller for some time so it does not retry immediately
// Parallel polls are already serialized using the lock.
backoff := deploymentRegisterErrorBackoff
if testBackoff, ok := testhooks.Get[time.Duration](c.partitionMgr.engine.testHooks, testhooks.MatchingDeploymentRegisterErrorBackoff); ok {
backoff = testBackoff
}
time.Sleep(backoff)
return err
}
Expand Down
42 changes: 35 additions & 7 deletions service/worker/workerdeployment/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,27 @@ package workerdeployment
import (
"cmp"
"context"
"fmt"
"sync"

enumspb "go.temporal.io/api/enums/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/sdk/activity"
deploymentspb "go.temporal.io/server/api/deployment/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/worker_versioning"
)

type (
Activities struct {
namespace *namespace.Namespace
deploymentClient Client
matchingClient resource.MatchingClient
historyClient historyservice.HistoryServiceClient
}
)

Expand Down Expand Up @@ -141,20 +147,42 @@ func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deplo
}

func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error {
fmt.Printf("act Deleting version %s from deployment %s\n", args.Version, args.DeploymentName)
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.DeleteVersionFromWorkerDeployment(
versionObj, err := worker_versioning.WorkerDeploymentVersionFromStringV31(args.Version)
if err != nil {
return err
}

workflowID := GenerateVersionWorkflowID(args.DeploymentName, versionObj.GetBuildId())
updatePayload, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.DeleteVersionArgs{
Identity: identity,
Version: args.Version,
SkipDrainage: args.SkipDrainage,
AsyncPropagation: args.AsyncPropagation,
})
if err != nil {
return err
}

outcome, err := updateWorkflow(
ctx,
a.historyClient,
a.namespace,
args.DeploymentName,
args.Version,
identity,
args.RequestId,
args.SkipDrainage,
args.AsyncPropagation,
workflowID,
&updatepb.Request{
Input: &updatepb.Input{Name: DeleteVersion, Args: updatePayload},
Meta: &updatepb.Meta{UpdateId: args.RequestId, Identity: identity},
},
)
if err != nil {
return err
}

err = extractApplicationErrorOrInternal(outcome.GetFailure())
if err != nil {
return err
}
return nil
}

Expand Down
Loading
Loading