diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index c24487c839..f114d8a6d2 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -622,7 +622,17 @@ func (e *ExecutableTaskImpl) SyncState( } targetClusterInfo := e.ClusterMetadata.GetAllClusterInfo()[e.ClusterMetadata.GetCurrentClusterName()] - resp, err := remoteAdminClient.SyncWorkflowState(ctx, &adminservice.SyncWorkflowStateRequest{ + + // Remove branch tokens from version histories to reduce request size + versionHistories := syncStateErr.VersionHistories + if versionHistories != nil { + versionHistories = versionhistory.CopyVersionHistories(versionHistories) + for _, history := range versionHistories.Histories { + history.BranchToken = nil + } + } + + req := &adminservice.SyncWorkflowStateRequest{ NamespaceId: syncStateErr.NamespaceId, Execution: &commonpb.WorkflowExecution{ WorkflowId: syncStateErr.WorkflowId, @@ -630,10 +640,15 @@ func (e *ExecutableTaskImpl) SyncState( }, ArchetypeId: syncStateErr.ArchetypeId, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, + VersionHistories: versionHistories, TargetClusterId: int32(targetClusterInfo.InitialFailoverVersion), - }) + } + resp, err := remoteAdminClient.SyncWorkflowState(ctx, req) if err != nil { + var resourceExhaustedError *serviceerror.ResourceExhausted + if errors.As(err, &resourceExhaustedError) { + return false, serviceerror.NewInvalidArgumentf("sync workflow state failed due to resource exhausted: %v, request payload size: %v", err, req.Size()) + } logger := log.With(e.Logger, tag.WorkflowNamespaceID(syncStateErr.NamespaceId), tag.WorkflowID(syncStateErr.WorkflowId), diff --git a/service/history/replication/executable_task_test.go b/service/history/replication/executable_task_test.go index 7f1113bf50..831e8845b2 100644 --- a/service/history/replication/executable_task_test.go +++ b/service/history/replication/executable_task_test.go @@ -580,8 +580,17 @@ func (s *executableTaskSuite) TestResend_TransitionHistoryDisabled() { }, ArchetypeId: syncStateErr.ArchetypeId, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, - TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + // BranchToken is removed in the actual implementation + Items: []*historyspb.VersionHistoryItem{ + {EventId: 102, Version: 1234}, + }, + }, + }, + }, + TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), }, ).Return(nil, consts.ErrTransitionHistoryDisabled).Times(1) @@ -636,8 +645,17 @@ func (s *executableTaskSuite) TestSyncState_SourceMutableStateHasUnFlushedBuffer }, ArchetypeId: chasm.WorkflowArchetypeID, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, - TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + // BranchToken is removed in the actual implementation + Items: []*historyspb.VersionHistoryItem{ + {EventId: 102, Version: 1234}, + }, + }, + }, + }, + TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), }, ).Return(nil, serviceerror.NewWorkflowNotReady("workflow not ready")).Times(1) @@ -1134,8 +1152,17 @@ func (s *executableTaskSuite) TestSyncState() { }, ArchetypeId: chasm.WorkflowArchetypeID, VersionedTransition: syncStateErr.VersionedTransition, - VersionHistories: syncStateErr.VersionHistories, - TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + // BranchToken is removed in the actual implementation + Items: []*historyspb.VersionHistoryItem{ + {EventId: 102, Version: 1234}, + }, + }, + }, + }, + TargetClusterId: int32(s.clusterMetadata.GetAllClusterInfo()[s.clusterMetadata.GetCurrentClusterName()].InitialFailoverVersion), }, ).Return(&adminservice.SyncWorkflowStateResponse{ VersionedTransitionArtifact: versionedTransitionArtifact,