Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions api/matchingservice/v1/request_response.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

696 changes: 410 additions & 286 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

161 changes: 83 additions & 78 deletions api/matchingservice/v1/service.pb.go

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions api/matchingservice/v1/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions api/matchingservicemock/v1/service_grpc.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions client/matching/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/matching/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions client/matching/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 10 additions & 12 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,21 +265,19 @@ func GetIsWFTaskQueueInVersionDetector(matchingClient resource.MatchingClient) I
return func(ctx context.Context,
namespaceID, tq string,
version *deploymentpb.WorkerDeploymentVersion) (bool, error) {
resp, err := matchingClient.GetTaskQueueUserData(ctx,
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: namespaceID,
TaskQueue: tq,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
})
resp, err := matchingClient.CheckTaskQueueVersionMembership(ctx, &matchingservice.CheckTaskQueueVersionMembershipRequest{
NamespaceId: namespaceID,
TaskQueue: tq,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
Version: &deploymentspb.WorkerDeploymentVersion{
DeploymentName: version.GetDeploymentName(),
BuildId: version.GetBuildId(),
},
})
if err != nil {
return false, err
}
tqData, ok := resp.GetUserData().GetData().GetPerType()[int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW)]
if !ok {
// The TQ is unversioned
return false, nil
}
return HasDeploymentVersion(tqData.GetDeploymentData(), DeploymentVersionFromDeployment(DeploymentFromExternalDeploymentVersion(version))), nil
return resp.GetIsMember(), nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,14 @@ message DescribeWorkerRequest {
message DescribeWorkerResponse {
temporal.api.worker.v1.WorkerInfo worker_info = 1;
}

message CheckTaskQueueVersionMembershipRequest {
string namespace_id = 1;
string task_queue = 2;
temporal.api.enums.v1.TaskQueueType task_queue_type = 3;
Copy link
Member Author

@Shivs11 Shivs11 Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, we don't need this (right now) since all the version presence checks for a task queue go through the workflow task queue type. However, I just wanted to be consistent with other matching API's since those that accept task-queue also accept the type. Moreover, we could have a future use-case of involving the activity task queue type of a task queue.

temporal.server.api.deployment.v1.WorkerDeploymentVersion version = 4;
}

message CheckTaskQueueVersionMembershipResponse {
bool is_member = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,8 @@ service MatchingService {
// Returns an error if the namespace or worker doesn't exist.
rpc DescribeWorker (DescribeWorkerRequest) returns (DescribeWorkerResponse) {}

// CheckTaskQueueVersionMembership checks if a task queue is part of a specific deployment version.
rpc CheckTaskQueueVersionMembership(CheckTaskQueueVersionMembershipRequest) returns (CheckTaskQueueVersionMembershipResponse) {}

}

8 changes: 8 additions & 0 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,14 @@ func (h *Handler) CheckTaskQueueUserDataPropagation(
return h.engine.CheckTaskQueueUserDataPropagation(ctx, request)
}

func (h *Handler) CheckTaskQueueVersionMembership(
ctx context.Context,
request *matchingservice.CheckTaskQueueVersionMembershipRequest,
) (_ *matchingservice.CheckTaskQueueVersionMembershipResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
return h.engine.CheckTaskQueueVersionMembership(ctx, request)
}

func (h *Handler) DispatchNexusTask(ctx context.Context, request *matchingservice.DispatchNexusTaskRequest) (_ *matchingservice.DispatchNexusTaskResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)
return h.engine.DispatchNexusTask(ctx, request)
Expand Down
26 changes: 26 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3139,6 +3139,32 @@ func prepareTaskQueueUserData(
return data
}

func (e *matchingEngineImpl) CheckTaskQueueVersionMembership(
ctx context.Context,
request *matchingservice.CheckTaskQueueVersionMembershipRequest,
) (*matchingservice.CheckTaskQueueVersionMembershipResponse, error) {
partition, err := tqid.PartitionFromProto(&taskqueuepb.TaskQueue{
Name: request.GetTaskQueue(),
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}, request.GetNamespaceId(), request.GetTaskQueueType())
if err != nil {
return nil, err
}
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, true, loadCauseOtherRead)
if err != nil {
return nil, err
}

userData, _, err := pm.GetUserDataManager().GetUserData()
if err != nil {
return nil, err
}

typedUserData := userData.GetData().GetPerType()[int32(request.GetTaskQueueType())]
present := worker_versioning.HasDeploymentVersion(typedUserData.GetDeploymentData(), request.GetVersion())
return &matchingservice.CheckTaskQueueVersionMembershipResponse{IsMember: present}, nil
}

func (e *matchingEngineImpl) UpdateTaskQueueConfig(
ctx context.Context,
request *matchingservice.UpdateTaskQueueConfigRequest,
Expand Down
Loading
Loading