Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// ---------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ---------------------------------------------------------------

namespace DurableTask.Core.Exceptions
{
using System;

/// <summary>
/// Represents a work item that is poisoned and should not be retried.
/// </summary>
public class WorkItemPoisonedException : Exception
{
/// <summary>
/// Represents a work item that is poisoned and should not be retried.
/// </summary>
public WorkItemPoisonedException(
string message = "Work item is poisoned",
Exception innerException = null
) : base(message, innerException)
{
}
}
}
2 changes: 2 additions & 0 deletions src/DurableTask.Core/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ static class EventIds
public const int EntityBatchExecuted = 56;
public const int EntityLockAcquired = 57;
public const int EntityLockReleased = 58;
public const int OrchestrationPoisoned = 59;

public const int TaskActivityStarting = 60;
public const int TaskActivityCompleted = 61;
Expand All @@ -60,6 +61,7 @@ static class EventIds
public const int RenewActivityMessageStarting = 65;
public const int RenewActivityMessageCompleted = 66;
public const int RenewActivityMessageFailed = 67;
public const int TaskActivityPoisoned = 68;

public const int SuspendingInstance = 68;
public const int ResumingInstance = 69;
Expand Down
87 changes: 86 additions & 1 deletion src/DurableTask.Core/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ----------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1137,6 +1137,45 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}

/// <summary>
/// Log event representing an orchestration becoming poisoned and being marked as failed.
/// </summary>
internal class OrchestrationPoisoned : StructuredLogEvent, IEventSourceEvent
{
public OrchestrationPoisoned(OrchestrationInstance instance, string reason)
{
this.InstanceId = instance.InstanceId;
this.ExecutionId = instance.ExecutionId;
this.Details = reason;
}

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string ExecutionId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.OrchestrationPoisoned,
nameof(EventIds.OrchestrationPoisoned));

public override LogLevel Level => LogLevel.Warning;

protected override string CreateLogMessage() =>
$"{this.InstanceId}: Orchestration execution is poisoned and was marked as failed: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.OrchestrationPoisoned(
this.InstanceId,
this.ExecutionId,
this.Details,
Utils.AppName,
Utils.PackageVersion);
}

/// <summary>
/// Log event representing the discarding of an orchestration message that cannot be processed.
/// </summary>
Expand Down Expand Up @@ -1600,6 +1639,52 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}

internal class TaskActivityPoisoned : StructuredLogEvent, IEventSourceEvent
{
public TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details)
{
this.InstanceId = instance.InstanceId;
this.ExecutionId = instance.ExecutionId;
this.Name = taskEvent.Name;
this.TaskEventId = taskEvent.EventId;
this.Details = details;
}

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string ExecutionId { get; }

[StructuredLogField]
public string Name { get; }

[StructuredLogField]
public int TaskEventId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.TaskActivityPoisoned,
nameof(EventIds.TaskActivityPoisoned));

public override LogLevel Level => LogLevel.Warning;

protected override string CreateLogMessage() =>
$"{this.InstanceId}: Task activity {GetEventDescription(this.Name, this.TaskEventId)} is poisoned and was marked as failed: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.TaskActivityPoisoned(
this.InstanceId,
this.ExecutionId,
this.Name,
this.TaskEventId,
this.Details,
Utils.AppName,
Utils.PackageVersion);
}

internal class TaskActivityDispatcherError : StructuredLogEvent, IEventSourceEvent
{
public TaskActivityDispatcherError(TaskActivityWorkItem workItem, string details)
Expand Down
32 changes: 32 additions & 0 deletions src/DurableTask.Core/Logging/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.Core.Logging
{
Expand All @@ -34,6 +35,7 @@ public LogHelper(ILogger? log)
bool IsStructuredLoggingEnabled => this.log != null;

#region TaskHubWorker

/// <summary>
/// Logs that a <see cref="TaskHubWorker"/> is starting.
/// </summary>
Expand Down Expand Up @@ -79,6 +81,7 @@ internal void TaskHubWorkerStopped(TimeSpan latency)
this.WriteStructuredLog(new LogEvents.TaskHubWorkerStopped(latency));
}
}

#endregion

#region WorkItemDispatcher traces
Expand Down Expand Up @@ -497,6 +500,19 @@ internal void OrchestrationAborted(OrchestrationInstance instance, string reason
}
}

/// <summary>
/// Logs a warning indicating that the activity execution is poisoned and was canceled.
/// </summary>
/// <param name="instance">The orchestration instance that failed.</param>
/// <param name="reason">The reason for the orchestration execution becoming poisoned.</param>
internal void OrchestrationPoisoned(OrchestrationInstance instance, string reason)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.OrchestrationPoisoned(instance, reason));
}
}

/// <summary>
/// Helper method for logging the dropping of all messages associated with the specified work item.
/// </summary>
Expand Down Expand Up @@ -616,6 +632,7 @@ internal void EntityLockReleased(string entityId, Core.Entities.EventFormat.Rele
#endregion

#region Activity dispatcher

/// <summary>
/// Logs that a task activity is about to begin execution.
/// </summary>
Expand Down Expand Up @@ -678,6 +695,20 @@ internal void TaskActivityAborted(OrchestrationInstance instance, TaskScheduledE
}
}

/// <summary>
/// Logs a warning indicating that the activity execution is poisoned and was canceled.
/// </summary>
/// <param name="instance">The orchestration instance that scheduled this task activity.</param>
/// <param name="taskEvent">The history event associated with this activity execution.</param>
/// <param name="details">More information about why the execution failed.</param>
internal void TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.TaskActivityPoisoned(instance, taskEvent, details));
}
}

/// <summary>
/// Logs that an error occurred when attempting to dispatch an activity work item.
/// </summary>
Expand Down Expand Up @@ -728,6 +759,7 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio
this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception);
}
}

#endregion

internal void OrchestrationDebugTrace(string instanceId, string executionId, string details)
Expand Down
48 changes: 47 additions & 1 deletion src/DurableTask.Core/Logging/StructuredEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,26 @@ internal void OrchestrationAborted(
}
}

[Event(EventIds.OrchestrationPoisoned, Level = EventLevel.Warning, Version = 1)]
internal void OrchestrationPoisoned(
string InstanceId,
string ExecutionId,
string Details,
string AppName,
string ExtensionVersion)
{
if (this.IsEnabled(EventLevel.Warning))
{
this.WriteEvent(
EventIds.OrchestrationPoisoned,
InstanceId,
ExecutionId,
Details,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check for null here? Passing null to WriteEvent for any parameter will cause the entire log event to be lost. If I'm reading the code correctly, this comes from the WorkItemPoisonedException.Message property, which doesn't have any guards against null values.

AppName,
ExtensionVersion);
}
}

[Event(EventIds.DiscardingMessage, Level = EventLevel.Warning, Version = 1)]
internal void DiscardingMessage(
string InstanceId,
Expand Down Expand Up @@ -712,7 +732,7 @@ internal void EntityLockReleased(
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.EntityLockReleased,
EntityId,
EntityId,
InstanceId,
Id,
AppName,
Expand Down Expand Up @@ -818,6 +838,32 @@ internal void TaskActivityAborted(
}
}

[Event(EventIds.TaskActivityPoisoned, Level = EventLevel.Warning, Version = 1)]
internal void TaskActivityPoisoned(
string InstanceId,
string ExecutionId,
string Name,
int TaskEventId,
string Details,
string AppName,
string ExtensionVersion
)
{
if (this.IsEnabled(EventLevel.Warning))
{
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.TaskActivityPoisoned,
InstanceId,
ExecutionId,
Name,
TaskEventId,
Details,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question about null checks.

AppName,
ExtensionVersion);
}
}

[Event(EventIds.TaskActivityDispatcherError, Level = EventLevel.Error, Version = 1)]
internal void TaskActivityDispatcherError(
string InstanceId,
Expand Down
21 changes: 17 additions & 4 deletions src/DurableTask.Core/OrchestrationRuntimeState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ public OrchestrationRuntimeState(IList<HistoryEvent>? events)
}
}

/// <summary>
/// Returns a deep copy of the object.
/// </summary>
/// <returns>Cloned object</returns>
public OrchestrationRuntimeState Clone()
{
return new OrchestrationRuntimeState(this.Events)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is technically a deep clone operation. The state of the two objects may potentially be different. For example, if the object being cloned as a set of history events in both the NewEvents and PastEvents lists, then the cloned object will instead have all the history event objects only in the PastEvents list.

I suggest either we carefully document that this is not an exact clone or give this method a different name.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you reckon the issue is with the description/comment only, or should the behavior be changed too?

Copy link
Member

@cgillum cgillum Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion. Here's what I think the code would need to be to do a proper deep clone*.

var cloned = new OrchestrationRuntimeState(this.PastEvents)
{
    CompressedSize = this.CompressedSize,
    Size = this.Size,
    Status = this.Status,
};

foreach (HistoryEvent e in this.NewEvents)
{
    cloned.AddEvent(e);
}

return cloned;

*this is not technically a "full deep clone" since we're not cloning the history events and I expect some history events may be mutable. Might be worth mentioning that in the comments.

{
Size = this.Size,
Status = this.Status,
};
}

/// <summary>
/// Gets the execution started event
/// </summary>
Expand Down Expand Up @@ -188,7 +201,7 @@ public OrchestrationStatus OrchestrationStatus
/// <remarks>
/// An invalid orchestration runtime state means that the history is somehow corrupted.
/// </remarks>
public bool IsValid =>
public bool IsValid =>
this.Events.Count == 0 ||
this.Events.Count == 1 && this.Events[0].EventType == EventType.OrchestratorStarted ||
this.ExecutionStartedEvent != null;
Expand Down Expand Up @@ -253,8 +266,8 @@ bool IsDuplicateEvent(HistoryEvent historyEvent)
historyEvent.EventType == EventType.TaskCompleted &&
!completedEventIds.Add(historyEvent.EventId))
{
TraceHelper.Trace(TraceEventType.Warning,
"OrchestrationRuntimeState-DuplicateEvent",
TraceHelper.Trace(TraceEventType.Warning,
"OrchestrationRuntimeState-DuplicateEvent",
"The orchestration '{0}' has already seen a completed task with id {1}.",
this.OrchestrationInstance?.InstanceId ?? "",
historyEvent.EventId);
Expand Down Expand Up @@ -287,7 +300,7 @@ void SetMarkerEvents(HistoryEvent historyEvent)
// It's not generally expected to receive multiple execution completed events for a given orchestrator, but it's possible under certain race conditions.
// For example: when an orchestrator is signaled to terminate at the same time as it attempts to continue-as-new.
var log = $"Received new {completedEvent.GetType().Name} event despite the orchestration being already in the {orchestrationStatus} state.";

if (orchestrationStatus == OrchestrationStatus.ContinuedAsNew && completedEvent.OrchestrationStatus == OrchestrationStatus.Terminated)
{
// If the orchestration planned to continue-as-new but termination is requested, we transition to the terminated state.
Expand Down
24 changes: 20 additions & 4 deletions src/DurableTask.Core/TaskActivityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.Core
{
Expand Down Expand Up @@ -117,13 +118,12 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
if (taskMessage.Event.EventType != EventType.TaskScheduled)
{
this.logHelper.TaskActivityDispatcherError(
workItem,
workItem,
$"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported.");
throw TraceHelper.TraceException(
TraceEventType.Critical,
"TaskActivityDispatcher-UnsupportedEventType",
new NotSupportedException("Activity worker does not support event of type: " +
taskMessage.Event.EventType));
new NotSupportedException("Activity worker does not support event of type: " + taskMessage.Event.EventType));
}

scheduledEvent = (TaskScheduledEvent)taskMessage.Event;
Expand Down Expand Up @@ -268,6 +268,22 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionAborted", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", e.Message);
await this.orchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
}
catch (WorkItemPoisonedException poisonedException) when (scheduledEvent is not null)
{
// The task activity is poisoned and should be marked as failed
this.logHelper.TaskActivityPoisoned(orchestrationInstance, scheduledEvent!, poisonedException.Message);
TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionPoisoned", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", poisonedException.Message);
await this.orchestrationService.CompleteTaskActivityWorkItemAsync(
workItem, new TaskMessage()
{
Event = new TaskFailedEvent(
-1,
// Guaranteed to be not null because of the "when" clause in the catch block
scheduledEvent!.EventId,
poisonedException.Message, string.Empty),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to also populate the failureDetails parameter of the TaskFailedEvent constructor since there will be some consumers that use that instead of the (legacy) reason + details values.

OrchestrationInstance = orchestrationInstance,
});
}
finally
{
diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out.
Expand Down Expand Up @@ -349,4 +365,4 @@ DateTime AdjustRenewAt(DateTime renewAt)
return renewAt > maxRenewAt ? maxRenewAt : renewAt;
}
}
}
}
Loading
Loading