Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5
go.temporal.io/api v1.59.0
go.temporal.io/sdk v1.35.0
go.uber.org/fx v1.24.0
go.uber.org/mock v0.6.0
Expand All @@ -77,8 +77,10 @@ require (
modernc.org/sqlite v1.39.1
)

require github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect

require (
cel.dev/expr v0.20.0 // indirect
cel.dev/expr v0.23.1 // indirect
cloud.google.com/go v0.118.3 // indirect; indirect e
cloud.google.com/go/auth v0.15.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect
Expand Down Expand Up @@ -114,7 +116,6 @@ require (
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.5 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down Expand Up @@ -170,3 +171,8 @@ require (
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API change is not released yet. Will remove this before merging this PR.

replace (
go.temporal.io/api => ../api-go
go.temporal.io/sdk => ../sdk-go
)
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cel.dev/expr v0.20.0 h1:OunBvVCfvpWlt4dN7zg3FM6TDkzOePe1+foGJ9AXeeI=
cel.dev/expr v0.20.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
cel.dev/expr v0.23.1 h1:K4KOtPCJQjVggkARsjG9RWXP6O4R73aHeJMa/dmCQQg=
cel.dev/expr v0.23.1/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.118.3 h1:jsypSnrE/w4mJysioGdMBg4MiW/hHx/sArFpaBWHdME=
cloud.google.com/go v0.118.3/go.mod h1:Lhs3YLnBlwJ4KA6nuObNMZ/fCbOQBPuWKPoE0Wa/9Vc=
Expand Down Expand Up @@ -390,12 +390,6 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.58.1-0.20251128181858-703071215042 h1:44+nPe+rGhYUwA1oDi46rkXEYEVfoAxOmb0myvTm4Es=
go.temporal.io/api v1.58.1-0.20251128181858-703071215042/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5 h1:7lFIrLVM+NPVcqFMrEwv5d8D9meA7n/Xl9GtCl8Gyhc=
go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func Invoke(
VersioningInfo: common.CloneProto(executionInfo.VersioningInfo),
WorkerDeploymentName: executionInfo.WorkerDeploymentName,
Priority: executionInfo.Priority,
ExternalPayloadSizeBytes: executionInfo.ExecutionStats.ExternalPayloadSize,
ExternalPayloadCount: executionInfo.ExecutionStats.ExternalPayloadCount,
},
WorkflowExtendedInfo: &workflowpb.WorkflowExecutionExtendedInfo{
ExecutionExpirationTime: executionInfo.WorkflowExecutionExpirationTime,
Expand Down
18 changes: 11 additions & 7 deletions service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,21 @@ func (b *EventFactory) CreateWorkflowTaskStartedEvent(
historySizeBytes int64,
versioningStamp *commonpb.WorkerVersionStamp,
buildIdRedirectCounter int64,
externalPayloadSizeBytes int64,
externalPayloadCount int64,
) *historypb.HistoryEvent {
event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime)
event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
ScheduledEventId: scheduledEventID,
Identity: identity,
RequestId: requestID,
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes: historySizeBytes,
WorkerVersion: versioningStamp,
BuildIdRedirectCounter: buildIdRedirectCounter,
ScheduledEventId: scheduledEventID,
Identity: identity,
RequestId: requestID,
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes: historySizeBytes,
WorkerVersion: versioningStamp,
BuildIdRedirectCounter: buildIdRedirectCounter,
ExternalPayloadSizeBytes: externalPayloadSizeBytes,
ExternalPayloadCount: externalPayloadCount,
},
}
return event
Expand Down
4 changes: 4 additions & 0 deletions service/history/historybuilder/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
historySizeBytes int64,
versioningStamp *commonpb.WorkerVersionStamp,
buildIdRedirectCounter int64,
externalPayloadSizeBytes int64,
externalPayloadCount int64,
) *historypb.HistoryEvent {
event := b.EventFactory.CreateWorkflowTaskStartedEvent(
scheduledEventID,
Expand All @@ -207,6 +209,8 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
historySizeBytes,
versioningStamp,
buildIdRedirectCounter,
externalPayloadSizeBytes,
externalPayloadCount,
)
event, _ = b.EventStore.add(event)
return event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ func (s *sutTestingAdapter) AddWorkflowExecutionStartedEvent(_ ...eventConfig) *
}

func (s *sutTestingAdapter) AddWorkflowTaskStartedEvent(_ ...eventConfig) *historypb.HistoryEvent {
return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0)
return s.HistoryBuilder.AddWorkflowTaskStartedEvent(64, "request-1", "identity-1", s.today, false, 100, nil, 0, 0, 0)
}

func (s *sutTestingAdapter) AddWorkflowTaskCompletedEvent(_ ...eventConfig) *historypb.HistoryEvent {
Expand Down
14 changes: 9 additions & 5 deletions service/history/historybuilder/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,8 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() {
123678,
nil,
int64(0),
int64(1234),
int64(12),
)
s.Equal(event, s.flush())
s.Equal(&historypb.HistoryEvent{
Expand All @@ -667,11 +669,13 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() {
Version: s.version,
Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
ScheduledEventId: scheduledEventID,
Identity: testIdentity,
RequestId: testRequestID,
SuggestContinueAsNew: false,
HistorySizeBytes: 123678,
ScheduledEventId: scheduledEventID,
Identity: testIdentity,
RequestId: testRequestID,
SuggestContinueAsNew: false,
HistorySizeBytes: 123678,
ExternalPayloadSizeBytes: 1234,
ExternalPayloadCount: 12,
},
},
}, event)
Expand Down
10 changes: 10 additions & 0 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
historySizeBytes,
versioningStamp,
redirectCounter,
m.ms.GetExternalPayloadSize(),
m.ms.GetExternalPayloadCount(),
)
m.ms.hBuilder.FlushAndCreateNewBatch()
startedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -708,6 +710,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
workflowTask.HistorySizeBytes,
request.WorkerVersionStamp,
workflowTask.BuildIdRedirectCounter,
m.ms.GetExternalPayloadSize(),
m.ms.GetExternalPayloadCount(),
)
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -796,6 +800,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent(
workflowTask.HistorySizeBytes,
versioningStamp,
workflowTask.BuildIdRedirectCounter,
m.ms.GetExternalPayloadSize(),
m.ms.GetExternalPayloadCount(),
)
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -867,6 +873,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent(
workflowTask.HistorySizeBytes,
nil,
workflowTask.BuildIdRedirectCounter,
m.ms.GetExternalPayloadSize(),
m.ms.GetExternalPayloadCount(),
)
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
Expand Down Expand Up @@ -1390,6 +1398,8 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro
wt.HistorySizeBytes,
nil,
wt.BuildIdRedirectCounter,
m.ms.GetExternalPayloadSize(),
m.ms.GetExternalPayloadCount(),
)
m.ms.hBuilder.FlushAndCreateNewBatch()

Expand Down
172 changes: 172 additions & 0 deletions tests/gethistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,3 +815,175 @@ func (s *RawHistoryClientSuite) getHistoryReverse(namespace string, execution *c

return events
}

func (s *GetHistoryFunctionalSuite) TestGetWorkflowExecutionHistory_ExternalPayloadStats() {
workflowID := "functional-history-external-payload-stats-test"
workflowType := "functional-history-external-payload-stats-test-type"
taskQueue := "functional-history-external-payload-stats-test-taskqueue"
identity := "worker1"

workflowExternalPayloadSize := int64(1024)
workflowInputPayload := &commonpb.Payloads{
Payloads: []*commonpb.Payload{{
ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
{SizeBytes: workflowExternalPayloadSize},
},
}},
}

activity1ExternalPayloadSize := int64(2048)
activity1InputPayload := &commonpb.Payloads{
Payloads: []*commonpb.Payload{{
ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
{SizeBytes: activity1ExternalPayloadSize},
},
}},
}

activity2ExternalPayloadSize := int64(4096)
activity2InputPayload := &commonpb.Payloads{
Payloads: []*commonpb.Payload{{
ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{
{SizeBytes: activity2ExternalPayloadSize},
},
}},
}

we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.NewString(),
Namespace: s.Namespace().String(),
WorkflowId: workflowID,
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: workflowInputPayload,
WorkflowRunTimeout: durationpb.New(100 * time.Second),
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
Identity: identity,
})
s.NoError(err)

completedActivities := 0
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if completedActivities == 0 {
completedActivities++
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "activity1",
ActivityType: &commonpb.ActivityType{Name: "TestActivity"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: activity1InputPayload,
ScheduleToCloseTimeout: durationpb.New(100 * time.Second),
ScheduleToStartTimeout: durationpb.New(100 * time.Second),
StartToCloseTimeout: durationpb.New(50 * time.Second),
HeartbeatTimeout: durationpb.New(5 * time.Second),
},
},
}}, nil
}
if completedActivities == 1 {
completedActivities++
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "activity2",
ActivityType: &commonpb.ActivityType{Name: "TestActivity"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: activity2InputPayload,
ScheduleToCloseTimeout: durationpb.New(100 * time.Second),
ScheduleToStartTimeout: durationpb.New(100 * time.Second),
StartToCloseTimeout: durationpb.New(50 * time.Second),
HeartbeatTimeout: durationpb.New(5 * time.Second),
},
},
}}, nil
}
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
},
},
}}, nil
}

atHandler := func(task *workflowservice.PollActivityTaskQueueResponse) (*commonpb.Payloads, bool, error) {
return payloads.EncodeString("Activity Result"), false, nil
}

poller := &testcore.TaskPoller{
Client: s.FrontendClient(),
Namespace: s.Namespace().String(),
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

// Process first workflow task (schedules activity1)
_, err = poller.PollAndProcessWorkflowTask()
s.NoError(err)

// Process activity1
err = poller.PollAndProcessActivityTask(false)
s.NoError(err)

// Process second workflow task (schedules activity2)
_, err = poller.PollAndProcessWorkflowTask()
s.NoError(err)

// Process activity2
err = poller.PollAndProcessActivityTask(false)
s.NoError(err)

// Process third workflow task (completes workflow)
_, err = poller.PollAndProcessWorkflowTask()
s.NoError(err)

// Get history and verify WorkflowTaskStartedEventAttributes
events := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: we.GetRunId(),
})

// Collect WorkflowTaskStarted events and verify external payload stats
var workflowTaskStartedEvents []*historypb.HistoryEvent
for _, event := range events {
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
workflowTaskStartedEvents = append(workflowTaskStartedEvents, event)
}
}
s.Len(workflowTaskStartedEvents, 3)

// First WFT started: only workflow input external payload
attrs1 := workflowTaskStartedEvents[0].GetWorkflowTaskStartedEventAttributes()
s.Equal(int64(1), attrs1.ExternalPayloadCount)
s.Equal(workflowExternalPayloadSize, attrs1.ExternalPayloadSizeBytes)

// Second WFT started: workflow input + activity1 input
attrs2 := workflowTaskStartedEvents[1].GetWorkflowTaskStartedEventAttributes()
s.Equal(int64(2), attrs2.ExternalPayloadCount)
s.Equal(workflowExternalPayloadSize+activity1ExternalPayloadSize, attrs2.ExternalPayloadSizeBytes)

// Third WFT started: workflow input + activity1 input + activity2 input
attrs3 := workflowTaskStartedEvents[2].GetWorkflowTaskStartedEventAttributes()
s.Equal(int64(3), attrs3.ExternalPayloadCount)
s.Equal(workflowExternalPayloadSize+activity1ExternalPayloadSize+activity2ExternalPayloadSize, attrs3.ExternalPayloadSizeBytes)

// Verify DescribeWorkflowExecution returns correct values
descResp, err := s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.Namespace().String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: we.GetRunId(),
},
})
s.NoError(err)
s.Equal(int64(3), descResp.WorkflowExecutionInfo.ExternalPayloadCount)
s.Equal(workflowExternalPayloadSize+activity1ExternalPayloadSize+activity2ExternalPayloadSize, descResp.WorkflowExecutionInfo.ExternalPayloadSizeBytes)
}
Loading
Loading