Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.58.1-0.20251128181858-703071215042
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5
go.temporal.io/sdk v1.35.0
go.uber.org/fx v1.24.0
go.uber.org/mock v0.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.58.1-0.20251128181858-703071215042 h1:44+nPe+rGhYUwA1oDi46rkXEYEVfoAxOmb0myvTm4Es=
go.temporal.io/api v1.58.1-0.20251128181858-703071215042/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5 h1:7lFIrLVM+NPVcqFMrEwv5d8D9meA7n/Xl9GtCl8Gyhc=
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ message WorkflowExecutionInfo {

message ExecutionStats {
int64 history_size = 1;
// Total size in bytes of all external payloads referenced in the workflow history.
int64 external_payload_size = 2;
// Total count of external payloads referenced in the workflow history.
int64 external_payload_count = 3;
}

// execution_state column
Expand Down
6 changes: 6 additions & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ type (
GetHistorySize() int64
AddHistorySize(size int64)

GetExternalPayloadSize() int64
AddExternalPayloadSize(size int64)

GetExternalPayloadCount() int64
AddExternalPayloadCount(count int64)

AddTasks(tasks ...tasks.Task)
PopTasks() map[tasks.Category][]tasks.Task
DeleteCHASMPureTasks(maxScheduledTime time.Time)
Expand Down
52 changes: 52 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions service/history/ndc/conflict_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func (r *ConflictResolverImpl) rebuild(
executionState.RunId,
)
historySize := r.mutableState.GetHistorySize()
externalPayloadSize := r.mutableState.GetExternalPayloadSize()
externalPayloadCount := r.mutableState.GetExternalPayloadCount()

rebuildMutableState, _, err := r.stateRebuilder.Rebuild(
ctx,
Expand Down Expand Up @@ -173,6 +175,8 @@ func (r *ConflictResolverImpl) rebuild(
}
rebuildMutableState.GetExecutionInfo().VersionHistories = versionHistories
rebuildMutableState.AddHistorySize(historySize)
rebuildMutableState.AddExternalPayloadSize(externalPayloadSize)
rebuildMutableState.AddExternalPayloadCount(externalPayloadCount)
// set the update condition from original mutable state
rebuildMutableState.SetUpdateCondition(r.mutableState.GetUpdateCondition())

Expand Down
36 changes: 33 additions & 3 deletions service/history/ndc/conflict_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (s *conflictResolverSuite) TestRebuild() {
requestID := uuid.NewString()
version := int64(12)
historySize := int64(12345)
externalPayloadSize := int64(6789)
externalPayloadCount := int64(42)

branchToken0 := []byte("some random branch token")
lastEventID0 := int64(5)
Expand Down Expand Up @@ -116,6 +118,8 @@ func (s *conflictResolverSuite) TestRebuild() {
RunId: s.runID,
}).AnyTimes()
s.mockMutableState.EXPECT().GetHistorySize().Return(historySize).AnyTimes()
s.mockMutableState.EXPECT().GetExternalPayloadSize().Return(externalPayloadSize).AnyTimes()
s.mockMutableState.EXPECT().GetExternalPayloadCount().Return(externalPayloadCount).AnyTimes()

workflowKey := definition.NewWorkflowKey(
s.namespaceID,
Expand All @@ -134,6 +138,8 @@ func (s *conflictResolverSuite) TestRebuild() {
},
).AnyTimes()
mockRebuildMutableState.EXPECT().AddHistorySize(historySize)
mockRebuildMutableState.EXPECT().AddExternalPayloadSize(externalPayloadSize)
mockRebuildMutableState.EXPECT().AddExternalPayloadCount(externalPayloadCount)
mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion)

s.mockStateBuilder.EXPECT().Rebuild(
Expand All @@ -146,7 +152,11 @@ func (s *conflictResolverSuite) TestRebuild() {
workflowKey,
branchToken1,
requestID,
).Return(mockRebuildMutableState, rand.Int63(), nil)
).Return(mockRebuildMutableState, RebuildStats{
HistorySize: rand.Int63(),
ExternalPayloadSize: rand.Int63(),
ExternalPayloadCount: rand.Int63(),
}, nil)

s.mockContext.EXPECT().Clear()
rebuiltMutableState, err := s.nDCConflictResolver.rebuild(ctx, 1, requestID)
Expand Down Expand Up @@ -242,6 +252,10 @@ func (s *conflictResolverSuite) TestGetOrRebuildCurrentMutableState_Rebuild() {
RunId: s.runID,
}).AnyTimes()
s.mockMutableState.EXPECT().GetHistorySize().Return(historySize).AnyTimes()
externalPayloadSize := int64(6789)
externalPayloadCount := int64(42)
s.mockMutableState.EXPECT().GetExternalPayloadSize().Return(externalPayloadSize).AnyTimes()
s.mockMutableState.EXPECT().GetExternalPayloadCount().Return(externalPayloadCount).AnyTimes()

workflowKey := definition.NewWorkflowKey(
s.namespaceID,
Expand All @@ -260,6 +274,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildCurrentMutableState_Rebuild() {
},
).AnyTimes()
mockRebuildMutableState.EXPECT().AddHistorySize(historySize)
mockRebuildMutableState.EXPECT().AddExternalPayloadSize(externalPayloadSize)
mockRebuildMutableState.EXPECT().AddExternalPayloadCount(externalPayloadCount)
mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion)

s.mockStateBuilder.EXPECT().Rebuild(
Expand All @@ -272,7 +288,11 @@ func (s *conflictResolverSuite) TestGetOrRebuildCurrentMutableState_Rebuild() {
workflowKey,
branchToken1,
gomock.Any(),
).Return(mockRebuildMutableState, rand.Int63(), nil)
).Return(mockRebuildMutableState, RebuildStats{
HistorySize: rand.Int63(),
ExternalPayloadSize: rand.Int63(),
ExternalPayloadCount: rand.Int63(),
}, nil)

s.mockContext.EXPECT().Clear()
rebuiltMutableState, isRebuilt, err := s.nDCConflictResolver.GetOrRebuildCurrentMutableState(ctx, 1, incomingVersion)
Expand Down Expand Up @@ -305,6 +325,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() {
dbVersion := int64(1444)
version := int64(12)
historySize := int64(12345)
externalPayloadSize := int64(6789)
externalPayloadCount := int64(42)

// current branch
branchToken0 := []byte("some random branch token")
Expand Down Expand Up @@ -339,6 +361,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() {
RunId: s.runID,
}).AnyTimes()
s.mockMutableState.EXPECT().GetHistorySize().Return(historySize).AnyTimes()
s.mockMutableState.EXPECT().GetExternalPayloadSize().Return(externalPayloadSize).AnyTimes()
s.mockMutableState.EXPECT().GetExternalPayloadCount().Return(externalPayloadCount).AnyTimes()

workflowKey := definition.NewWorkflowKey(
s.namespaceID,
Expand All @@ -357,6 +381,8 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() {
},
).AnyTimes()
mockRebuildMutableState.EXPECT().AddHistorySize(historySize)
mockRebuildMutableState.EXPECT().AddExternalPayloadSize(externalPayloadSize)
mockRebuildMutableState.EXPECT().AddExternalPayloadCount(externalPayloadCount)
mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion)

s.mockStateBuilder.EXPECT().Rebuild(
Expand All @@ -369,7 +395,11 @@ func (s *conflictResolverSuite) TestGetOrRebuildMutableState_Rebuild() {
workflowKey,
branchToken1,
gomock.Any(),
).Return(mockRebuildMutableState, rand.Int63(), nil)
).Return(mockRebuildMutableState, RebuildStats{
HistorySize: rand.Int63(),
ExternalPayloadSize: rand.Int63(),
ExternalPayloadCount: rand.Int63(),
}, nil)

s.mockContext.EXPECT().Clear()
rebuiltMutableState, isRebuilt, err := s.nDCConflictResolver.GetOrRebuildMutableState(ctx, 1)
Expand Down
6 changes: 4 additions & 2 deletions service/history/ndc/resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *resetterImpl) resetWorkflow(
}

requestID := uuid.NewString()
rebuildMutableState, rebuiltHistorySize, err := r.stateRebuilder.Rebuild(
rebuildMutableState, rebuildStats, err := r.stateRebuilder.Rebuild(
ctx,
now,
definition.NewWorkflowKey(
Expand All @@ -127,7 +127,9 @@ func (r *resetterImpl) resetWorkflow(
if err != nil {
return nil, err
}
rebuildMutableState.AddHistorySize(rebuiltHistorySize)
rebuildMutableState.AddHistorySize(rebuildStats.HistorySize)
rebuildMutableState.AddExternalPayloadSize(rebuildStats.ExternalPayloadSize)
rebuildMutableState.AddExternalPayloadCount(rebuildStats.ExternalPayloadCount)

if err := rebuildMutableState.RefreshExpirationTimeoutTask(ctx); err != nil {
return nil, err
Expand Down
12 changes: 9 additions & 3 deletions service/history/ndc/resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ func (s *resetterSuite) TestResetWorkflow_NoError() {
incomingFirstEventID := baseEventID + 12
incomingVersion := baseVersion + 3

rebuiltHistorySize := int64(9999)
rebuildStats := RebuildStats{
HistorySize: 9999,
ExternalPayloadSize: 1234,
ExternalPayloadCount: 56,
}
newBranchToken := []byte("other random branch token")

s.mockBaseMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
Expand Down Expand Up @@ -165,8 +169,10 @@ func (s *resetterSuite) TestResetWorkflow_NoError() {
),
newBranchToken,
gomock.Any(),
).Return(s.mockRebuiltMutableState, rebuiltHistorySize, nil)
s.mockRebuiltMutableState.EXPECT().AddHistorySize(rebuiltHistorySize)
).Return(s.mockRebuiltMutableState, rebuildStats, nil)
s.mockRebuiltMutableState.EXPECT().AddHistorySize(rebuildStats.HistorySize)
s.mockRebuiltMutableState.EXPECT().AddExternalPayloadSize(rebuildStats.ExternalPayloadSize)
s.mockRebuiltMutableState.EXPECT().AddExternalPayloadCount(rebuildStats.ExternalPayloadCount)

shardID := s.mockShard.GetShardID()
s.mockExecManager.EXPECT().ForkHistoryBranch(gomock.Any(), &persistence.ForkHistoryBranchRequest{
Expand Down
Loading
Loading