diff --git a/go.mod b/go.mod index 90babd6a6f..62507af744 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5 + go.temporal.io/api v1.59.0 go.temporal.io/sdk v1.35.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 @@ -77,8 +77,10 @@ require ( modernc.org/sqlite v1.39.1 ) +require github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect + require ( - cel.dev/expr v0.20.0 // indirect + cel.dev/expr v0.23.1 // indirect cloud.google.com/go v0.118.3 // indirect; indirect e cloud.google.com/go/auth v0.15.0 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect @@ -114,7 +116,6 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.5 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect - github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/huandu/xstrings v1.5.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -170,3 +171,8 @@ require ( modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect ) + +replace ( + go.temporal.io/api => ../api-go + go.temporal.io/sdk => ../sdk-go +) diff --git a/go.sum b/go.sum index e1902b4526..3dbebf30f7 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -cel.dev/expr v0.20.0 h1:OunBvVCfvpWlt4dN7zg3FM6TDkzOePe1+foGJ9AXeeI= -cel.dev/expr v0.20.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= +cel.dev/expr v0.23.1 h1:K4KOtPCJQjVggkARsjG9RWXP6O4R73aHeJMa/dmCQQg= +cel.dev/expr v0.23.1/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.118.3 h1:jsypSnrE/w4mJysioGdMBg4MiW/hHx/sArFpaBWHdME= cloud.google.com/go v0.118.3/go.mod h1:Lhs3YLnBlwJ4KA6nuObNMZ/fCbOQBPuWKPoE0Wa/9Vc= @@ -390,10 +390,6 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5 h1:7lFIrLVM+NPVcqFMrEwv5d8D9meA7n/Xl9GtCl8Gyhc= -go.temporal.io/api v1.59.1-0.20251203230651-7773526824c5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= -go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ= -go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index 58366925c8..474057a87e 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -135,6 +135,8 @@ func Invoke( VersioningInfo: common.CloneProto(executionInfo.VersioningInfo), WorkerDeploymentName: executionInfo.WorkerDeploymentName, Priority: executionInfo.Priority, + ExternalPayloadSizeBytes: executionInfo.ExecutionStats.ExternalPayloadSize, + ExternalPayloadCount: executionInfo.ExecutionStats.ExternalPayloadCount, }, WorkflowExtendedInfo: &workflowpb.WorkflowExecutionExtendedInfo{ ExecutionExpirationTime: executionInfo.WorkflowExecutionExpirationTime, diff --git a/tests/gethistory_test.go b/tests/gethistory_test.go index 97f545904e..d4292c0574 100644 --- a/tests/gethistory_test.go +++ b/tests/gethistory_test.go @@ -815,3 +815,175 @@ func (s *RawHistoryClientSuite) getHistoryReverse(namespace string, execution *c return events } + +func (s *GetHistoryFunctionalSuite) TestGetWorkflowExecutionHistory_ExternalPayloadStats() { + workflowID := "functional-history-external-payload-stats-test" + workflowType := "functional-history-external-payload-stats-test-type" + taskQueue := "functional-history-external-payload-stats-test-taskqueue" + identity := "worker1" + + workflowExternalPayloadSize := int64(1024) + workflowInputPayload := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{{ + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + {SizeBytes: workflowExternalPayloadSize}, + }, + }}, + } + + activity1ExternalPayloadSize := int64(2048) + activity1InputPayload := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{{ + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + {SizeBytes: activity1ExternalPayloadSize}, + }, + }}, + } + + activity2ExternalPayloadSize := int64(4096) + activity2InputPayload := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{{ + ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ + {SizeBytes: activity2ExternalPayloadSize}, + }, + }}, + } + + we, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.NewString(), + Namespace: s.Namespace().String(), + WorkflowId: workflowID, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: workflowInputPayload, + WorkflowRunTimeout: durationpb.New(100 * time.Second), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + Identity: identity, + }) + s.NoError(err) + + completedActivities := 0 + wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { + if completedActivities == 0 { + completedActivities++ + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "activity1", + ActivityType: &commonpb.ActivityType{Name: "TestActivity"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: activity1InputPayload, + ScheduleToCloseTimeout: durationpb.New(100 * time.Second), + ScheduleToStartTimeout: durationpb.New(100 * time.Second), + StartToCloseTimeout: durationpb.New(50 * time.Second), + HeartbeatTimeout: durationpb.New(5 * time.Second), + }, + }, + }}, nil + } + if completedActivities == 1 { + completedActivities++ + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "activity2", + ActivityType: &commonpb.ActivityType{Name: "TestActivity"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: activity2InputPayload, + ScheduleToCloseTimeout: durationpb.New(100 * time.Second), + ScheduleToStartTimeout: durationpb.New(100 * time.Second), + StartToCloseTimeout: durationpb.New(50 * time.Second), + HeartbeatTimeout: durationpb.New(5 * time.Second), + }, + }, + }}, nil + } + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("Done"), + }, + }, + }}, nil + } + + atHandler := func(task *workflowservice.PollActivityTaskQueueResponse) (*commonpb.Payloads, bool, error) { + return payloads.EncodeString("Activity Result"), false, nil + } + + poller := &testcore.TaskPoller{ + Client: s.FrontendClient(), + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: identity, + WorkflowTaskHandler: wtHandler, + ActivityTaskHandler: atHandler, + Logger: s.Logger, + T: s.T(), + } + + // Process first workflow task (schedules activity1) + _, err = poller.PollAndProcessWorkflowTask() + s.NoError(err) + + // Process activity1 + err = poller.PollAndProcessActivityTask(false) + s.NoError(err) + + // Process second workflow task (schedules activity2) + _, err = poller.PollAndProcessWorkflowTask() + s.NoError(err) + + // Process activity2 + err = poller.PollAndProcessActivityTask(false) + s.NoError(err) + + // Process third workflow task (completes workflow) + _, err = poller.PollAndProcessWorkflowTask() + s.NoError(err) + + // Get history and verify WorkflowTaskStartedEventAttributes + events := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: we.GetRunId(), + }) + + // Collect WorkflowTaskStarted events and verify external payload stats + var workflowTaskStartedEvents []*historypb.HistoryEvent + for _, event := range events { + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED { + workflowTaskStartedEvents = append(workflowTaskStartedEvents, event) + } + } + s.Len(workflowTaskStartedEvents, 3) + + // First WFT started: only workflow input external payload + attrs1 := workflowTaskStartedEvents[0].GetWorkflowTaskStartedEventAttributes() + s.Equal(int64(1), attrs1.ExternalPayloadCount) + s.Equal(workflowExternalPayloadSize, attrs1.ExternalPayloadSizeBytes) + + // Second WFT started: workflow input + activity1 input + attrs2 := workflowTaskStartedEvents[1].GetWorkflowTaskStartedEventAttributes() + s.Equal(int64(2), attrs2.ExternalPayloadCount) + s.Equal(workflowExternalPayloadSize+activity1ExternalPayloadSize, attrs2.ExternalPayloadSizeBytes) + + // Third WFT started: workflow input + activity1 input + activity2 input + attrs3 := workflowTaskStartedEvents[2].GetWorkflowTaskStartedEventAttributes() + s.Equal(int64(3), attrs3.ExternalPayloadCount) + s.Equal(workflowExternalPayloadSize+activity1ExternalPayloadSize+activity2ExternalPayloadSize, attrs3.ExternalPayloadSizeBytes) + + // Verify DescribeWorkflowExecution returns correct values + descResp, err := s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: we.GetRunId(), + }, + }) + s.NoError(err) + s.Equal(int64(3), descResp.WorkflowExecutionInfo.ExternalPayloadCount) + s.Equal(workflowExternalPayloadSize+activity1ExternalPayloadSize+activity2ExternalPayloadSize, descResp.WorkflowExecutionInfo.ExternalPayloadSizeBytes) +} diff --git a/tests/reset_workflow_test.go b/tests/reset_workflow_test.go index 506f015a41..fb69c780db 100644 --- a/tests/reset_workflow_test.go +++ b/tests/reset_workflow_test.go @@ -1104,11 +1104,16 @@ func (s *ResetWorkflowTestSuite) TestResetWorkflowWithExternalPayloads() { s.True(workflowComplete) - mutableState := s.GetDatabaseMutableState(s.Namespace().String(), workflowID, we.GetRunId()) - executionStats := mutableState.GetExecutionInfo().GetExecutionStats() - s.NotNil(executionStats) - s.Equal(int64(2), executionStats.GetExternalPayloadCount()) - s.Equal(workflowExternalPayloadSize+activityExternalPayloadSize, executionStats.GetExternalPayloadSize()) + descResp, descErr := s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: we.GetRunId(), + }, + }) + s.NoError(descErr) + s.Equal(int64(2), descResp.WorkflowExecutionInfo.ExternalPayloadCount) + s.Equal(workflowExternalPayloadSize+activityExternalPayloadSize, descResp.WorkflowExecutionInfo.ExternalPayloadSizeBytes) // Get history to find reset point (first completed workflow task) events := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{ @@ -1138,11 +1143,17 @@ func (s *ResetWorkflowTestSuite) TestResetWorkflowWithExternalPayloads() { s.NoError(err) s.Logger.Info("Workflow reset complete", tag.WorkflowRunID(resetResp.GetRunId()), tag.NewInt64("ResetToEventID", resetToEventID)) - // Verify external payload stats after reset - resetMutableState := s.GetDatabaseMutableState(s.Namespace().String(), workflowID, resetResp.GetRunId()) - resetExecutionStats := resetMutableState.GetExecutionInfo().GetExecutionStats() + descResp, descErr = s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: resetResp.GetRunId(), + }, + }) + s.NoError(descErr) - s.NotNil(resetExecutionStats) - s.Equal(int64(1), resetExecutionStats.GetExternalPayloadCount()) - s.Equal(workflowExternalPayloadSize, resetExecutionStats.GetExternalPayloadSize()) + // Verify external payload stats after reset + s.NotNil(descResp.WorkflowExecutionInfo.ExternalPayloadCount) + s.Equal(int64(1), descResp.WorkflowExecutionInfo.ExternalPayloadCount) + s.Equal(workflowExternalPayloadSize, descResp.WorkflowExecutionInfo.ExternalPayloadSizeBytes) } diff --git a/tests/testcore/functional_test_base.go b/tests/testcore/functional_test_base.go index f055b23c20..a6eba38078 100644 --- a/tests/testcore/functional_test_base.go +++ b/tests/testcore/functional_test_base.go @@ -657,15 +657,3 @@ func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.Workf return err } - -func (s *FunctionalTestBase) GetDatabaseMutableState(namespace string, workflowID string, runId string) *persistencespb.WorkflowMutableState { - describeMSResp, err := s.AdminClient().DescribeMutableState(NewContext(), &adminservice.DescribeMutableStateRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runId, - }, - }) - s.NoError(err) - return describeMSResp.GetDatabaseMutableState() -}