diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 3af54ab89..f1dc2211c 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -91,6 +91,12 @@ public class AzureStorageOrchestrationServiceSettings
///
public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100;
+ ///
+ /// Gets or sets the maximum dequeue count of any message before it is flagged as a "poison message".
+ /// The default value is 20.
+ ///
+ public int PoisonMessageDeuqueCountThreshold { get; set; } = 20;
+
///
/// Gets or sets the maximum number of concurrent storage operations that can be executed in the context
/// of a single orchestration instance.
diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
index 9f1c0d2ba..580f349e8 100644
--- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -106,28 +106,27 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
MessageData messageData;
try
{
+ // Try to de-serialize the message
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);
+
+ // If successful, check if it's a poison message. If so, we handle it
+ // and log metadata about it as the de-serialization succeeded.
+ await this.HandleIfPoisonMessageAsync(messageData);
}
- catch (Exception e)
+ catch (Exception exception)
{
- // We have limited information about the details of the message
- // since we failed to deserialize it.
- this.settings.Logger.MessageFailure(
- this.storageAccountName,
- this.settings.TaskHubName,
- queueMessage.MessageId /* MessageId */,
- string.Empty /* InstanceId */,
- string.Empty /* ExecutionId */,
- this.storageQueue.Name,
- string.Empty /* EventType */,
- 0 /* TaskEventId */,
- e.ToString());
-
- // Abandon the message so we can try it again later.
- // Note: We will fetch the message again from the queue before retrying, so no need to read the receipt
- _ = await this.AbandonMessageAsync(queueMessage);
+ // Deserialization errors can be persistent, so we check if this is a poison message.
+ bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
+ if (isPoisonMessage)
+ {
+ // we have already handled the poison message, so we move on.
+ return;
+ }
+
+ // This is not a poison message (at least not yet), so we abandon it to retry later.
+ await this.AbandonMessageAsync(queueMessage, exception);
return;
}
@@ -192,8 +191,21 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
}
// This overload is intended for cases where we aren't able to deserialize an instance of MessageData.
- public Task AbandonMessageAsync(QueueMessage queueMessage)
+ public Task AbandonMessageAsync(QueueMessage queueMessage, Exception exception)
{
+ // We have limited information about the details of the message
+ // since we failed to deserialize it.
+ this.settings.Logger.MessageFailure(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ queueMessage.MessageId /* MessageId */,
+ string.Empty /* InstanceId */,
+ string.Empty /* ExecutionId */,
+ this.storageQueue.Name,
+ string.Empty /* EventType */,
+ 0 /* TaskEventId */,
+ exception.ToString());
+
this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.MessageId, out _);
return base.AbandonMessageAsync(
queueMessage,
diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
index d204268bf..2bb5102b4 100644
--- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
@@ -19,11 +19,13 @@ namespace DurableTask.AzureStorage.Messaging
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+ using Azure.Data.Tables;
using Azure.Storage.Queues.Models;
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;
using DurableTask.Core.History;
+
abstract class TaskHubQueue
{
static long messageSequenceNumber;
@@ -150,6 +152,96 @@ await this.storageQueue.AddMessageAsync(
return data;
}
+ public async Task HandleIfPoisonMessageAsync(MessageData messageData)
+ {
+ QueueMessage queueMessage = messageData.OriginalQueueMessage;
+ int maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;
+
+ if (queueMessage.DequeueCount > maxThreshold)
+ {
+ // Create the poison message table if it doesn't exist
+ string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
+ Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
+ await poisonMessagesTable.CreateIfNotExistsAsync();
+
+ // provide guidance, which is backend-specific
+ string guidance = $"Queue message ID '{queueMessage.MessageId}' was dequeued {queueMessage.DequeueCount} times," +
+ $" which is greater than the threshold poison message threshold ({maxThreshold}). " +
+ $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " +
+ $"This will fail the consuming orchestrator, activity, or entity";
+ messageData.TaskMessage.Event.PoisonGuidance = guidance;
+
+ // Add the message to the poison message table
+ TableEntity tableEntity = new TableEntity(queueMessage.MessageId, this.Name)
+ {
+ ["RawMessage"] = queueMessage.Body,
+ ["Reason"] = guidance
+ };
+
+ await poisonMessagesTable.InsertEntityAsync(tableEntity);
+
+ // Delete the message from the queue
+ await this.storageQueue.DeleteMessageAsync(queueMessage);
+
+ // Since isPoison is `true`, we'll override the deserialized message
+ messageData.TaskMessage.Event.IsPoison = true;
+
+ this.settings.Logger.PoisonMessageDetected(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ messageData.TaskMessage.Event.EventType.ToString(),
+ messageData.TaskMessage.Event.EventId,
+ messageData.OriginalQueueMessage.MessageId,
+ messageData.TaskMessage.OrchestrationInstance.InstanceId,
+ messageData.TaskMessage.OrchestrationInstance.ExecutionId,
+ this.Name,
+ messageData.OriginalQueueMessage.DequeueCount);
+ }
+ }
+
+ public async Task TryHandlingDeserializationPoisonMessage(QueueMessage queueMessage, Exception deserializationException)
+ {
+ var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;
+ bool isPoisonMessage = queueMessage.DequeueCount > maxThreshold;
+
+ if (isPoisonMessage)
+ {
+ isPoisonMessage = true;
+ string guidance = $"Queue message ID '{queueMessage.MessageId}' was dequeued {queueMessage.DequeueCount} times," +
+ $" which is greater than the threshold poison message threshold ({maxThreshold}). " +
+ $"A de-serialization error ocurred: \n {deserializationException}";
+
+ // Create poison message table if it doesn't exist and add the poison message
+ TableEntity tableEntity = new TableEntity(queueMessage.MessageId, this.Name)
+ {
+ ["RawMessage"] = queueMessage.Body,
+ ["Reason"] = guidance
+ };
+
+ string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
+ Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
+ await poisonMessagesTable.CreateIfNotExistsAsync();
+
+ await poisonMessagesTable.InsertEntityAsync(tableEntity);
+
+ // Delete the message from the queue
+ await this.storageQueue.DeleteMessageAsync(queueMessage);
+
+ this.settings.Logger.PoisonMessageDetected(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ string.Empty,
+ 0,
+ string.Empty,
+ string.Empty,
+ string.Empty,
+ this.Name,
+ queueMessage.DequeueCount);
+ }
+ return isPoisonMessage;
+ }
+
+
static string? GetSerializableTraceContext(TaskMessage taskMessage)
{
TraceContextBase traceContext = CorrelationTraceContext.Current;
diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
index 512f761f5..f78e0e97b 100644
--- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
@@ -11,6 +11,7 @@
// limitations under the License.
// ----------------------------------------------------------------------------------
+#nullable enable
namespace DurableTask.AzureStorage.Messaging
{
using System;
@@ -31,13 +32,13 @@ public WorkItemQueue(
protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout;
- public async Task GetMessageAsync(CancellationToken cancellationToken)
+ public async Task GetMessageAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
- QueueMessage queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);
+ QueueMessage? queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);
if (queueMessage == null)
{
@@ -45,12 +46,26 @@ public async Task GetMessageAsync(CancellationToken cancellationTok
continue;
}
- MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
+ try
+ {
+ MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);
- this.backoffHelper.Reset();
- return data;
+ this.backoffHelper.Reset();
+ return data;
+ }
+ catch (Exception exception)
+ {
+ // Deserialization errors can be persistent, so we check if this is a poison message.
+ bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
+ if (isPoisonMessage)
+ {
+ // we have already handled the poison message, so we move on.
+ continue;
+ }
+ }
+
}
catch (Exception e)
{
diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs
index 8e6a8f316..29f799b57 100644
--- a/src/DurableTask.Core/History/HistoryEvent.cs
+++ b/src/DurableTask.Core/History/HistoryEvent.cs
@@ -93,5 +93,16 @@ protected HistoryEvent(int eventId)
/// Implementation for .
///
public ExtensionDataObject? ExtensionData { get; set; }
+
+ ///
+ /// Gets or sets whether this is a poison message.
+ ///
+ public bool IsPoison { get; set; } = false;
+
+ ///
+ /// Gets or sets user-facing details for why a message was labeled as poison.
+ /// This is to be set by each storage provider.
+ ///
+ public string PoisonGuidance { get; set; } = "";
}
}
\ No newline at end of file
diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs
index c9e402401..7d642965a 100644
--- a/src/DurableTask.Core/TaskActivityDispatcher.cs
+++ b/src/DurableTask.Core/TaskActivityDispatcher.cs
@@ -23,6 +23,7 @@ namespace DurableTask.Core
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using DurableTask.Core.Tracing;
+ using DurableTask.Core.Serializing;
///
/// Dispatcher for task activities to handle processing and renewing of work items
@@ -190,6 +191,20 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
try
{
+ if (scheduledEvent.IsPoison)
+ {
+ // If the activity is "poison", then we should not executed again. Instead, we'll manually fail the activity
+ // by throwing an exception on behalf of the user-code. In the exception, we provide the storage-provider's guidance
+ // on how to deal with the poison message.
+ // We need to account for all possible deserialization modes, so we construct an exception valid in all modes.
+ // TODO: revise - this is clunky
+ var exception = new Exception(scheduledEvent.PoisonGuidance);
+ var failureDetails = new FailureDetails(exception);
+ var details = Utils.SerializeCause(exception, JsonDataConverter.Default);
+ var taskFailure = new TaskFailureException(details, exception, details).WithFailureDetails(failureDetails);
+ throw taskFailure;
+ }
+
string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
}
diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs
index 81ea37778..12da6b6f3 100644
--- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs
+++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs
@@ -197,6 +197,22 @@ void ProcessEvents(IEnumerable events)
void ProcessEvent(HistoryEvent historyEvent)
{
+ if (historyEvent.IsPoison)
+ {
+ // If the message is labeled as "poison", then we should avoid processing it again.
+ // Therefore, we replace the event "in place" with an "ExecutionTerminatedEvent", so the
+ // orchestrator stops immediately.
+
+ var terminationEvent = new ExecutionTerminatedEvent(-1, historyEvent.PoisonGuidance);
+ historyEvent = terminationEvent;
+
+ // since replay is not guaranteed, we need to populate `this.result`
+ // with a completed task
+ var taskCompletionSource = new TaskCompletionSource();
+ taskCompletionSource.SetResult("");
+ this.result = taskCompletionSource.Task;
+ }
+
bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
if (this.context.IsSuspended && !overrideSuspension)
{