Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,12 @@ and deployment interaction in matching and history.`,
`EnableDeploymentVersions enables deployment versions (versioning v3) in all services,
including deployment-related RPCs in the frontend, deployment version entity workflows in the worker,
and deployment interaction in matching and history.`,
)
FrontendWorkerVersionExistsCacheTTL = NewGlobalDurationSetting(
"frontend.workerVersionExistsCacheTTL",
time.Second,
`FrontendWorkerVersionExistsCacheTTL is the TTL for caching worker version existence checks
when validating versioning overrides.`,
)
UseRevisionNumberForWorkerVersioning = NewNamespaceBoolSetting(
"system.useRevisionNumberForWorkerVersioning",
Expand Down
171 changes: 70 additions & 101 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,18 @@ const (
// WorkerDeploymentVersionIdDelimiterV31 will be deleted once we stop supporting v31 version string fields
// in external and internal APIs. Until then, both delimiters are banned in deployment name. All
// deprecated version string fields in APIs keep using the old delimiter. Workflow SA uses new delimiter.
WorkerDeploymentVersionIdDelimiterV31 = "."
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please review this PR when this one gets in: #8706

WorkerDeploymentVersionIdDelimiter = ":"
WorkerDeploymentVersionWorkflowIDPrefix = "temporal-sys-worker-deployment-version"
WorkerDeploymentWorkflowIDPrefix = "temporal-sys-worker-deployment"
WorkerDeploymentVersionWorkflowIDDelimeter = ":"
WorkerDeploymentVersionWorkflowIDEscape = "|"
WorkerDeploymentVersionIDDelimiterV31 = "."
WorkerDeploymentVersionWorkflowIDEscape = "|"

// Prefixes, Delimeters and Keys that are used in the internal entity workflows backing worker-versioning
WorkerDeploymentWorkflowIDPrefix = "temporal-sys-worker-deployment"
WorkerDeploymentVersionWorkflowIDPrefix = "temporal-sys-worker-deployment-version"
WorkerDeploymentVersionWorkflowIDDelimeter = ":"
WorkerDeploymentVersionWorkflowIDInitialSize = len(WorkerDeploymentVersionWorkflowIDPrefix) + len(WorkerDeploymentVersionWorkflowIDDelimeter) // 39
WorkerDeploymentNameFieldName = "WorkerDeploymentName"
WorkerDeploymentBuildIDFieldName = "BuildID"
)

// EscapeChar is a helper which escapes the BuildIdSearchAttributeDelimiter character
// in the input string
func escapeChar(s, escape, delimiter string) string {
s = strings.Replace(s, escape, escape+escape, -1)
s = strings.Replace(s, delimiter, escape+delimiter, -1)
return s
}

// PinnedBuildIdSearchAttribute creates the pinned search attribute for the BuildIds list, used as a visibility optimization.
// For pinned workflows using WorkerDeployment APIs (ms.GetEffectiveVersioningBehavior() == PINNED &&
// ms.executionInfo.VersioningInfo.Version != ""), this will be `pinned:<version>`. The version used
Expand Down Expand Up @@ -167,9 +163,9 @@ func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities
if b == "" {
return nil, serviceerror.NewInvalidArgumentf("versioned worker must have build id")
}
if strings.Contains(d, WorkerDeploymentVersionIdDelimiter) || strings.Contains(d, WorkerDeploymentVersionIdDelimiterV31) {
if strings.Contains(d, WorkerDeploymentVersionWorkflowIDDelimeter) || strings.Contains(d, WorkerDeploymentVersionIDDelimiterV31) {
// TODO: allow '.' once we get rid of v31 stuff
return nil, serviceerror.NewInvalidArgumentf("deployment name cannot contain '%s' or '%s'", WorkerDeploymentVersionIdDelimiter, WorkerDeploymentVersionIdDelimiterV31)
return nil, serviceerror.NewInvalidArgumentf("deployment name cannot contain '%s' or '%s'", WorkerDeploymentVersionWorkflowIDDelimeter, WorkerDeploymentVersionIDDelimiterV31)
}
return &deploymentpb.Deployment{
SeriesName: d,
Expand Down Expand Up @@ -428,28 +424,66 @@ func ValidateDeployment(deployment *deploymentpb.Deployment) error {
return serviceerror.NewInvalidArgument("deployment name cannot be empty")
}
// TODO: remove '.' restriction once the v31 version strings are completely cleaned from external and internal API
if strings.Contains(deployment.GetSeriesName(), WorkerDeploymentVersionIdDelimiterV31) ||
strings.Contains(deployment.GetSeriesName(), WorkerDeploymentVersionIdDelimiter) {
return serviceerror.NewInvalidArgumentf("deployment name cannot contain '%s' or '%s'", WorkerDeploymentVersionIdDelimiterV31, WorkerDeploymentVersionIdDelimiter)
if strings.Contains(deployment.GetSeriesName(), WorkerDeploymentVersionIDDelimiterV31) ||
strings.Contains(deployment.GetSeriesName(), WorkerDeploymentVersionWorkflowIDDelimeter) {
return serviceerror.NewInvalidArgumentf("deployment name cannot contain '%s' or '%s'", WorkerDeploymentVersionIDDelimiterV31, WorkerDeploymentVersionWorkflowIDDelimeter)
}
if deployment.GetBuildId() == "" {
return serviceerror.NewInvalidArgument("deployment build ID cannot be empty")
}
return nil
}

// ValidateDeploymentVersion returns error if the deployment version is nil or it has empty version
// or deployment name.
func ValidateDeploymentVersion(version *deploymentspb.WorkerDeploymentVersion) error {
// ValidateDeploymentVersion returns error if the deployment version is not a valid entity.
func ValidateDeploymentVersion(version *deploymentspb.WorkerDeploymentVersion, maxIDLengthLimit int) error {
if version == nil {
return serviceerror.NewInvalidArgument("deployment version cannot be nil")
}
if version.GetDeploymentName() == "" {
return serviceerror.NewInvalidArgument("deployment name cannot be empty")

// Validate deployment name
err := ValidateDeploymentVersionFields(WorkerDeploymentNameFieldName, version.GetDeploymentName(), maxIDLengthLimit)
if err != nil {
return err
}

// Validate build ID
err = ValidateDeploymentVersionFields(WorkerDeploymentBuildIDFieldName, version.GetBuildId(), maxIDLengthLimit)
if err != nil {
return err
}

return nil
}

// ValidateDeploymentVersionFields is a helper that verifies if the fields within a
// Worker Deployment Version are valid
func ValidateDeploymentVersionFields(fieldName string, field string, maxIDLengthLimit int) error {
// Length checks
if field == "" {
return serviceerror.NewInvalidArgumentf("%v cannot be empty", fieldName)
}

// Length of each field should be: (MaxIDLengthLimit - (prefix + delimeter length)) / 2
// Note: Using the same initial size for both the fields since they are used together to generate the version workflow's ID
if len(field) > (maxIDLengthLimit-WorkerDeploymentVersionWorkflowIDInitialSize)/2 {
return serviceerror.NewInvalidArgumentf("size of %v larger than the maximum allowed", fieldName)
}

// deploymentName cannot have "."
// TODO: remove this restriction once the old version strings are completely cleaned from external and internal API
if fieldName == WorkerDeploymentNameFieldName && strings.Contains(field, WorkerDeploymentVersionIDDelimiterV31) {
return serviceerror.NewInvalidArgumentf("worker deployment name cannot contain '%s'", WorkerDeploymentVersionIDDelimiterV31)
}
if version.GetBuildId() == "" {
return serviceerror.NewInvalidArgument("build id cannot be empty")
// deploymentName cannot have ":"
if fieldName == WorkerDeploymentNameFieldName && strings.Contains(field, WorkerDeploymentVersionWorkflowIDDelimeter) {
return serviceerror.NewInvalidArgumentf("worker deployment name cannot contain '%s'", WorkerDeploymentVersionWorkflowIDDelimeter)
}

// buildID or deployment name cannot start with "__"
if strings.HasPrefix(field, "__") {
return serviceerror.NewInvalidArgumentf("%v cannot start with '__'", fieldName)
}

return nil
}

Expand Down Expand Up @@ -494,50 +528,6 @@ func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverri
return override.GetBehavior()
}

func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
if override == nil {
return nil
}

if override.GetAutoUpgrade() { // v0.32
return nil
} else if p := override.GetPinned(); p != nil {
if p.GetVersion() == nil {
return serviceerror.NewInvalidArgument("must provide version if override is pinned.")
}
if p.GetBehavior() == workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_UNSPECIFIED {
return serviceerror.NewInvalidArgument("must specify pinned override behavior if override is pinned.")
}
return nil
}

//nolint:staticcheck // SA1019: worker versioning v0.31
switch override.GetBehavior() {
case enumspb.VERSIONING_BEHAVIOR_PINNED:
if override.GetDeployment() != nil {
return ValidateDeployment(override.GetDeployment())
} else if override.GetPinnedVersion() != "" {
_, err := ValidateDeploymentVersionStringV31(override.GetPinnedVersion())
return err
} else {
return serviceerror.NewInvalidArgument("must provide deployment (deprecated) or pinned version if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE:
if override.GetDeployment() != nil {
return serviceerror.NewInvalidArgument("only provide deployment if behavior is 'PINNED'")
}
if override.GetPinnedVersion() != "" {
return serviceerror.NewInvalidArgument("only provide pinned version if behavior is 'PINNED'")
}
case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED:
return serviceerror.NewInvalidArgument("override behavior is required")
default:
//nolint:staticcheck // SA1019 deprecated stamp will clean up later
return serviceerror.NewInvalidArgumentf("override behavior %s not recognized", override.GetBehavior())
}
return nil
}

// FindTargetDeploymentVersionAndRevisionNumberForWorkflowID returns the deployment version and revision number (if applicable) for
// the particular workflow ID based on the versioning info of the task queue. Nil means unversioned.
func FindTargetDeploymentVersionAndRevisionNumberForWorkflowID(
Expand Down Expand Up @@ -881,32 +871,32 @@ func WorkerDeploymentVersionToStringV31(v *deploymentspb.WorkerDeploymentVersion
if v == nil {
return UnversionedVersionId
}
return v.GetDeploymentName() + WorkerDeploymentVersionIdDelimiterV31 + v.GetBuildId()
return v.GetDeploymentName() + WorkerDeploymentVersionIDDelimiterV31 + v.GetBuildId()
}

func WorkerDeploymentVersionToStringV32(v *deploymentspb.WorkerDeploymentVersion) string {
if v == nil {
return ""
}
return v.GetDeploymentName() + WorkerDeploymentVersionIdDelimiter + v.GetBuildId()
return v.GetDeploymentName() + WorkerDeploymentVersionWorkflowIDDelimeter + v.GetBuildId()
}

func BuildIDToStringV32(deploymentName, buildID string) string {
return deploymentName + WorkerDeploymentVersionIdDelimiter + buildID
return deploymentName + WorkerDeploymentVersionWorkflowIDDelimeter + buildID
}

func ExternalWorkerDeploymentVersionToString(v *deploymentpb.WorkerDeploymentVersion) string {
if v == nil {
return ""
}
return v.GetDeploymentName() + WorkerDeploymentVersionIdDelimiter + v.GetBuildId()
return v.GetDeploymentName() + WorkerDeploymentVersionWorkflowIDDelimeter + v.GetBuildId()
}

func ExternalWorkerDeploymentVersionToStringV31(v *deploymentpb.WorkerDeploymentVersion) string {
if v == nil {
return UnversionedVersionId
}
return v.GetDeploymentName() + WorkerDeploymentVersionIdDelimiterV31 + v.GetBuildId()
return v.GetDeploymentName() + WorkerDeploymentVersionIDDelimiterV31 + v.GetBuildId()
}

func ExternalWorkerDeploymentVersionFromStringV31(s string) *deploymentpb.WorkerDeploymentVersion {
Expand All @@ -927,11 +917,11 @@ func WorkerDeploymentVersionFromStringV31(s string) (*deploymentspb.WorkerDeploy
if s == UnversionedVersionId {
return nil, nil
}
before, after, found := strings.Cut(s, WorkerDeploymentVersionIdDelimiterV31)
before, after, found := strings.Cut(s, WorkerDeploymentVersionIDDelimiterV31)
// Also try parsing via the v32 delimiter in case user is using an old CLI/SDK but passing new version strings.
before32, after32, found32 := strings.Cut(s, WorkerDeploymentVersionIdDelimiter)
before32, after32, found32 := strings.Cut(s, WorkerDeploymentVersionWorkflowIDDelimeter)
if !found && !found32 {
return nil, fmt.Errorf("expected delimiter '%s' or '%s' not found in version string %s", WorkerDeploymentVersionIdDelimiter, WorkerDeploymentVersionIdDelimiterV31, s)
return nil, fmt.Errorf("expected delimiter '%s' or '%s' not found in version string %s", WorkerDeploymentVersionWorkflowIDDelimeter, WorkerDeploymentVersionIDDelimiterV31, s)
}
if found && found32 && len(before32) < len(before) {
// choose the values based on the delimiter appeared first to ensure that deployment name does not contain any of the banned delimiters
Expand All @@ -954,9 +944,9 @@ func WorkerDeploymentVersionFromStringV32(s string) (*deploymentspb.WorkerDeploy
if s == UnversionedVersionId {
return nil, nil
}
before, after, found := strings.Cut(s, WorkerDeploymentVersionIdDelimiter)
before, after, found := strings.Cut(s, WorkerDeploymentVersionWorkflowIDDelimeter)
if !found {
return nil, fmt.Errorf("expected delimiter '%s' not found in version string %s", WorkerDeploymentVersionIdDelimiter, s)
return nil, fmt.Errorf("expected delimiter '%s' not found in version string %s", WorkerDeploymentVersionWorkflowIDDelimeter, s)
}
if len(before) == 0 {
return nil, fmt.Errorf("deployment name is empty in version string %s", s)
Expand All @@ -969,24 +959,3 @@ func WorkerDeploymentVersionFromStringV32(s string) (*deploymentspb.WorkerDeploy
BuildId: after,
}, nil
}

// GenerateDeploymentWorkflowID is a helper that generates a system accepted
// workflowID which are used in our Worker Deployment workflows
func GenerateDeploymentWorkflowID(deploymentName string) string {
return WorkerDeploymentWorkflowIDPrefix + WorkerDeploymentVersionWorkflowIDDelimeter + deploymentName
}

func GetDeploymentNameFromWorkflowID(workflowID string) string {
_, deploymentName, _ := strings.Cut(workflowID, WorkerDeploymentVersionWorkflowIDDelimeter)
return deploymentName
}

// GenerateVersionWorkflowID is a helper that generates a system accepted
// workflowID which are used in our Worker Deployment Version workflows
func GenerateVersionWorkflowID(deploymentName string, buildID string) string {
versionString := ExternalWorkerDeploymentVersionToString(&deploymentpb.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildId: buildID,
})
return WorkerDeploymentVersionWorkflowIDPrefix + WorkerDeploymentVersionWorkflowIDDelimeter + versionString
}
2 changes: 1 addition & 1 deletion common/worker_versioning/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func TestWorkerDeploymentVersionFromStringV32(t *testing.T) {
},
{
name: "only delimiter",
input: WorkerDeploymentVersionIdDelimiter,
input: WorkerDeploymentVersionWorkflowIDDelimeter,
expectedErr: "deployment name is empty in version string :",
},
{
Expand Down
8 changes: 6 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ type Config struct {
// Enable deployment version RPCs
EnableDeploymentVersions dynamicconfig.BoolPropertyFnWithNamespaceFilter

// Enable worker version existence cache TTL
FrontendWorkerVersionExistsCacheTTL dynamicconfig.DurationPropertyFn

// Enable batcher RPCs
EnableBatcher dynamicconfig.BoolPropertyFnWithNamespaceFilter
// Batch operation dynamic configs
Expand Down Expand Up @@ -332,8 +335,9 @@ func NewConfig(
EnableCHASMSchedulerCreation: dynamicconfig.EnableCHASMSchedulerCreation.Get(dc),

// [cleanup-wv-pre-release]
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
EnableDeploymentVersions: dynamicconfig.EnableDeploymentVersions.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
EnableDeploymentVersions: dynamicconfig.EnableDeploymentVersions.Get(dc),
FrontendWorkerVersionExistsCacheTTL: dynamicconfig.FrontendWorkerVersionExistsCacheTTL.Get(dc),

EnableBatcher: dynamicconfig.FrontendEnableBatcher.Get(dc),
MaxConcurrentBatchOperation: dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace.Get(dc),
Expand Down
Loading
Loading