Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
777 changes: 0 additions & 777 deletions api/deployment/v1/message.go-helpers.pb.go

Large diffs are not rendered by default.

1,766 changes: 119 additions & 1,647 deletions api/deployment/v1/message.pb.go

Large diffs are not rendered by default.

197 changes: 78 additions & 119 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

164 changes: 42 additions & 122 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

28 changes: 0 additions & 28 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,6 @@ func GetIsWFTaskQueueInVersionDetector(matchingClient resource.MatchingClient) I
}
}

// [cleanup-wv-pre-release]
func FindDeployment(deployments *persistencespb.DeploymentData, deployment *deploymentpb.Deployment) int {
for i, d := range deployments.GetDeployments() { //nolint:staticcheck // SA1019: worker versioning v0.30
if d.Deployment.Equal(deployment) {
return i
}
}
return -1
}

func FindDeploymentVersion(deployments *persistencespb.DeploymentData, v *deploymentspb.WorkerDeploymentVersion) int {
for i, vd := range deployments.GetVersions() {
if proto.Equal(v, vd.GetVersion()) {
Expand All @@ -304,11 +294,6 @@ func HasDeploymentVersion(deployments *persistencespb.DeploymentData, v *deploym
return false
}

for _, d := range deployments.GetDeployments() {
if d.Deployment.Equal(DeploymentFromDeploymentVersion(v)) {
return true
}
}
for _, vd := range deployments.GetVersions() {
if proto.Equal(v, vd.GetVersion()) {
return true
Expand Down Expand Up @@ -700,19 +685,6 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
var current *deploymentspb.DeploymentVersionData
ramping := deployments.GetUnversionedRampData() // nil if there is no unversioned ramp

// Find old current
for _, d := range deployments.GetDeployments() {
// [cleanup-old-wv]
if d.Data.LastBecameCurrentTime != nil {
if t := d.Data.LastBecameCurrentTime.AsTime(); t.After(current.GetRoutingUpdateTime().AsTime()) {
current = &deploymentspb.DeploymentVersionData{
Version: DeploymentVersionFromDeployment(d.Deployment),
RoutingUpdateTime: d.Data.LastBecameCurrentTime,
}
}
}
}

// Find current and ramping
// [cleanup-pp-wv]
for _, v := range deployments.GetVersions() {
Expand Down
16 changes: 0 additions & 16 deletions common/worker_versioning/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ func TestCalculateTaskQueueVersioningInfo(t *testing.T) {
}{
{name: "nil data"},
{name: "empty data", data: &persistencespb.DeploymentData{}},
{name: "old data", wantCurrent: v1,
data: &persistencespb.DeploymentData{
Deployments: []*persistencespb.DeploymentData_DeploymentDataItem{
{Deployment: DeploymentFromDeploymentVersion(v1), Data: &deploymentspb.TaskQueueData{LastBecameCurrentTime: t1}},
}},
},
{name: "old deployment data: two current + two ramping",
wantCurrent: v2,
wantRamping: v3,
Expand Down Expand Up @@ -109,16 +103,6 @@ func TestCalculateTaskQueueVersioningInfo(t *testing.T) {
UnversionedRampData: &deploymentspb.DeploymentVersionData{Version: nil, RampPercentage: 100, RoutingUpdateTime: t2, RampingSinceTime: t2},
},
},
{name: "mix of prerelease and public preview deployment data: one current", wantCurrent: v2,
data: &persistencespb.DeploymentData{
Deployments: []*persistencespb.DeploymentData_DeploymentDataItem{
{Deployment: DeploymentFromDeploymentVersion(v1), Data: &deploymentspb.TaskQueueData{LastBecameCurrentTime: t1}},
},
Versions: []*deploymentspb.DeploymentVersionData{
{Version: v2, CurrentSinceTime: t2, RoutingUpdateTime: t2},
},
},
},
// Membership related tests
{name: "mixed: new RoutingConfig current overrides old when newer in membership", wantCurrent: v2,
data: &persistencespb.DeploymentData{
Expand Down
162 changes: 0 additions & 162 deletions proto/internal/temporal/server/api/deployment/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -474,165 +474,3 @@ message WorkerDeploymentSummary {
temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary current_version_summary = 5;
temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary ramping_version_summary = 6;
}

// Used as Worker Deployment Version workflow activity input:
message AddVersionToWorkerDeploymentRequest {
string deployment_name = 1;
AddVersionUpdateArgs update_args = 2;
string request_id = 3;
}

// Used as Worker Deployment Version workflow activity output:
message AddVersionToWorkerDeploymentResponse {
}

//////////////////////////////////////////////////////////////////////////
// Below this line are the messages for the pre-release Deployment APIs //
//////////////////////////////////////////////////////////////////////////
// Data for each deployment+task queue pair. This is stored in each deployment (for each task
// queue), and synced to task queue user data (for each deployment).
message TaskQueueData {
google.protobuf.Timestamp first_poller_time = 1;
google.protobuf.Timestamp last_became_current_time = 2;
}

message DeploymentLocalState {
temporal.api.deployment.v1.Deployment deployment = 1;
map<string, TaskQueueFamilyData> task_queue_families = 2;

bool is_current = 3;
google.protobuf.Timestamp last_became_current_time = 6;

google.protobuf.Timestamp create_time = 4;
map<string, temporal.api.common.v1.Payload> metadata = 5;

bool started_series_workflow = 7;

message TaskQueueFamilyData {
// Key: enums.TaskQueueType, Value: TaskQueueData
// All fields in TaskQueueData are valid except last_became_current_time, the source of
// truth for that is the value at the root of DeploymentLocalState.
map<int32, TaskQueueData> task_queues = 1;
}
}

// used as deployment workflow input:
message DeploymentWorkflowArgs {
string namespace_name = 1;
string namespace_id = 2;
DeploymentLocalState state = 3;
}

// used as deployment series workflow input:
message DeploymentSeriesWorkflowArgs {
string namespace_name = 1;
string namespace_id = 2;
string series_name = 3;
SeriesLocalState state = 4;
}

message SeriesLocalState {
string current_build_id = 1;
google.protobuf.Timestamp current_changed_time = 2;
}

// used as deployment workflow update input:
message RegisterWorkerInDeploymentArgs {
string task_queue_name = 1;
temporal.api.enums.v1.TaskQueueType task_queue_type = 2;
google.protobuf.Timestamp first_poller_time = 3;
int32 max_task_queues = 4;
}

// used as deployment series workflow update input (sent from series workflow):
message SyncDeploymentStateArgs {
SetCurrent set_current = 1;
// currently, update_metadata will only be set if set_current.last_became_current_time is
// present. this could change in the future.
temporal.api.deployment.v1.UpdateDeploymentMetadata update_metadata = 2;

message SetCurrent {
// If last_became_current_time is present, then set the deployment's
// last_became_current_time to it and set is_current true. If it's missing,
// set is_current false.
google.protobuf.Timestamp last_became_current_time = 1;
}
}

// used as deployment series workflow update response (sent from series workflow):
message SyncDeploymentStateResponse {
DeploymentLocalState deployment_local_state = 1;
}

// used as deployment workflow query response:
message QueryDescribeDeploymentResponse {
DeploymentLocalState deployment_local_state = 1;
}

// used as deployment workflow memo:
message DeploymentWorkflowMemo {
temporal.api.deployment.v1.Deployment deployment = 1;
google.protobuf.Timestamp create_time = 2;
bool is_current_deployment = 3;
}

// used as deployment workflow activity input:
message StartDeploymentSeriesRequest {
string series_name = 1;
string request_id = 2;
}

// used as deployment workflow activity input:
message SyncUserDataRequest {
temporal.api.deployment.v1.Deployment deployment = 1;
repeated SyncUserData sync= 2;

message SyncUserData {
string name = 1;
temporal.api.enums.v1.TaskQueueType type = 2;
TaskQueueData data = 3;
}
}

// used as deployment workflow activity output:
message SyncUserDataResponse {
map<string, int64> task_queue_max_versions = 1;
}

// used as deployment workflow activity input:
message CheckUserDataPropagationRequest {
map<string, int64> task_queue_max_versions = 1;
}

// used as deployment series workflow update input:
message SetCurrentDeploymentArgs {
string identity = 1;
string build_id = 2;
temporal.api.deployment.v1.UpdateDeploymentMetadata update_metadata = 3;
string request_id = 4;
}

// used as deployment series update response:
message SetCurrentDeploymentResponse {
DeploymentLocalState current_deployment_state = 1;
DeploymentLocalState previous_deployment_state = 2;
}

// used as deployment series activity input:
message SyncDeploymentStateActivityArgs {
temporal.api.deployment.v1.Deployment deployment = 1;
SyncDeploymentStateArgs args = 2;
string request_id = 3;
}

// used as deployment series activity result:
message SyncDeploymentStateActivityResult {
DeploymentLocalState state = 1;
}

// used as deployment series workflow memo:
message DeploymentSeriesWorkflowMemo {
string series_name = 1;
string current_build_id = 2;
google.protobuf.Timestamp current_changed_time = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,19 +346,9 @@ message SyncDeploymentUserDataRequest {
// (-- api-linter: core::0203::required=disabled
// aip.dev/not-precedent: Not following Google API format --)
string deployment_name = 9;
// Note: this is the task queue type being modified, but this field should not be used for
// routing, the user data is owned by the WORKFLOW task queue.
// Deprecated. Use `task_queue_types`.
temporal.api.enums.v1.TaskQueueType task_queue_type = 3;
reserved 3, 4, 5;
repeated temporal.api.enums.v1.TaskQueueType task_queue_types = 8;

// This is the deployment being modified.
// Deprecated.
temporal.api.deployment.v1.Deployment deployment = 4;
// Data for this deployment.
// Deprecated.
temporal.server.api.deployment.v1.TaskQueueData data = 5;

oneof operation {
// The deployment version and its data that is being updated.
temporal.server.api.deployment.v1.DeploymentVersionData update_version_data = 6 [deprecated = true];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,7 @@ message VersioningData {
}

message DeploymentData {
// Set of deployments that this task queue belongs to.
// Current deployment is defined implicitly as the deployment with the most recent
// TaskQueueData.last_became_current_time.
// Deprecated.
repeated DeploymentDataItem deployments = 1 [deprecated = true];

reserved 1;
// Set of worker deployment versions that this task queue belongs to.
// Current Version is defined implicitly as the version with `current_since_time!=nil` and the most
// recent `routing_update_time`.
Expand All @@ -111,13 +106,6 @@ message DeploymentData {
// Routing and version membership data for all worker deployments that this task queue belongs to.
// Key is the deployment name.
map<string, WorkerDeploymentData> deployments_data = 4;


// Deprecated.
message DeploymentDataItem {
temporal.api.deployment.v1.Deployment deployment = 1;
temporal.server.api.deployment.v1.TaskQueueData data = 2;
}
}

// Routing config and version membership data for a given worker deployment that a TQ should know.
Expand Down
6 changes: 1 addition & 5 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/gorilla/mux"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/chasm"
schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -42,7 +42,6 @@ import (
"go.temporal.io/server/service"
"go.temporal.io/server/service/frontend/configs"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/worker/deployment"
"go.temporal.io/server/service/worker/scheduler"
"go.temporal.io/server/service/worker/workerdeployment"
"go.uber.org/fx"
Expand All @@ -63,7 +62,6 @@ type (
var Module = fx.Options(
resource.Module,
scheduler.Module,
deployment.Module,
workerdeployment.Module,
// Note that with this approach routes may be registered in arbitrary order.
// This is okay because our routes don't have overlapping matches.
Expand Down Expand Up @@ -749,7 +747,6 @@ func HandlerProvider(
clientBean client.Bean,
historyClient resource.HistoryClient,
matchingClient resource.MatchingClient,
deploymentStoreClient deployment.DeploymentStoreClient,
workerDeploymentStoreClient workerdeployment.Client,
schedulerClient schedulerpb.SchedulerServiceClient,
archiverProvider provider.ArchiverProvider,
Expand Down Expand Up @@ -777,7 +774,6 @@ func HandlerProvider(
persistenceMetadataManager,
historyClient,
matchingClient,
deploymentStoreClient,
workerDeploymentStoreClient,
schedulerClient,
archiverProvider,
Expand Down
6 changes: 1 addition & 5 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
schedulespb "go.temporal.io/server/api/schedule/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/client/frontend"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -73,7 +73,6 @@ import (
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/worker/batcher"
"go.temporal.io/server/service/worker/deployment"
"go.temporal.io/server/service/worker/scheduler"
"go.temporal.io/server/service/worker/workerdeployment"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -129,7 +128,6 @@ type (
clusterMetadata cluster.Metadata
historyClient historyservice.HistoryServiceClient
matchingClient matchingservice.MatchingServiceClient
deploymentStoreClient deployment.DeploymentStoreClient
workerDeploymentClient workerdeployment.Client
schedulerClient schedulerpb.SchedulerServiceClient
archiverProvider provider.ArchiverProvider
Expand Down Expand Up @@ -161,7 +159,6 @@ func NewWorkflowHandler(
persistenceMetadataManager persistence.MetadataManager,
historyClient historyservice.HistoryServiceClient,
matchingClient matchingservice.MatchingServiceClient,
deploymentStoreClient deployment.DeploymentStoreClient,
workerDeploymentClient workerdeployment.Client,
schedulerClient schedulerpb.SchedulerServiceClient,
archiverProvider provider.ArchiverProvider,
Expand Down Expand Up @@ -202,7 +199,6 @@ func NewWorkflowHandler(
clusterMetadata: clusterMetadata,
historyClient: historyClient,
matchingClient: matchingClient,
deploymentStoreClient: deploymentStoreClient,
workerDeploymentClient: workerDeploymentClient,
schedulerClient: schedulerClient,
archiverProvider: archiverProvider,
Expand Down
1 change: 0 additions & 1 deletion service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (s *WorkflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl
s.mockResource.GetHistoryClient(),
s.mockResource.GetMatchingClient(),
nil,
nil,
nil, // Not initializing the scheduler client here.
s.mockResource.GetArchiverProvider(),
s.mockResource.GetPayloadSerializer(),
Expand Down
2 changes: 0 additions & 2 deletions service/matching/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.temporal.io/server/service"
"go.temporal.io/server/service/matching/configs"
"go.temporal.io/server/service/matching/workers"
"go.temporal.io/server/service/worker/deployment"
"go.temporal.io/server/service/worker/workerdeployment"
"go.uber.org/fx"
"google.golang.org/grpc"
Expand All @@ -30,7 +29,6 @@ import (

var Module = fx.Options(
resource.Module,
deployment.Module,
workerdeployment.Module,
fx.Provide(ConfigProvider),
fx.Provide(PersistenceRateLimitingParamsProvider),
Expand Down
Loading
Loading