Skip to content

Commit 0f8c77e

Browse files
Merge pull request #6924 from Particular/1005_delayed_retries
make it clear that the user needs to deal with NServiceBus messages in addition to native messages
2 parents 6e692bd + 8a6c0de commit 0f8c77e

11 files changed

Lines changed: 89 additions & 64 deletions

File tree

Snippets/ASQ/ASQN_12/CustomEnvelopeUnwrapper.cs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,22 @@ class CustomEnvelopeUnwrapper
1616
{
1717
MessageUnwrapper = queueMessage =>
1818
{
19-
using (var stream = new MemoryStream(Convert.FromBase64String(queueMessage.MessageText)))
20-
using (var streamReader = new StreamReader(stream))
21-
using (var textReader = new JsonTextReader(streamReader))
19+
//Determine whether the message is one expected by the standard program flow.
20+
// All other messages should be forwarded to the framework by returning null.
21+
//NOTE: More complex methods may be needed in some scenarios to determine
22+
// whether the message is of an expected type, but this check
23+
// should be kept as lightweight as possible. Deserialization of the message
24+
// body happens later in the pipeline.
25+
return queueMessage.MessageText.Contains("MyMessageIdFieldName") &&
26+
queueMessage.MessageText.Contains("MyMessageCustomPropertyFieldName")
27+
//this was a native message just return the body as is with no headers
28+
? new MessageWrapper
2229
{
23-
//try deserialize to a NServiceBus envelope first
24-
var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);
25-
26-
if (wrapper.Id != null)
27-
{
28-
//this was a envelope message
29-
return wrapper;
30-
}
31-
32-
//this was a native message just return the body as is with no headers
33-
return new MessageWrapper
34-
{
35-
Id = queueMessage.MessageId,
36-
Headers = new Dictionary<string, string>(),
37-
Body = Convert.FromBase64String(queueMessage.MessageText)
38-
};
30+
Id = queueMessage.MessageId,
31+
Headers = new Dictionary<string, string>(),
32+
Body = queueMessage.Body.ToArray()
3933
}
34+
: null;
4035
}
4136
};
4237

Snippets/ASQ/ASQN_13/CustomEnvelopeUnwrapper.cs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using System.Buffers.Text;
32
using System.Text.Json;
43
using NServiceBus;
54
using NServiceBus.Azure.Transports.WindowsAzureStorageQueues;
@@ -15,24 +14,22 @@ class CustomEnvelopeUnwrapper
1514
{
1615
MessageUnwrapper = queueMessage =>
1716
{
18-
var messageText = Convert.FromBase64String(queueMessage.MessageText);
19-
20-
//try deserialize to a NServiceBus envelope first
21-
var wrapper = JsonSerializer.Deserialize<MessageWrapper>(messageText);
22-
23-
if (wrapper?.Id != null)
24-
{
25-
//this was a envelope message
26-
return wrapper;
27-
}
28-
17+
//Determine whether the message is one expected by the standard program flow.
18+
// All other messages should be forwarded to the framework by returning null.
19+
//NOTE: More complex methods may be needed in some scenarios to determine
20+
// whether the message is of an expected type, but this check
21+
// should be kept as lightweight as possible. Deserialization of the message
22+
// body happens later in the pipeline.
23+
return queueMessage.MessageText.Contains("MyMessageIdFieldName") &&
24+
queueMessage.MessageText.Contains("MyMessageCustomPropertyFieldName")
2925
//this was a native message just return the body as is with no headers
30-
return new MessageWrapper
26+
? new MessageWrapper
3127
{
3228
Id = queueMessage.MessageId,
33-
Headers = new Dictionary<string, string>(),
34-
Body = messageText
35-
};
29+
Headers = [],
30+
Body = queueMessage.Body.ToArray()
31+
}
32+
: null;
3633
}
3734
};
3835

samples/azure/native-integration-asq/ASQN_12/NativeSender/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ static async Task Main()
2727

2828
var nativeMessage = new NativeMessage
2929
{
30+
NativeMessageId = Guid.NewGuid(),
3031
Content = $"Hello from native sender @ {DateTimeOffset.Now}"
3132
};
3233

samples/azure/native-integration-asq/ASQN_12/Receiver/Program.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,19 @@ static async Task Main()
2121

2222
#region Native-message-mapping
2323

24-
transport.MessageUnwrapper = message => new MessageWrapper
25-
{
26-
Id = message.MessageId,
27-
Body = message.Body.ToArray(),
28-
Headers = new Dictionary<string, string>
24+
transport.MessageUnwrapper = message =>
25+
message.MessageText.Contains("NativeMessageId") &&
26+
message.MessageText.Contains("Content")
27+
? new MessageWrapper
2928
{
30-
{ Headers.EnclosedMessageTypes, typeof(NativeMessage).FullName }
29+
Id = message.MessageId,
30+
Body = message.Body.ToArray(),
31+
Headers = new Dictionary<string, string>
32+
{
33+
{ Headers.EnclosedMessageTypes, typeof(NativeMessage).FullName }
34+
}
3135
}
32-
};
36+
: null; // not a raw native message - allow the framework to deal with it
3337

3438
#endregion
3539

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using System;
12
using NServiceBus;
23

34
public class NativeMessage : IMessage
45
{
6+
public Guid NativeMessageId { get; set; }
57
public string Content { get; set; }
68
}

samples/azure/native-integration-asq/ASQN_13/NativeSender/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
var nativeMessage = new NativeMessage
2525
{
26+
NativeMessageId = Guid.NewGuid(),
2627
Content = $"Hello from native sender @ {DateTimeOffset.Now}"
2728
};
2829

samples/azure/native-integration-asq/ASQN_13/Receiver/Program.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers.Text;
23
using System.Collections.Generic;
34

45
using NServiceBus;
@@ -18,15 +19,19 @@
1819

1920
#region Native-message-mapping
2021

21-
transport.MessageUnwrapper = message => new MessageWrapper
22-
{
23-
Id = message.MessageId,
24-
Body = message.Body.ToArray(),
25-
Headers = new Dictionary<string, string>
26-
{
27-
{ Headers.EnclosedMessageTypes, typeof(NativeMessage).FullName }
28-
}
29-
};
22+
transport.MessageUnwrapper = message =>
23+
message.MessageText.Contains("NativeMessageId") &&
24+
message.MessageText.Contains("Content")
25+
? new MessageWrapper
26+
{
27+
Id = message.MessageId,
28+
Body = message.Body.ToArray(),
29+
Headers = new Dictionary<string, string>
30+
{
31+
{ Headers.EnclosedMessageTypes, typeof(NativeMessage).FullName }
32+
}
33+
}
34+
: null; // not a raw native message - allow the framework to deal with it
3035

3136
#endregion
3237

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using System;
12
using NServiceBus;
23

34
public class NativeMessage : IMessage
45
{
6+
public Guid NativeMessageId { get; set; }
57
public string Content { get; set; }
68
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
### Custom envelope unwrapper
2+
3+
Azure Storage Queues lacks native header support. NServiceBus solves this by wrapping headers and message body in a custom envelope structure. This envelope is serialized using the configured [serializer](/nservicebus/serialization) for the endpoint before being sent.
4+
5+
Creating this envelope can cause unnecessary complexity if headers are not needed, as is the case in native integration scenarios. For this reason, NServiceBus.Transport.AzureStorageQueues 9.0 and above support configuring a custom envelope unwrapper.
6+
7+
> [!WARNING]
8+
> In this scenario, NServiceBus may place messages in your queue in addition to the native messages that are expected, for example if a message results in a [delayed retry](delayed-delivery.md). Any custom envelope unwrapper must verify if the incoming message is capable of being deserialized as a native message, and the resulting body must be serialized according to the configured endpoint serializer. If either of these conditions cannot be met then `MessageUnwrapper` should return `null` to allow the default unwrapper to handle the message.
9+
10+
The snippet below shows custom unwrapping logic that enables both NServiceBus formatted and plain serialized messages to be consumed.
11+
12+
snippet: CustomEnvelopeUnwrapper
13+
14+
> [!NOTE]
15+
> This feature is currently NOT compatible with ServiceControl. A [ServiceControl transport adapter](/servicecontrol/transport-adapter.md) is required to leverage both.

transports/azure-storage-queues/native-integration_sending_asqn_[9,).partial.md

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)