From 6db1a1281029e92dd5276b3132d4008a35bf06e9 Mon Sep 17 00:00:00 2001 From: Will Duan Date: Thu, 11 Dec 2025 17:35:03 -0800 Subject: [PATCH 1/2] Trim SyncWorkflowState request branch token and log request size when error (#8794) ## What changed? Trim the branch tokens from the SyncWorkflowState request, and report the request size in the returned error when the call fails with a ResourceExhausted error. ## Why? 1. To reduce the request size. 2. To help debug cases where the payload size exceeds the gRPC max message size. ## How did you test it? - [x] built - [x] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks No risk. --- .../history/replication/executable_task.go | 21 ++++++++-- .../replication/executable_task_test.go | 39 ++++++++++++++++--- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index c24487c839..f114d8a6d2 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -622,7 +622,17 @@ func (e *ExecutableTaskImpl) SyncState( } targetClusterInfo := e.ClusterMetadata.GetAllClusterInfo()[e.ClusterMetadata.GetCurrentClusterName()] - resp, err := remoteAdminClient.SyncWorkflowState(ctx, &adminservice.SyncWorkflowStateRequest{ + + // Remove branch tokens from version histories to reduce request size + versionHistories := syncStateErr.VersionHistories + if versionHistories != nil { + versionHistories = versionhistory.CopyVersionHistories(versionHistories) + for _, history := range versionHistories.Histories { + history.BranchToken = nil + } + } + + req := &adminservice.SyncWorkflowStateRequest{ NamespaceId: syncStateErr.NamespaceId, Execution: &commonpb.WorkflowExecution{ WorkflowId: syncStateErr.WorkflowId, @@ -630,10 +640,15 @@ func (e *ExecutableTaskImpl) SyncState( }, ArchetypeId: syncStateErr.ArchetypeId, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: 
syncStateErr.VersionHistories, + VersionHistories: versionHistories, TargetClusterId: int32(targetClusterInfo.InitialFailoverVersion), - }) + } + resp, err := remoteAdminClient.SyncWorkflowState(ctx, req) if err != nil { + var resourceExhaustedError *serviceerror.ResourceExhausted + if errors.As(err, &resourceExhaustedError) { + return false, serviceerror.NewInvalidArgumentf("sync workflow state failed due to resource exhausted: %v, request payload size: %v", err, req.Size()) + } logger := log.With(e.Logger, tag.WorkflowNamespaceID(syncStateErr.NamespaceId), tag.WorkflowID(syncStateErr.WorkflowId), diff --git a/service/history/replication/executable_task_test.go b/service/history/replication/executable_task_test.go index 4f01f863b1..be333cebfc 100644 --- a/service/history/replication/executable_task_test.go +++ b/service/history/replication/executable_task_test.go @@ -580,8 +580,17 @@ func (s *executableTaskSuite) TestResend_TransitionHistoryDisabled() { }, ArchetypeId: syncStateErr.ArchetypeId, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, - TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + // BranchToken is removed in the actual implementation + Items: []*historyspb.VersionHistoryItem{ + {EventId: 102, Version: 1234}, + }, + }, + }, + }, + TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), }, ).Return(nil, consts.ErrTransitionHistoryDisabled).Times(1) @@ -636,8 +645,17 @@ func (s *executableTaskSuite) TestSyncState_SourceMutableStateHasUnFlushedBuffer }, ArchetypeId: chasm.WorkflowArchetypeID, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, - TargetClusterId: 
int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + // BranchToken is removed in the actual implementation + Items: []*historyspb.VersionHistoryItem{ + {EventId: 102, Version: 1234}, + }, + }, + }, + }, + TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), }, ).Return(nil, serviceerror.NewWorkflowNotReady("workflow not ready")).Times(1) @@ -1117,8 +1135,17 @@ func (s *executableTaskSuite) TestSyncState() { }, ArchetypeId: chasm.WorkflowArchetypeID, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, - TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + // BranchToken is removed in the actual implementation + Items: []*historyspb.VersionHistoryItem{ + {EventId: 102, Version: 1234}, + }, + }, + }, + }, + TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), }, ).Return(&adminservice.SyncWorkflowStateResponse{ VersionedTransitionArtifact: versionedTransitionArtifact, From 6ee57fbf55891394850c34c570cda3da93df922c Mon Sep 17 00:00:00 2001 From: "temporal-cicd[bot]" Date: Fri, 12 Dec 2025 19:04:04 +0000 Subject: [PATCH 2/2] Bump Server version to 1.30.0-147.4 --- common/headers/version_checker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/headers/version_checker.go b/common/headers/version_checker.go index 7a93f3a3e7..9e0d3b7f31 100644 --- a/common/headers/version_checker.go +++ b/common/headers/version_checker.go @@ -23,7 +23,7 @@ const ( // ServerVersion value can be changed by the create-tag Github workflow. 
// If you change the var name or move it, be sure to update the workflow. - ServerVersion = "1.30.0-147.3" + ServerVersion = "1.30.0-147.4" // SupportedServerVersions is used by CLI and inter role communication. SupportedServerVersions = ">=1.0.0 <2.0.0"