From 0379dbb4f394d2588c805a1f5dbec5b771164efc Mon Sep 17 00:00:00 2001 From: Havret Date: Sun, 16 Aug 2020 20:28:29 +0200 Subject: [PATCH] Support for FirstAcquirer field --- src/ArtemisNetClient/Message/Header.cs | 2 + src/ArtemisNetClient/Message/Message.cs | 6 +++ .../MessageFirstAcquirerSpec.cs | 45 +++++++++++++++++++ 3 files changed, 53 insertions(+) create mode 100644 test/ArtemisNetClient.IntegrationTests/MessageFirstAcquirerSpec.cs diff --git a/src/ArtemisNetClient/Message/Header.cs b/src/ArtemisNetClient/Message/Header.cs index 2929d3b6..245df61b 100644 --- a/src/ArtemisNetClient/Message/Header.cs +++ b/src/ArtemisNetClient/Message/Header.cs @@ -46,5 +46,7 @@ public uint? Ttl } public uint DeliveryCount => _innerHeader.HasField(4) ? _innerHeader.DeliveryCount : default; + + public bool FirstAcquirer => _innerHeader.FirstAcquirer; } } \ No newline at end of file diff --git a/src/ArtemisNetClient/Message/Message.cs b/src/ArtemisNetClient/Message/Message.cs index 6bc2e35b..ad34b405 100644 --- a/src/ArtemisNetClient/Message/Message.cs +++ b/src/ArtemisNetClient/Message/Message.cs @@ -150,6 +150,12 @@ public byte[] UserId /// public uint DeliveryCount => Header.DeliveryCount; + /// + /// If this value is true, then this message has not been acquired by any other link. + /// If this value is false, then this message MAY have previously been acquired by another link or links. + /// + public bool FirstAcquirer => Header.FirstAcquirer; + public DurabilityMode? DurabilityMode { get => Header.Durable switch diff --git a/test/ArtemisNetClient.IntegrationTests/MessageFirstAcquirerSpec.cs b/test/ArtemisNetClient.IntegrationTests/MessageFirstAcquirerSpec.cs new file mode 100644 index 00000000..7c03f066 --- /dev/null +++ b/test/ArtemisNetClient.IntegrationTests/MessageFirstAcquirerSpec.cs @@ -0,0 +1,45 @@ +using System; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Artemis.Client.IntegrationTests +{ + public class MessageFirstAcquirerSpec : ActiveMQNetIntegrationSpec + { + public MessageFirstAcquirerSpec(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task Should_receive_msg_with_FirstAcquirer_set_when_msg_was_received_for_the_first_time() + { + await using var connection = await CreateConnection(); + var address = Guid.NewGuid().ToString(); + await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast); + await using var consumer1 = await connection.CreateConsumerAsync(address, RoutingType.Anycast); + await using var consumer2 = await connection.CreateConsumerAsync(address, RoutingType.Anycast); + + await producer.SendAsync(new Message("foo")); + var msg = await consumer1.ReceiveAsync(CancellationToken); + Assert.True(msg.FirstAcquirer); + } + + [Fact] + public async Task Should_receive_msg_with_FirstAcquirer_set_to_false_when_msg_was_redelivered() + { + await using var connection = await CreateConnection(); + var address = Guid.NewGuid().ToString(); + await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast); + await using var consumer1 = await connection.CreateConsumerAsync(address, RoutingType.Anycast); + await using var consumer2 = await connection.CreateConsumerAsync(address, RoutingType.Anycast); + + await producer.SendAsync(new Message("foo")); + var msg = await consumer1.ReceiveAsync(CancellationToken); + consumer1.Reject(msg); + + var msg2 = await consumer2.ReceiveAsync(CancellationToken); + Assert.False(msg2.FirstAcquirer); + } + } +} \ No newline at end of file