diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 4c79139121..8991f7e660 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -1101,10 +1101,14 @@ func (*WorkflowExecutionInfo_LastWorkflowTaskTimedOutType) isWorkflowExecutionIn } type ExecutionStats struct { - state protoimpl.MessageState `protogen:"open.v1"` - HistorySize int64 `protobuf:"varint,1,opt,name=history_size,json=historySize,proto3" json:"history_size,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + HistorySize int64 `protobuf:"varint,1,opt,name=history_size,json=historySize,proto3" json:"history_size,omitempty"` + // Total size in bytes of all external payloads referenced in the workflow history. + ExternalPayloadSize int64 `protobuf:"varint,2,opt,name=external_payload_size,json=externalPayloadSize,proto3" json:"external_payload_size,omitempty"` + // Total count of external payloads referenced in the workflow history. 
+ ExternalPayloadCount int64 `protobuf:"varint,3,opt,name=external_payload_count,json=externalPayloadCount,proto3" json:"external_payload_count,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ExecutionStats) Reset() { @@ -1144,6 +1148,20 @@ func (x *ExecutionStats) GetHistorySize() int64 { return 0 } +func (x *ExecutionStats) GetExternalPayloadSize() int64 { + if x != nil { + return x.ExternalPayloadSize + } + return 0 +} + +func (x *ExecutionStats) GetExternalPayloadCount() int64 { + if x != nil { + return x.ExternalPayloadCount + } + return 0 +} + // execution_state column type WorkflowExecutionState struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -4620,9 +4638,11 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "&ChildrenInitializedPostResetPointEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12H\n" + "\x05value\x18\x02 \x01(\v22.temporal.server.api.persistence.v1.ResetChildInfoR\x05value:\x028\x01B\x1c\n" + - "\x1alast_workflow_task_failureJ\x04\b\b\x10\tJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11J\x04\b,\x10-J\x04\b-\x10.J\x04\b/\x100J\x04\b0\x101J\x04\b1\x102J\x04\b2\x103\"3\n" + + "\x1alast_workflow_task_failureJ\x04\b\b\x10\tJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11J\x04\b,\x10-J\x04\b-\x10.J\x04\b/\x100J\x04\b0\x101J\x04\b1\x102J\x04\b2\x103\"\x9d\x01\n" + "\x0eExecutionStats\x12!\n" + - "\fhistory_size\x18\x01 \x01(\x03R\vhistorySize\"\x8c\x05\n" + + "\fhistory_size\x18\x01 \x01(\x03R\vhistorySize\x122\n" + + "\x15external_payload_size\x18\x02 \x01(\x03R\x13externalPayloadSize\x124\n" + + "\x16external_payload_count\x18\x03 \x01(\x03R\x14externalPayloadCount\"\x8c\x05\n" + "\x16WorkflowExecutionState\x12*\n" + "\x11create_request_id\x18\x01 \x01(\tR\x0fcreateRequestId\x12\x15\n" + "\x06run_id\x18\x02 \x01(\tR\x05runId\x12J\n" + diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto 
b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 444599d5cd..635db2ba2f 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -291,6 +291,10 @@ message WorkflowExecutionInfo { message ExecutionStats { int64 history_size = 1; + // Total size in bytes of all external payloads referenced in the workflow history. + int64 external_payload_size = 2; + // Total count of external payloads referenced in the workflow history. + int64 external_payload_count = 3; } // execution_state column diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index c22b1dfc75..d124eb99e5 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -293,6 +293,12 @@ type ( GetHistorySize() int64 AddHistorySize(size int64) + GetExternalPayloadSize() int64 + AddExternalPayloadSize(size int64) + + GetExternalPayloadCount() int64 + AddExternalPayloadCount(count int64) + AddTasks(tasks ...tasks.Task) PopTasks() map[tasks.Category][]tasks.Task DeleteCHASMPureTasks(maxScheduledTime time.Time) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 9512c3f3c4..a48c11b0cd 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -298,6 +298,30 @@ func (mr *MockMutableStateMockRecorder) AddContinueAsNewEvent(arg0, arg1, arg2, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddContinueAsNewEvent", reflect.TypeOf((*MockMutableState)(nil).AddContinueAsNewEvent), arg0, arg1, arg2, arg3, arg4, arg5) } +// AddExternalPayloadCount mocks base method. 
+func (m *MockMutableState) AddExternalPayloadCount(count int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddExternalPayloadCount", count) +} + +// AddExternalPayloadCount indicates an expected call of AddExternalPayloadCount. +func (mr *MockMutableStateMockRecorder) AddExternalPayloadCount(count any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalPayloadCount", reflect.TypeOf((*MockMutableState)(nil).AddExternalPayloadCount), count) +} + +// AddExternalPayloadSize mocks base method. +func (m *MockMutableState) AddExternalPayloadSize(size int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddExternalPayloadSize", size) +} + +// AddExternalPayloadSize indicates an expected call of AddExternalPayloadSize. +func (mr *MockMutableStateMockRecorder) AddExternalPayloadSize(size any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddExternalPayloadSize", reflect.TypeOf((*MockMutableState)(nil).AddExternalPayloadSize), size) +} + // AddExternalWorkflowExecutionCancelRequested mocks base method. func (m *MockMutableState) AddExternalWorkflowExecutionCancelRequested(arg0 int64, arg1 namespace.Name, arg2 namespace.ID, arg3, arg4 string) (*history.HistoryEvent, error) { m.ctrl.T.Helper() @@ -2213,6 +2237,34 @@ func (mr *MockMutableStateMockRecorder) GetExecutionState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExecutionState", reflect.TypeOf((*MockMutableState)(nil).GetExecutionState)) } +// GetExternalPayloadCount mocks base method. +func (m *MockMutableState) GetExternalPayloadCount() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetExternalPayloadCount") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetExternalPayloadCount indicates an expected call of GetExternalPayloadCount. 
+func (mr *MockMutableStateMockRecorder) GetExternalPayloadCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExternalPayloadCount", reflect.TypeOf((*MockMutableState)(nil).GetExternalPayloadCount)) +} + +// GetExternalPayloadSize mocks base method. +func (m *MockMutableState) GetExternalPayloadSize() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetExternalPayloadSize") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetExternalPayloadSize indicates an expected call of GetExternalPayloadSize. +func (mr *MockMutableStateMockRecorder) GetExternalPayloadSize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExternalPayloadSize", reflect.TypeOf((*MockMutableState)(nil).GetExternalPayloadSize)) +} + // GetFirstRunID mocks base method. func (m *MockMutableState) GetFirstRunID(ctx context.Context) (string, error) { m.ctrl.T.Helper() diff --git a/service/history/ndc/conflict_resolver.go b/service/history/ndc/conflict_resolver.go index f56fd9b089..719bd72b9e 100644 --- a/service/history/ndc/conflict_resolver.go +++ b/service/history/ndc/conflict_resolver.go @@ -138,6 +138,8 @@ func (r *ConflictResolverImpl) rebuild( executionState.RunId, ) historySize := r.mutableState.GetHistorySize() + externalPayloadSize := r.mutableState.GetExternalPayloadSize() + externalPayloadCount := r.mutableState.GetExternalPayloadCount() rebuildMutableState, _, err := r.stateRebuilder.Rebuild( ctx, @@ -173,6 +175,8 @@ func (r *ConflictResolverImpl) rebuild( } rebuildMutableState.GetExecutionInfo().VersionHistories = versionHistories rebuildMutableState.AddHistorySize(historySize) + rebuildMutableState.AddExternalPayloadSize(externalPayloadSize) + rebuildMutableState.AddExternalPayloadCount(externalPayloadCount) // set the update condition from original mutable state rebuildMutableState.SetUpdateCondition(r.mutableState.GetUpdateCondition()) diff --git 
a/service/history/ndc/conflict_resolver_test.go b/service/history/ndc/conflict_resolver_test.go index 5440b420b1..d075c2ed72 100644 --- a/service/history/ndc/conflict_resolver_test.go +++ b/service/history/ndc/conflict_resolver_test.go @@ -89,6 +89,8 @@ func (s *conflictResolverSuite) TestRebuild() { requestID := uuid.NewString() version := int64(12) historySize := int64(12345) + externalPayloadSize := int64(6789) + externalPayloadCount := int64(42) branchToken0 := []byte("some random branch token") lastEventID0 := int64(5) @@ -116,6 +118,8 @@ func (s *conflictResolverSuite) TestRebuild() { RunId: s.runID, }).AnyTimes() s.mockMutableState.EXPECT().GetHistorySize().Return(historySize).AnyTimes() + s.mockMutableState.EXPECT().GetExternalPayloadSize().Return(externalPayloadSize).AnyTimes() + s.mockMutableState.EXPECT().GetExternalPayloadCount().Return(externalPayloadCount).AnyTimes() workflowKey := definition.NewWorkflowKey( s.namespaceID, @@ -134,6 +138,8 @@ func (s *conflictResolverSuite) TestRebuild() { }, ).AnyTimes() mockRebuildMutableState.EXPECT().AddHistorySize(historySize) + mockRebuildMutableState.EXPECT().AddExternalPayloadSize(externalPayloadSize) + mockRebuildMutableState.EXPECT().AddExternalPayloadCount(externalPayloadCount) mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion) s.mockStateBuilder.EXPECT().Rebuild( @@ -146,7 +152,11 @@ func (s *conflictResolverSuite) TestRebuild() { workflowKey, branchToken1, requestID, - ).Return(mockRebuildMutableState, rand.Int63(), nil) + ).Return(mockRebuildMutableState, RebuildStats{ + HistorySize: rand.Int63(), + ExternalPayloadSize: rand.Int63(), + ExternalPayloadCount: rand.Int63(), + }, nil) s.mockContext.EXPECT().Clear() rebuiltMutableState, err := s.nDCConflictResolver.rebuild(ctx, 1, requestID) @@ -242,6 +252,10 @@ func (s *conflictResolverSuite) TestGetOrRebuildCurrentMutableState_Rebuild() { RunId: s.runID, }).AnyTimes() 
s.mockMutableState.EXPECT().GetHistorySize().Return(historySize).AnyTimes() + externalPayloadSize := int64(6789) + externalPayloadCount := int64(42) + s.mockMutableState.EXPECT().GetExternalPayloadSize().Return(externalPayloadSize).AnyTimes() + s.mockMutableState.EXPECT().GetExternalPayloadCount().Return(externalPayloadCount).AnyTimes() workflowKey := definition.NewWorkflowKey( s.namespaceID, @@ -260,6 +274,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildCurrentMutableState_Rebuild() { }, ).AnyTimes() mockRebuildMutableState.EXPECT().AddHistorySize(historySize) + mockRebuildMutableState.EXPECT().AddExternalPayloadSize(externalPayloadSize) + mockRebuildMutableState.EXPECT().AddExternalPayloadCount(externalPayloadCount) mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion) s.mockStateBuilder.EXPECT().Rebuild( @@ -272,7 +288,11 @@ func (s *conflictResolverSuite) TestGetOrRebuildCurrentMutableState_Rebuild() { workflowKey, branchToken1, gomock.Any(), - ).Return(mockRebuildMutableState, rand.Int63(), nil) + ).Return(mockRebuildMutableState, RebuildStats{ + HistorySize: rand.Int63(), + ExternalPayloadSize: rand.Int63(), + ExternalPayloadCount: rand.Int63(), + }, nil) s.mockContext.EXPECT().Clear() rebuiltMutableState, isRebuilt, err := s.nDCConflictResolver.GetOrRebuildCurrentMutableState(ctx, 1, incomingVersion) @@ -305,6 +325,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() { dbVersion := int64(1444) version := int64(12) historySize := int64(12345) + externalPayloadSize := int64(6789) + externalPayloadCount := int64(42) // current branch branchToken0 := []byte("some random branch token") @@ -339,6 +361,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() { RunId: s.runID, }).AnyTimes() s.mockMutableState.EXPECT().GetHistorySize().Return(historySize).AnyTimes() + s.mockMutableState.EXPECT().GetExternalPayloadSize().Return(externalPayloadSize).AnyTimes() + 
s.mockMutableState.EXPECT().GetExternalPayloadCount().Return(externalPayloadCount).AnyTimes() workflowKey := definition.NewWorkflowKey( s.namespaceID, @@ -357,6 +381,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() { }, ).AnyTimes() mockRebuildMutableState.EXPECT().AddHistorySize(historySize) + mockRebuildMutableState.EXPECT().AddExternalPayloadSize(externalPayloadSize) + mockRebuildMutableState.EXPECT().AddExternalPayloadCount(externalPayloadCount) mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion) s.mockStateBuilder.EXPECT().Rebuild( @@ -369,7 +395,11 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() { workflowKey, branchToken1, gomock.Any(), - ).Return(mockRebuildMutableState, rand.Int63(), nil) + ).Return(mockRebuildMutableState, RebuildStats{ + HistorySize: rand.Int63(), + ExternalPayloadSize: rand.Int63(), + ExternalPayloadCount: rand.Int63(), + }, nil) s.mockContext.EXPECT().Clear() rebuiltMutableState, isRebuilt, err := s.nDCConflictResolver.GetOrRebuildMutableState(ctx, 1) diff --git a/service/history/ndc/resetter.go b/service/history/ndc/resetter.go index 7240486f16..0b4f2e5d5f 100644 --- a/service/history/ndc/resetter.go +++ b/service/history/ndc/resetter.go @@ -105,7 +105,7 @@ func (r *resetterImpl) resetWorkflow( } requestID := uuid.NewString() - rebuildMutableState, rebuiltHistorySize, err := r.stateRebuilder.Rebuild( + rebuildMutableState, rebuildStats, err := r.stateRebuilder.Rebuild( ctx, now, definition.NewWorkflowKey( @@ -127,7 +127,9 @@ func (r *resetterImpl) resetWorkflow( if err != nil { return nil, err } - rebuildMutableState.AddHistorySize(rebuiltHistorySize) + rebuildMutableState.AddHistorySize(rebuildStats.HistorySize) + rebuildMutableState.AddExternalPayloadSize(rebuildStats.ExternalPayloadSize) + rebuildMutableState.AddExternalPayloadCount(rebuildStats.ExternalPayloadCount) if err := rebuildMutableState.RefreshExpirationTimeoutTask(ctx); err != nil { return 
nil, err diff --git a/service/history/ndc/resetter_test.go b/service/history/ndc/resetter_test.go index ed80683c3f..9fe2c4a228 100644 --- a/service/history/ndc/resetter_test.go +++ b/service/history/ndc/resetter_test.go @@ -126,7 +126,11 @@ func (s *resetterSuite) TestResetWorkflow_NoError() { incomingFirstEventID := baseEventID + 12 incomingVersion := baseVersion + 3 - rebuiltHistorySize := int64(9999) + rebuildStats := RebuildStats{ + HistorySize: 9999, + ExternalPayloadSize: 1234, + ExternalPayloadCount: 56, + } newBranchToken := []byte("other random branch token") s.mockBaseMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes() @@ -165,8 +169,10 @@ func (s *resetterSuite) TestResetWorkflow_NoError() { ), newBranchToken, gomock.Any(), - ).Return(s.mockRebuiltMutableState, rebuiltHistorySize, nil) - s.mockRebuiltMutableState.EXPECT().AddHistorySize(rebuiltHistorySize) + ).Return(s.mockRebuiltMutableState, rebuildStats, nil) + s.mockRebuiltMutableState.EXPECT().AddHistorySize(rebuildStats.HistorySize) + s.mockRebuiltMutableState.EXPECT().AddExternalPayloadSize(rebuildStats.ExternalPayloadSize) + s.mockRebuiltMutableState.EXPECT().AddExternalPayloadCount(rebuildStats.ExternalPayloadCount) shardID := s.mockShard.GetShardID() s.mockExecManager.EXPECT().ForkHistoryBranch(gomock.Any(), &persistence.ForkHistoryBranchRequest{ diff --git a/service/history/ndc/state_rebuilder.go b/service/history/ndc/state_rebuilder.go index d46e2cb496..9baa414e36 100644 --- a/service/history/ndc/state_rebuilder.go +++ b/service/history/ndc/state_rebuilder.go @@ -37,7 +37,7 @@ type ( targetWorkflowIdentifier definition.WorkflowKey, targetBranchToken []byte, requestID string, - ) (historyi.MutableState, int64, error) + ) (historyi.MutableState, RebuildStats, error) RebuildWithCurrentMutableState( ctx context.Context, now time.Time, @@ -49,7 +49,13 @@ type ( targetBranchToken []byte, requestID string, 
currentMutableState *persistencespb.WorkflowMutableState, - ) (historyi.MutableState, int64, error) + ) (historyi.MutableState, RebuildStats, error) + } + + RebuildStats struct { + HistorySize int64 + ExternalPayloadSize int64 + ExternalPayloadCount int64 } StateRebuilderImpl struct { @@ -60,8 +66,10 @@ type ( executionMgr persistence.ExecutionManager taskRefresher workflow.TaskRefresher - rebuiltHistorySize int64 - logger log.Logger + rebuiltHistorySize int64 + rebuiltExternalPayloadSize int64 + rebuiltExternalPayloadCount int64 + logger log.Logger } HistoryBlobsPaginationItem struct { @@ -78,14 +86,16 @@ func NewStateRebuilder( ) *StateRebuilderImpl { return &StateRebuilderImpl{ - shard: shard, - namespaceRegistry: shard.GetNamespaceRegistry(), - eventsCache: shard.GetEventsCache(), - clusterMetadata: shard.GetClusterMetadata(), - executionMgr: shard.GetExecutionManager(), - taskRefresher: workflow.NewTaskRefresher(shard), - rebuiltHistorySize: 0, - logger: logger, + shard: shard, + namespaceRegistry: shard.GetNamespaceRegistry(), + eventsCache: shard.GetEventsCache(), + clusterMetadata: shard.GetClusterMetadata(), + executionMgr: shard.GetExecutionManager(), + taskRefresher: workflow.NewTaskRefresher(shard), + rebuiltHistorySize: 0, + rebuiltExternalPayloadSize: 0, + rebuiltExternalPayloadCount: 0, + logger: logger, } } @@ -99,7 +109,7 @@ func (r *StateRebuilderImpl) Rebuild( targetWorkflowIdentifier definition.WorkflowKey, targetBranchToken []byte, requestID string, -) (historyi.MutableState, int64, error) { +) (historyi.MutableState, RebuildStats, error) { rebuiltMutableState, lastTxnId, err := r.buildMutableStateFromEvent( ctx, now, @@ -112,13 +122,13 @@ func (r *StateRebuilderImpl) Rebuild( requestID, ) if err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } // close rebuilt mutable state transaction clearing all generated tasks, etc. 
_, _, err = rebuiltMutableState.CloseTransactionAsSnapshot(historyi.TransactionPolicyPassive) if err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } rebuiltMutableState.GetExecutionInfo().LastFirstEventTxnId = lastTxnId @@ -128,10 +138,14 @@ func (r *StateRebuilderImpl) Rebuild( // from the base run. However, RefreshTasks always resets that field and // force regenerates the execution timeout timer task. if err := r.taskRefresher.Refresh(ctx, rebuiltMutableState, false); err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } - return rebuiltMutableState, r.rebuiltHistorySize, nil + return rebuiltMutableState, RebuildStats{ + HistorySize: r.rebuiltHistorySize, + ExternalPayloadSize: r.rebuiltExternalPayloadSize, + ExternalPayloadCount: r.rebuiltExternalPayloadCount, + }, nil } func (r *StateRebuilderImpl) RebuildWithCurrentMutableState( @@ -145,7 +159,7 @@ func (r *StateRebuilderImpl) RebuildWithCurrentMutableState( targetBranchToken []byte, requestID string, currentMutableState *persistencespb.WorkflowMutableState, -) (historyi.MutableState, int64, error) { +) (historyi.MutableState, RebuildStats, error) { rebuiltMutableState, lastTxnId, err := r.buildMutableStateFromEvent( ctx, now, @@ -158,13 +172,13 @@ func (r *StateRebuilderImpl) RebuildWithCurrentMutableState( requestID, ) if err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } copyToRebuildMutableState(rebuiltMutableState, currentMutableState) versionHistories := rebuiltMutableState.GetExecutionInfo().GetVersionHistories() currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(versionHistories) if err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } items := versionhistory.CopyVersionHistoryItems(currentVersionHistory.Items) @@ -176,7 +190,7 @@ func (r *StateRebuilderImpl) RebuildWithCurrentMutableState( // close rebuilt mutable state transaction clearing all generated tasks, etc. 
_, _, err = rebuiltMutableState.CloseTransactionAsSnapshot(historyi.TransactionPolicyActive) if err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } currentVersionHistory.Items = items @@ -187,10 +201,14 @@ func (r *StateRebuilderImpl) RebuildWithCurrentMutableState( // from the base run. However, RefreshTasks always resets that field and // force regenerates the execution timeout timer task. if err := r.taskRefresher.Refresh(ctx, rebuiltMutableState, false); err != nil { - return nil, 0, err + return nil, RebuildStats{}, err } - return rebuiltMutableState, r.rebuiltHistorySize, nil + return rebuiltMutableState, RebuildStats{ + HistorySize: r.rebuiltHistorySize, + ExternalPayloadSize: r.rebuiltExternalPayloadSize, + ExternalPayloadCount: r.rebuiltExternalPayloadCount, + }, nil } func copyToRebuildMutableState( @@ -360,6 +378,14 @@ func (r *StateRebuilderImpl) getPaginationFn( TransactionID: resp.TransactionIDs[i], } paginateItems = append(paginateItems, nextBatch) + + // Calculate and accumulate external payload size and count for this batch of history events + externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(history.Events) + if err != nil { + return nil, nil, err + } + r.rebuiltExternalPayloadSize += externalPayloadSize + r.rebuiltExternalPayloadCount += externalPayloadCount } return paginateItems, resp.NextPageToken, nil } diff --git a/service/history/ndc/state_rebuilder_mock.go b/service/history/ndc/state_rebuilder_mock.go index 2079cb6331..7594d8bbf2 100644 --- a/service/history/ndc/state_rebuilder_mock.go +++ b/service/history/ndc/state_rebuilder_mock.go @@ -45,11 +45,11 @@ func (m *MockStateRebuilder) EXPECT() *MockStateRebuilderMockRecorder { } // Rebuild mocks base method. 
-func (m *MockStateRebuilder) Rebuild(ctx context.Context, now time.Time, baseWorkflowIdentifier definition.WorkflowKey, baseBranchToken []byte, baseLastEventID int64, baseLastEventVersion *int64, targetWorkflowIdentifier definition.WorkflowKey, targetBranchToken []byte, requestID string) (interfaces.MutableState, int64, error) { +func (m *MockStateRebuilder) Rebuild(ctx context.Context, now time.Time, baseWorkflowIdentifier definition.WorkflowKey, baseBranchToken []byte, baseLastEventID int64, baseLastEventVersion *int64, targetWorkflowIdentifier definition.WorkflowKey, targetBranchToken []byte, requestID string) (interfaces.MutableState, RebuildStats, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Rebuild", ctx, now, baseWorkflowIdentifier, baseBranchToken, baseLastEventID, baseLastEventVersion, targetWorkflowIdentifier, targetBranchToken, requestID) ret0, _ := ret[0].(interfaces.MutableState) - ret1, _ := ret[1].(int64) + ret1, _ := ret[1].(RebuildStats) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } @@ -61,11 +61,11 @@ func (mr *MockStateRebuilderMockRecorder) Rebuild(ctx, now, baseWorkflowIdentifi } // RebuildWithCurrentMutableState mocks base method. 
-func (m *MockStateRebuilder) RebuildWithCurrentMutableState(ctx context.Context, now time.Time, baseWorkflowIdentifier definition.WorkflowKey, baseBranchToken []byte, baseLastEventID int64, baseLastEventVersion *int64, targetWorkflowIdentifier definition.WorkflowKey, targetBranchToken []byte, requestID string, currentMutableState *persistence.WorkflowMutableState) (interfaces.MutableState, int64, error) { +func (m *MockStateRebuilder) RebuildWithCurrentMutableState(ctx context.Context, now time.Time, baseWorkflowIdentifier definition.WorkflowKey, baseBranchToken []byte, baseLastEventID int64, baseLastEventVersion *int64, targetWorkflowIdentifier definition.WorkflowKey, targetBranchToken []byte, requestID string, currentMutableState *persistence.WorkflowMutableState) (interfaces.MutableState, RebuildStats, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RebuildWithCurrentMutableState", ctx, now, baseWorkflowIdentifier, baseBranchToken, baseLastEventID, baseLastEventVersion, targetWorkflowIdentifier, targetBranchToken, requestID, currentMutableState) ret0, _ := ret[0].(interfaces.MutableState) - ret1, _ := ret[1].(int64) + ret1, _ := ret[1].(RebuildStats) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } diff --git a/service/history/ndc/state_rebuilder_test.go b/service/history/ndc/state_rebuilder_test.go index b312e7d373..72582c93cf 100644 --- a/service/history/ndc/state_rebuilder_test.go +++ b/service/history/ndc/state_rebuilder_test.go @@ -256,6 +256,12 @@ func (s *stateRebuilderSuite) TestRebuild() { firstEventID := common.FirstEventID nextEventID := lastEventID + 1 + payloadsWithExternalReference1 := payloads.EncodeString("some random input") + payloadsWithExternalReference1.Payloads[0].ExternalPayloads = []*commonpb.Payload_ExternalPayloadDetails{ + { + SizeBytes: 1024, + }, + } events1 := []*historypb.HistoryEvent{{ EventId: 1, Version: version, @@ -263,20 +269,26 @@ func (s *stateRebuilderSuite) TestRebuild() { Attributes: 
&historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{ WorkflowType: &commonpb.WorkflowType{Name: "some random workflow type"}, TaskQueue: &taskqueuepb.TaskQueue{Name: "some random workflow type"}, - Input: payloads.EncodeString("some random input"), + Input: payloadsWithExternalReference1, WorkflowExecutionTimeout: durationpb.New(123 * time.Second), WorkflowRunTimeout: durationpb.New(233 * time.Second), WorkflowTaskTimeout: durationpb.New(45 * time.Second), Identity: "some random identity", }}, }} + payloadsWithExternalReference2 := payloads.EncodeString("some random input") + payloadsWithExternalReference2.Payloads[0].ExternalPayloads = []*commonpb.Payload_ExternalPayloadDetails{ + { + SizeBytes: 2048, + }, + } events2 := []*historypb.HistoryEvent{{ EventId: 2, Version: version, EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, Attributes: &historypb.HistoryEvent_WorkflowExecutionSignaledEventAttributes{WorkflowExecutionSignaledEventAttributes: &historypb.WorkflowExecutionSignaledEventAttributes{ SignalName: "some random signal name", - Input: payloads.EncodeString("some random signal input"), + Input: payloadsWithExternalReference2, Identity: "some random identity", }}, }} @@ -329,7 +341,7 @@ func (s *stateRebuilderSuite) TestRebuild() { ), nil).AnyTimes() s.mockTaskRefresher.EXPECT().Refresh(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - rebuildMutableState, rebuiltHistorySize, err := s.nDCStateRebuilder.Rebuild( + rebuildMutableState, rebuildStats, err := s.nDCStateRebuilder.Rebuild( context.Background(), s.now, definition.NewWorkflowKey(s.namespaceID.String(), s.workflowID, s.runID), @@ -346,7 +358,9 @@ func (s *stateRebuilderSuite) TestRebuild() { s.Equal(targetNamespaceID, namespace.ID(rebuildExecutionInfo.NamespaceId)) s.Equal(targetWorkflowID, rebuildExecutionInfo.WorkflowId) s.Equal(targetRunID, 
rebuildMutableState.GetExecutionState().RunId) - s.Equal(int64(historySize1+historySize2), rebuiltHistorySize) + s.Equal(int64(historySize1+historySize2), rebuildStats.HistorySize) + s.Equal(int64(1024+2048), rebuildStats.ExternalPayloadSize) + s.Equal(int64(2), rebuildStats.ExternalPayloadCount) s.ProtoEqual(versionhistory.NewVersionHistories( versionhistory.NewVersionHistory( targetBranchToken, @@ -455,7 +469,7 @@ func (s *stateRebuilderSuite) TestRebuildWithCurrentMutableState() { }, } s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, int64(12)).Return(cluster.TestCurrentClusterName).AnyTimes() - rebuildMutableState, rebuiltHistorySize, err := s.nDCStateRebuilder.RebuildWithCurrentMutableState( + rebuildMutableState, rebuildStats, err := s.nDCStateRebuilder.RebuildWithCurrentMutableState( context.Background(), s.now, definition.NewWorkflowKey(s.namespaceID.String(), s.workflowID, s.runID), @@ -473,7 +487,7 @@ func (s *stateRebuilderSuite) TestRebuildWithCurrentMutableState() { s.Equal(targetNamespaceID, namespace.ID(rebuildExecutionInfo.NamespaceId)) s.Equal(targetWorkflowID, rebuildExecutionInfo.WorkflowId) s.Equal(targetRunID, rebuildMutableState.GetExecutionState().RunId) - s.Equal(int64(historySize1+historySize2), rebuiltHistorySize) + s.Equal(int64(historySize1+historySize2), rebuildStats.HistorySize) s.ProtoEqual(versionhistory.NewVersionHistories( versionhistory.NewVersionHistory( targetBranchToken, diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index b52b0cdf6f..d2df412512 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -457,7 +457,7 @@ func (r *workflowResetterImpl) replayResetWorkflow( r.shardContext.GetMetricsHandler(), ) - resetMutableState, resetHistorySize, err := r.stateRebuilder.Rebuild( + resetMutableState, resetStats, err := r.stateRebuilder.Rebuild( ctx, r.shardContext.GetTimeSource().Now(), definition.NewWorkflowKey( @@ 
-485,7 +485,9 @@ func (r *workflowResetterImpl) replayResetWorkflow( baseRebuildLastEventID, baseRebuildLastEventVersion, ) - resetMutableState.AddHistorySize(resetHistorySize) + resetMutableState.AddHistorySize(resetStats.HistorySize) + resetMutableState.AddExternalPayloadSize(resetStats.ExternalPayloadSize) + resetMutableState.AddExternalPayloadCount(resetStats.ExternalPayloadCount) return NewWorkflow( r.clusterMetadata, resetContext, diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 4fec71a62e..183c718314 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -286,7 +286,11 @@ func (s *workflowResetterSuite) TestReplayResetWorkflow() { resetBranchToken := []byte("some random reset branch token") resetRequestID := uuid.NewString() - resetHistorySize := int64(4411) + resetStats := RebuildStats{ + HistorySize: 4411, + ExternalPayloadSize: 1234, + ExternalPayloadCount: 56, + } resetMutableState := historyi.NewMockMutableState(s.controller) s.mockExecutionMgr.EXPECT().ForkHistoryBranch(gomock.Any(), gomock.Any()).Return( @@ -311,13 +315,15 @@ func (s *workflowResetterSuite) TestReplayResetWorkflow() { ), resetBranchToken, resetRequestID, - ).Return(resetMutableState, resetHistorySize, nil) + ).Return(resetMutableState, resetStats, nil) resetMutableState.EXPECT().SetBaseWorkflow( s.baseRunID, baseRebuildLastEventID, baseRebuildLastEventVersion, ) - resetMutableState.EXPECT().AddHistorySize(resetHistorySize) + resetMutableState.EXPECT().AddHistorySize(resetStats.HistorySize) + resetMutableState.EXPECT().AddExternalPayloadSize(resetStats.ExternalPayloadSize) + resetMutableState.EXPECT().AddExternalPayloadCount(resetStats.ExternalPayloadCount) resetWorkflow, err := s.workflowResetter.replayResetWorkflow( ctx, @@ -1472,7 +1478,11 @@ func (s *workflowResetterSuite) TestWorkflowRestartAfterExecutionTimeout() { resetBranchToken := []byte("some random 
reset branch token") resetRequestID := uuid.NewString() - resetHistorySize := int64(4411) + resetStats := RebuildStats{ + HistorySize: 4411, + ExternalPayloadSize: 1234, + ExternalPayloadCount: 56, + } resetMutableState := historyi.NewMockMutableState(s.controller) executionInfos := make(map[int64]*persistencespb.ChildExecutionInfo) @@ -1500,10 +1510,12 @@ func (s *workflowResetterSuite) TestWorkflowRestartAfterExecutionTimeout() { definition.NewWorkflowKey(s.namespaceID.String(), s.workflowID, s.resetRunID), resetBranchToken, resetRequestID, - ).Return(resetMutableState, resetHistorySize, nil) + ).Return(resetMutableState, resetStats, nil) resetMutableState.EXPECT().SetBaseWorkflow(s.baseRunID, baseRebuildLastEventID, baseRebuildLastEventVersion) - resetMutableState.EXPECT().AddHistorySize(resetHistorySize) + resetMutableState.EXPECT().AddHistorySize(resetStats.HistorySize) + resetMutableState.EXPECT().AddExternalPayloadSize(resetStats.ExternalPayloadSize) + resetMutableState.EXPECT().AddExternalPayloadCount(resetStats.ExternalPayloadCount) resetMutableState.EXPECT().GetCurrentVersion().Return(resetWorkflowVersion).AnyTimes() resetMutableState.EXPECT().UpdateCurrentVersion(resetWorkflowVersion, false).Return(nil).AnyTimes() resetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 32da9a84fe..b686c9ba37 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -1117,6 +1117,12 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch( isNewBranch = false localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data)) + externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events) + if err != nil { + return err + } + 
localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize + localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount } return nil } @@ -1175,6 +1181,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch( startEventID = events[len(events)-1].EventId startEventVersion = events[len(events)-1].Version localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(eventBlobs[i].Data)) + externalPayloadSize, externalPayloadCount, err := workflow.CalculateExternalPayloadSize(events) + if err != nil { + return newBranchToken, err + } + localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadSize += externalPayloadSize + localMutableState.GetExecutionInfo().ExecutionStats.ExternalPayloadCount += externalPayloadCount + } // add more events if there is any if startEventID < endEventID { diff --git a/service/history/ndc/workflow_state_replicator_test.go b/service/history/ndc/workflow_state_replicator_test.go index a75d50733f..4f42ce8bdf 100644 --- a/service/history/ndc/workflow_state_replicator_test.go +++ b/service/history/ndc/workflow_state_replicator_test.go @@ -1621,3 +1621,115 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_C s.Equal(int32(2), localVersionHistoryies.CurrentVersionHistoryIndex) s.NotNil(newRunBranch) } + +func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_ExternalPayloadStats() { + // Test that the external payload stats are correctly updated when bringLocalEventsUpToSourceCurrentBranch is invoked + namespaceID := uuid.NewString() + versionHistories := &historyspb.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*historyspb.VersionHistory{ + { + BranchToken: []byte("branchToken"), + Items: []*historyspb.VersionHistoryItem{ + { + EventId: int64(2), + Version: int64(1), + }, + }, + }, + }, + } + localVersionHistories := &historyspb.VersionHistories{ + 
CurrentVersionHistoryIndex: 0, + Histories: []*historyspb.VersionHistory{ + { + BranchToken: []byte("local-branchToken"), + }, + }, + } + + historyEvents := []*historypb.HistoryEvent{ + { + EventId: 1, + Version: 1, + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{ + WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{ + Input: &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + { + Data: []byte("test"), + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + {SizeBytes: 1024}, + {SizeBytes: 2048}, + }, + }, + }, + }, + }, + }, + }, + { + EventId: 2, + Version: 1, + EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, + Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{ + WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{}, + }, + }, + } + + serializer := serialization.NewSerializer() + eventBlobs, err := serializer.SerializeEvents(historyEvents) + s.NoError(err) + + executionStats := &persistencespb.ExecutionStats{ + HistorySize: 0, + ExternalPayloadSize: 0, + ExternalPayloadCount: 0, + } + mockMutableState := historyi.NewMockMutableState(s.controller) + mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + VersionHistories: localVersionHistories, + ExecutionStats: executionStats, + }).AnyTimes() + mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: s.runID, + }).AnyTimes() + mockMutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(namespaceID, s.workflowID, s.runID)).AnyTimes() + mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any()).Times(1) + mockMutableState.EXPECT().AddReapplyCandidateEvent(gomock.Any()).AnyTimes() + + mockWeCtx := historyi.NewMockWorkflowContext(s.controller) + sourceClusterName := "test-cluster" + + 
s.mockNamespaceCache.EXPECT().GetNamespaceName(namespace.ID(namespaceID)).Return(namespace.Name("test-namespace"), nil).AnyTimes() + + mockShard := historyi.NewMockShardContext(s.controller) + taskId := int64(100) + mockShard.EXPECT().GenerateTaskID().Return(taskId, nil).Times(1) + mockShard.EXPECT().GetRemoteAdminClient(sourceClusterName).Return(s.mockRemoteAdminClient, nil).AnyTimes() + mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes() + mockEventsCache := events.NewMockCache(s.controller) + mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + mockShard.EXPECT().GetEventsCache().Return(mockEventsCache).AnyTimes() + s.workflowStateReplicator.shardContext = mockShard + + s.mockExecutionManager.EXPECT().AppendRawHistoryNodes(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) + + _, err = s.workflowStateReplicator.bringLocalEventsUpToSourceCurrentBranch( + context.Background(), + namespace.ID(namespaceID), + s.workflowID, + s.runID, + sourceClusterName, + mockWeCtx, + mockMutableState, + versionHistories, + []*commonpb.DataBlob{eventBlobs}, + true) + s.NoError(err) + + s.Equal(int64(1024+2048), executionStats.ExternalPayloadSize) + s.Equal(int64(2), executionStats.ExternalPayloadCount) +} diff --git a/service/history/workflow/external_payload_size.go b/service/history/workflow/external_payload_size.go new file mode 100644 index 0000000000..0908459d8e --- /dev/null +++ b/service/history/workflow/external_payload_size.go @@ -0,0 +1,37 @@ +package workflow + +import ( + "context" + + commonpb "go.temporal.io/api/common/v1" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/proxy" +) + +// CalculateExternalPayloadSize calculates the total size and count of all external payloads in the given history events. 
+func CalculateExternalPayloadSize(events []*historypb.HistoryEvent) (size int64, count int64, err error) { + var totalSize int64 + var totalCount int64 + + visitor := func(vpc *proxy.VisitPayloadsContext, payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + for _, p := range payloads { + totalCount += int64(len(p.ExternalPayloads)) + for _, extPayload := range p.ExternalPayloads { + totalSize += extPayload.SizeBytes + } + } + return payloads, nil + } + + for _, event := range events { + err := proxy.VisitPayloads(context.Background(), event, proxy.VisitPayloadsOptions{ + Visitor: visitor, + SkipSearchAttributes: true, + }) + if err != nil { + return 0, 0, err + } + } + + return totalSize, totalCount, nil +} diff --git a/service/history/workflow/external_payload_size_test.go b/service/history/workflow/external_payload_size_test.go new file mode 100644 index 0000000000..0a7749ac56 --- /dev/null +++ b/service/history/workflow/external_payload_size_test.go @@ -0,0 +1,95 @@ +package workflow + +import ( + "testing" + + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCalculateExternalPayloadSize_NoExternalPayloads(t *testing.T) { + events := []*historypb.HistoryEvent{ + { + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{ + WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{ + Input: &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + { + Data: []byte("test data"), + }, + }, + }, + }, + }, + }, + } + + size, count, err := CalculateExternalPayloadSize(events) + require.NoError(t, err) + assert.Equal(t, int64(0), size) + assert.Equal(t, int64(0), count) +} + +func TestCalculateExternalPayloadSize_WithExternalPayloads(t *testing.T) { + events := 
[]*historypb.HistoryEvent{ + { + EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{ + WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{ + Input: &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + { + Data: []byte("reference"), + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + { + SizeBytes: 1024, + }, + { + SizeBytes: 2048, + }, + }, + }, + }, + }, + }, + }, + }, + { + EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED, + Attributes: &historypb.HistoryEvent_ActivityTaskCompletedEventAttributes{ + ActivityTaskCompletedEventAttributes: &historypb.ActivityTaskCompletedEventAttributes{ + Result: &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + { + Data: []byte("result"), + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + { + SizeBytes: 512, + }, + }, + }, + }, + }, + }, + }, + }, + } + + size, count, err := CalculateExternalPayloadSize(events) + require.NoError(t, err) + assert.Equal(t, int64(1024+2048+512), size) + assert.Equal(t, int64(3), count) +} + +func TestCalculateExternalPayloadSize_EmptyEvents(t *testing.T) { + events := []*historypb.HistoryEvent{} + + size, count, err := CalculateExternalPayloadSize(events) + require.NoError(t, err) + assert.Equal(t, int64(0), size) + assert.Equal(t, int64(0), count) +} diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index cdfd2801c3..6211ef81de 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6379,6 +6379,28 @@ func (ms *MutableStateImpl) AddHistorySize(size int64) { ms.executionInfo.ExecutionStats.HistorySize += size } +func (ms *MutableStateImpl) GetExternalPayloadSize() int64 { + return ms.executionInfo.ExecutionStats.ExternalPayloadSize +} + +func (ms *MutableStateImpl) AddExternalPayloadSize(size int64) { + if 
ms.executionInfo.ExecutionStats == nil { + ms.executionInfo.ExecutionStats = &persistencespb.ExecutionStats{} + } + ms.executionInfo.ExecutionStats.ExternalPayloadSize += size +} + +func (ms *MutableStateImpl) GetExternalPayloadCount() int64 { + return ms.executionInfo.ExecutionStats.ExternalPayloadCount +} + +func (ms *MutableStateImpl) AddExternalPayloadCount(count int64) { + if ms.executionInfo.ExecutionStats == nil { + ms.executionInfo.ExecutionStats = &persistencespb.ExecutionStats{} + } + ms.executionInfo.ExecutionStats.ExternalPayloadCount += count +} + // processCloseCallbacks triggers "WorkflowClosed" callbacks, applying the state machine transition that schedules // callback tasks. func (ms *MutableStateImpl) processCloseCallbacks() error { @@ -7512,6 +7534,14 @@ func (ms *MutableStateImpl) closeTransactionPrepareEvents( } ms.executionInfo.LastFirstEventId = eventBatch[0].GetEventId() ms.executionInfo.LastFirstEventTxnId = historyNodeTxnIDs[index] + + // Calculate and add the external payload size and count for this batch + externalPayloadSize, externalPayloadCount, err := CalculateExternalPayloadSize(eventBatch) + if err != nil { + return nil, nil, nil, false, err + } + ms.AddExternalPayloadSize(externalPayloadSize) + ms.AddExternalPayloadCount(externalPayloadCount) } if err := ms.validateNoEventsAfterWorkflowFinish( diff --git a/service/history/workflow_rebuilder.go b/service/history/workflow_rebuilder.go index e85aa4b175..d86366553c 100644 --- a/service/history/workflow_rebuilder.go +++ b/service/history/workflow_rebuilder.go @@ -161,7 +161,7 @@ func (r *workflowRebuilderImpl) replayResetWorkflow( requestID string, mutableState *persistencespb.WorkflowMutableState, ) (historyi.MutableState, error) { - rebuildMutableState, rebuildHistorySize, err := ndc.NewStateRebuilder(r.shard, r.logger).RebuildWithCurrentMutableState( + rebuildMutableState, rebuildStats, err := ndc.NewStateRebuilder(r.shard, r.logger).RebuildWithCurrentMutableState( ctx, 
// TestResetWorkflowWithExternalPayloads verifies that ExternalPayloadSize and
// ExternalPayloadCount in the persisted execution stats are tracked across a
// workflow reset.
//
// The workflow input and the activity input each carry one external payload.
// After the run completes, both are expected in the stats. The workflow is
// then reset to the first completed workflow task, and the new run is expected
// to carry only the workflow input's external payload.
func (s *ResetWorkflowTestSuite) TestResetWorkflowWithExternalPayloads() {
	workflowID := "functional-reset-workflow-external-payload-test"
	workflowType := "functional-reset-workflow-external-payload-test-type"
	taskQueue := "functional-reset-workflow-external-payload-test-taskqueue"
	identity := "worker1"

	// External payload in workflow input
	workflowExternalPayloadSize := int64(1024)
	workflowInputPayload := &commonpb.Payloads{
		Payloads: []*commonpb.Payload{
			{
				ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
					{SizeBytes: workflowExternalPayloadSize},
				},
			},
		},
	}

	// External payload in activity input
	activityExternalPayloadSize := int64(2048)
	activityInputPayload := &commonpb.Payloads{
		Payloads: []*commonpb.Payload{
			{
				ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
					{SizeBytes: activityExternalPayloadSize},
				},
			},
		},
	}

	we, err0 := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), &workflowservice.StartWorkflowExecutionRequest{
		RequestId:           uuid.NewString(),
		Namespace:           s.Namespace().String(),
		WorkflowId:          workflowID,
		WorkflowType:        &commonpb.WorkflowType{Name: workflowType},
		TaskQueue:           &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
		Input:               workflowInputPayload,
		WorkflowRunTimeout:  durationpb.New(100 * time.Second),
		WorkflowTaskTimeout: durationpb.New(1 * time.Second),
		Identity:            identity,
	})
	s.NoError(err0)
	s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

	// Workflow handler - schedules the activity on the first task and completes
	// the workflow on every later task.
	isFirstTaskProcessed := false
	workflowComplete := false
	wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
		if !isFirstTaskProcessed {
			isFirstTaskProcessed = true
			// Schedule an activity whose input carries the second external payload
			return []*commandpb.Command{{
				CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
				Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
					ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
						ActivityId:             "activity1",
						ActivityType:           &commonpb.ActivityType{Name: "TestActivity"},
						TaskQueue:              &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
						Input:                  activityInputPayload,
						ScheduleToCloseTimeout: durationpb.New(100 * time.Second),
						ScheduleToStartTimeout: durationpb.New(100 * time.Second),
						StartToCloseTimeout:    durationpb.New(50 * time.Second),
						HeartbeatTimeout:       durationpb.New(5 * time.Second),
					},
				},
			}}, nil
		}
		workflowComplete = true
		// Complete workflow after activity
		return []*commandpb.Command{{
			CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
			Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
				CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
					Result: payloads.EncodeString("Done"),
				},
			},
		}}, nil
	}

	// Activity handler - returns a plain encoded string result
	atHandler := func(task *workflowservice.PollActivityTaskQueueResponse) (*commonpb.Payloads, bool, error) {
		return payloads.EncodeString("Activity Result"), false, nil
	}
	poller := &testcore.TaskPoller{
		Client:              s.FrontendClient(),
		Namespace:           s.Namespace().String(),
		TaskQueue:           &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
		Identity:            identity,
		WorkflowTaskHandler: wtHandler,
		ActivityTaskHandler: atHandler,
		Logger:              s.Logger,
		T:                   s.T(),
	}

	// Process first workflow task, which schedules the activity
	_, err := poller.PollAndProcessWorkflowTask()
	s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
	s.NoError(err)

	// Process one activity task which also creates second workflow task
	err = poller.PollAndProcessActivityTask(false)
	s.Logger.Info("Poll and process first activity", tag.Error(err))
	s.NoError(err)

	// Process second workflow task, which completes the workflow
	_, err = poller.PollAndProcessWorkflowTask()
	s.Logger.Info("Poll and process second workflow task", tag.Error(err))
	s.NoError(err)

	s.True(workflowComplete)

	// Before the reset: both the workflow input and the activity input external
	// payloads are expected in the persisted stats.
	mutableState := s.GetDatabaseMutableState(s.Namespace().String(), workflowID, we.GetRunId())
	executionStats := mutableState.GetExecutionInfo().GetExecutionStats()
	s.NotNil(executionStats)
	s.Equal(int64(2), executionStats.GetExternalPayloadCount())
	s.Equal(workflowExternalPayloadSize+activityExternalPayloadSize, executionStats.GetExternalPayloadSize())

	// Get history to find reset point (first completed workflow task)
	events := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{
		WorkflowId: workflowID,
		RunId:      we.GetRunId(),
	})

	var resetToEventID int64
	for _, event := range events {
		if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
			resetToEventID = event.GetEventId()
			break
		}
	}
	s.Greater(resetToEventID, int64(0), "Should have found first completed workflow task")

	resetResp, err := s.FrontendClient().ResetWorkflowExecution(testcore.NewContext(), &workflowservice.ResetWorkflowExecutionRequest{
		Namespace: s.Namespace().String(),
		WorkflowExecution: &commonpb.WorkflowExecution{
			WorkflowId: workflowID,
			RunId:      we.RunId,
		},
		Reason:                    "reset execution from test",
		WorkflowTaskFinishEventId: resetToEventID,
		RequestId:                 uuid.NewString(),
	})
	s.NoError(err)
	s.Logger.Info("Workflow reset complete", tag.WorkflowRunID(resetResp.GetRunId()), tag.NewInt64("ResetToEventID", resetToEventID))

	// Verify external payload stats after reset: only the workflow input's
	// external payload is expected on the new run.
	resetMutableState := s.GetDatabaseMutableState(s.Namespace().String(), workflowID, resetResp.GetRunId())
	resetExecutionStats := resetMutableState.GetExecutionInfo().GetExecutionStats()

	s.NotNil(resetExecutionStats)
	s.Equal(int64(1), resetExecutionStats.GetExternalPayloadCount())
	s.Equal(workflowExternalPayloadSize, resetExecutionStats.GetExternalPayloadSize())
}
a6eba38078..f055b23c20 100644 --- a/tests/testcore/functional_test_base.go +++ b/tests/testcore/functional_test_base.go @@ -657,3 +657,15 @@ func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.Workf return err } + +func (s *FunctionalTestBase) GetDatabaseMutableState(namespace string, workflowID string, runId string) *persistencespb.WorkflowMutableState { + describeMSResp, err := s.AdminClient().DescribeMutableState(NewContext(), &adminservice.DescribeMutableStateRequest{ + Namespace: namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runId, + }, + }) + s.NoError(err) + return describeMSResp.GetDatabaseMutableState() +}