Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ message WorkflowExecutionInfo {
// It increases monotonically when the workflow's options are modified.
// It is used to check if a workflow task is still relevant to the corresponding workflow state machine.
int32 workflow_task_stamp = 109;
// AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task.
// This is carried over when buffered events are applied after workflow task failures.
// Used by the TemporalReportedProblems search attribute to track continuous failure count.
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: "since" is needed here. --)
int32 workflow_task_attempts_since_last_success = 110;

bool cancel_requested = 29;
string cancel_request_id = 32;
Expand Down
11 changes: 10 additions & 1 deletion service/history/interfaces/workflow_task_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ type WorkflowTaskInfo struct {
WorkflowTaskTimeout time.Duration
// This is only needed to communicate task queue used after AddWorkflowTaskScheduledEvent.
TaskQueue *taskqueuepb.TaskQueue
Attempt int32

// Attempt is the number of attempts for this workflow task.
Attempt int32
// AttemptsSinceLastSuccess is the number of attempts since the last successful workflow task.
// This is used by the `TemporalReportedProblems` search attribute to check latest WFT failure count,
// this will only differ from attempts above when the previous workflow tasks failed and there was a
// buffered event (like a signal or activity finishing) applied to the workflow which causes the
// new workflow task to have an attempt of 1 again.
AttemptsSinceLastSuccess int32

// Scheduled and Started timestamps are useful for transient workflow task: when transient workflow task finally completes,
// use these Timestamp to create scheduled/started events.
// Also used for recording latency metrics
Expand Down
19 changes: 10 additions & 9 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8444,15 +8444,16 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
var workflowTaskVersionUpdated bool
if transitionhistory.Compare(current.WorkflowTaskLastUpdateVersionedTransition, incoming.WorkflowTaskLastUpdateVersionedTransition) != 0 {
ms.workflowTaskManager.UpdateWorkflowTask(&historyi.WorkflowTaskInfo{
Version: incoming.WorkflowTaskVersion,
ScheduledEventID: incoming.WorkflowTaskScheduledEventId,
StartedEventID: incoming.WorkflowTaskStartedEventId,
RequestID: incoming.WorkflowTaskRequestId,
WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(),
Attempt: incoming.WorkflowTaskAttempt,
Stamp: incoming.WorkflowTaskStamp,
StartedTime: incoming.WorkflowTaskStartedTime.AsTime(),
ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(),
Version: incoming.WorkflowTaskVersion,
ScheduledEventID: incoming.WorkflowTaskScheduledEventId,
StartedEventID: incoming.WorkflowTaskStartedEventId,
RequestID: incoming.WorkflowTaskRequestId,
WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(),
Attempt: incoming.WorkflowTaskAttempt,
AttemptsSinceLastSuccess: incoming.WorkflowTaskAttemptsSinceLastSuccess,
Stamp: incoming.WorkflowTaskStamp,
StartedTime: incoming.WorkflowTaskStartedTime.AsTime(),
ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(),

OriginalScheduledTime: incoming.WorkflowTaskOriginalScheduledTime.AsTime(),
Type: incoming.WorkflowTaskType,
Expand Down
123 changes: 67 additions & 56 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,21 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskScheduledEvent(
}

workflowTask := &historyi.WorkflowTaskInfo{
Version: version,
ScheduledEventID: scheduledEventID,
StartedEventID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: startToCloseTimeout.AsDuration(),
TaskQueue: taskQueue,
Attempt: attempt,
ScheduledTime: scheduledTime.AsTime(),
StartedTime: time.Time{},
OriginalScheduledTime: originalScheduledTimestamp.AsTime(),
Type: workflowTaskType,
SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started
HistorySizeBytes: 0, // reset, will be recomputed on workflow task started
Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(),
Version: version,
ScheduledEventID: scheduledEventID,
StartedEventID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: startToCloseTimeout.AsDuration(),
TaskQueue: taskQueue,
Attempt: attempt,
AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess,
ScheduledTime: scheduledTime.AsTime(),
StartedTime: time.Time{},
OriginalScheduledTime: originalScheduledTimestamp.AsTime(),
Type: workflowTaskType,
SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started
HistorySizeBytes: 0, // reset, will be recomputed on workflow task started
Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(),
}

m.retainWorkflowTaskBuildIdInfo(workflowTask)
Expand Down Expand Up @@ -145,14 +146,15 @@ func (m *workflowTaskStateMachine) ApplyTransientWorkflowTaskScheduled() (*histo
WorkflowTaskTimeout: m.ms.GetExecutionInfo().DefaultWorkflowTaskTimeout.AsDuration(),
// Task queue is always normal (not sticky) because transient workflow task is created only for
// failed/timed out workflow task and fail/timeout clears sticky task queue.
TaskQueue: m.ms.CurrentTaskQueue(),
Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt,
ScheduledTime: timestamppb.New(m.ms.timeSource.Now()).AsTime(),
StartedTime: time.Unix(0, 0).UTC(),
Type: enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started
HistorySizeBytes: 0, // reset, will be recomputed on workflow task started
Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(),
TaskQueue: m.ms.CurrentTaskQueue(),
Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt,
AttemptsSinceLastSuccess: m.ms.GetExecutionInfo().WorkflowTaskAttemptsSinceLastSuccess,
ScheduledTime: timestamppb.New(m.ms.timeSource.Now()).AsTime(),
StartedTime: time.Unix(0, 0).UTC(),
Type: enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started
HistorySizeBytes: 0, // reset, will be recomputed on workflow task started
Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(),
}

m.retainWorkflowTaskBuildIdInfo(workflowTask)
Expand Down Expand Up @@ -195,21 +197,22 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent(
}

workflowTask = &historyi.WorkflowTaskInfo{
Version: version,
ScheduledEventID: scheduledEventID,
StartedEventID: startedEventID,
RequestID: requestID,
WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout,
Attempt: workflowTask.Attempt,
StartedTime: startedTime,
ScheduledTime: workflowTask.ScheduledTime,
TaskQueue: workflowTask.TaskQueue,
OriginalScheduledTime: workflowTask.OriginalScheduledTime,
Type: workflowTask.Type,
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes: historySizeBytes,
BuildIdRedirectCounter: redirectCounter,
Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(),
Version: version,
ScheduledEventID: scheduledEventID,
StartedEventID: startedEventID,
RequestID: requestID,
WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout,
Attempt: workflowTask.Attempt,
AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess,
StartedTime: startedTime,
ScheduledTime: workflowTask.ScheduledTime,
TaskQueue: workflowTask.TaskQueue,
OriginalScheduledTime: workflowTask.OriginalScheduledTime,
Type: workflowTask.Type,
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes: historySizeBytes,
BuildIdRedirectCounter: redirectCounter,
Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(),
}

if buildId := worker_versioning.BuildIdIfUsingVersioning(versioningStamp); buildId != "" {
Expand Down Expand Up @@ -924,34 +927,40 @@ func (m *workflowTaskStateMachine) failWorkflowTask(
m.ms.ClearStickyTaskQueue()
}

failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduledEventID: common.EmptyEventID,
StartedEventID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: time.Duration(0),
StartedTime: time.Unix(0, 0).UTC(),
TaskQueue: nil,
OriginalScheduledTime: time.Unix(0, 0).UTC(),
Attempt: 1,
Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED,
SuggestContinueAsNew: false,
HistorySizeBytes: 0,
// need to retain Build ID of failed WF task to compare it with the build ID of next attempt
BuildId: m.ms.executionInfo.WorkflowTaskBuildId,
}
newAttemptsSinceLastSuccess := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess
newAttempt := int32(1)
if incrementAttempt {
failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1
failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC()
// Increment the failure counter for this WFT failure
newAttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + 1
// Also increment Attempt for transient workflow task tracking
newAttempt = m.ms.executionInfo.WorkflowTaskAttempt + 1
if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() {
m.ms.executionInfo.WorkflowTaskStamp += 1
}
}

failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduledEventID: common.EmptyEventID,
StartedEventID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: time.Duration(0),
StartedTime: time.Unix(0, 0).UTC(),
TaskQueue: nil,
OriginalScheduledTime: time.Unix(0, 0).UTC(),
Attempt: newAttempt,
AttemptsSinceLastSuccess: newAttemptsSinceLastSuccess,
Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED,
SuggestContinueAsNew: false,
HistorySizeBytes: 0,
// need to retain Build ID of failed WF task to compare it with the build ID of next attempt
BuildId: m.ms.executionInfo.WorkflowTaskBuildId,
}
m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo)
m.UpdateWorkflowTask(failWorkflowTaskInfo)

consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String())
if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.Attempt >= int32(consecutiveFailuresRequired) {
if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.AttemptsSinceLastSuccess >= int32(consecutiveFailuresRequired) {
if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil {
return err
}
Expand Down Expand Up @@ -1016,6 +1025,7 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask(
m.ms.executionInfo.WorkflowTaskRequestId = workflowTask.RequestID
m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout)
m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt
m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = workflowTask.AttemptsSinceLastSuccess
if !workflowTask.StartedTime.IsZero() {
m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime)
}
Expand Down Expand Up @@ -1141,6 +1151,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskI
RequestID: m.ms.executionInfo.WorkflowTaskRequestId,
WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(),
Attempt: m.ms.executionInfo.WorkflowTaskAttempt,
AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess,
StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(),
ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(),
TaskQueue: m.ms.CurrentTaskQueue(),
Expand Down
Loading
Loading