From 47a0406f612986307970cf7d783eadd3faa6e595 Mon Sep 17 00:00:00 2001 From: Marco Becker Date: Thu, 5 Jan 2023 08:28:55 +0100 Subject: [PATCH] Add functionality to browse queues --- .../AutoRecoveringConnection.cs | 11 +- .../AutoRecovering/AutoRecoveringConsumer.cs | 6 +- src/ArtemisNetClient/Browser.cs | 81 +++++ .../Builders/ConsumerBuilder.cs | 10 +- src/ArtemisNetClient/Connection.cs | 14 +- src/ArtemisNetClient/IBrowser.cs | 9 + src/ArtemisNetClient/IConnection.cs | 5 +- .../MessageBrowseSpec.cs | 144 ++++++++ .../CreateBrowserSpec.cs | 339 ++++++++++++++++++ 9 files changed, 610 insertions(+), 9 deletions(-) create mode 100644 src/ArtemisNetClient/Browser.cs create mode 100644 src/ArtemisNetClient/IBrowser.cs create mode 100644 test/ArtemisNetClient.IntegrationTests/MessageBrowseSpec.cs create mode 100644 test/ArtemisNetClient.UnitTests/CreateBrowserSpec.cs diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs index 5072b901..afd34c8a 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs @@ -197,9 +197,9 @@ public async Task CreateTopologyManagerAsync(CancellationToken return new TopologyManager(configuration.Address, rpcClient); } - public async Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken) + public async Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken, bool isBrowser = false) { - var autoRecoveringConsumer = new AutoRecoveringConsumer(_loggerFactory, configuration); + var autoRecoveringConsumer = new AutoRecoveringConsumer(_loggerFactory, configuration, isBrowser); await PrepareRecoverable(autoRecoveringConsumer, cancellationToken).ConfigureAwait(false); return autoRecoveringConsumer; } @@ -225,6 +225,13 @@ public async Task CreateRequestReplyClientAsync(RequestRepl return autoRecoveringRpcClient; } + public async Task CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken) + { + var autoRecoveringConsumer = new AutoRecoveringConsumer(_loggerFactory, configuration, isBrowser: true); + await PrepareRecoverable(autoRecoveringConsumer, cancellationToken).ConfigureAwait(false); + return new Browser(autoRecoveringConsumer); + } + public event EventHandler ConnectionClosed; public event EventHandler ConnectionRecovered; public event EventHandler ConnectionRecoveryError; diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs index d4b76365..e1b77b63 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs @@ -14,14 +14,16 @@ internal class AutoRecoveringConsumer : IConsumer, IRecoverable private readonly ILogger _logger; private readonly ConsumerConfiguration _configuration; private readonly AsyncManualResetEvent _manualResetEvent = new AsyncManualResetEvent(true); + private readonly bool _isBrowser; private bool _closed; private volatile Exception _failureCause; private volatile IConsumer _consumer; - public AutoRecoveringConsumer(ILoggerFactory loggerFactory, ConsumerConfiguration configuration) + public AutoRecoveringConsumer(ILoggerFactory loggerFactory, ConsumerConfiguration configuration, bool isBrowser = false) { _logger = loggerFactory.CreateLogger(); _configuration = configuration; + _isBrowser = isBrowser; } public async ValueTask ReceiveAsync(CancellationToken cancellationToken = default) @@ -115,7 +117,7 @@ private bool IsSuspended() public async Task RecoverAsync(IConnection connection, CancellationToken cancellationToken) { var oldConsumer = _consumer; - _consumer = await connection.CreateConsumerAsync(_configuration, cancellationToken).ConfigureAwait(false); + _consumer = await connection.CreateConsumerAsync(_configuration, cancellationToken, isBrowser: _isBrowser).ConfigureAwait(false); await DisposeUnderlyingConsumerSafe(oldConsumer).ConfigureAwait(false); Log.ConsumerRecovered(_logger); } diff --git a/src/ArtemisNetClient/Browser.cs b/src/ArtemisNetClient/Browser.cs new file mode 100644 index 00000000..9f2ec6ec --- /dev/null +++ b/src/ArtemisNetClient/Browser.cs @@ -0,0 +1,81 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ActiveMQ.Artemis.Client +{ + internal class Browser : IBrowser + { + + private IConsumer _consumer; + private Message _current; + + public Browser(IConsumer consumer) + { + _consumer = consumer; + } + + public Message Current => _current; + + object IEnumerator.Current => _current; + + public void Dispose() + { + throw new NotImplementedException(); + } + + public async ValueTask DisposeAsync() + { + if (_consumer != null) + await _consumer.DisposeAsync(); + } + + + public IEnumerator GetEnumerator() + { + return this; + } + + public bool MoveNext() + { + _current = Next(); + + return _current != null; + } + + private Message Next() + { + var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); + var token = cts.Token; + + try + { + _current = null; + _current = _consumer.ReceiveAsync(token).Result; + + return _current; + } + catch(OperationCanceledException ex) + { + var error = ex.Message; + return null; + } + catch(Exception) + { + throw; + } + } + + public void Reset() + { + throw new NotImplementedException(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return this; + } + } +} \ No newline at end of file diff --git a/src/ArtemisNetClient/Builders/ConsumerBuilder.cs b/src/ArtemisNetClient/Builders/ConsumerBuilder.cs index 9fc84f83..c9a3983f 100644 --- a/src/ArtemisNetClient/Builders/ConsumerBuilder.cs +++ b/src/ArtemisNetClient/Builders/ConsumerBuilder.cs @@ -12,6 +12,7 @@ namespace ActiveMQ.Artemis.Client.Builders { internal class ConsumerBuilder { + private readonly Symbol ATTACH_DISTRIBUTION_MODE_COPY = new Symbol("copy"); private readonly ILoggerFactory _loggerFactory; private readonly TransactionsManager _transactionsManager; private readonly Session _session; @@ -25,7 +26,7 @@ public ConsumerBuilder(ILoggerFactory loggerFactory, TransactionsManager transac _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } - public async Task CreateAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken) + public async Task CreateAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken, bool isBrowser = false) { CheckConfiguration(configuration); cancellationToken.ThrowIfCancellationRequested(); @@ -39,6 +40,13 @@ public async Task CreateAsync(ConsumerConfiguration configuration, Ca Durable = GetTerminusDurability(configuration) }; + if (isBrowser) + { + // set distribution mode to copy + // http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#doc-idp328592 + source.DistributionMode = ATTACH_DISTRIBUTION_MODE_COPY; + } + var receiverLink = new ReceiverLink(_session, GetReceiverName(configuration), source, OnAttached); receiverLink.AddClosedCallback(OnClosed); await _tcs.Task.ConfigureAwait(false); diff --git a/src/ArtemisNetClient/Connection.cs b/src/ArtemisNetClient/Connection.cs index 16b72e58..8ddeb91b 100644 --- a/src/ArtemisNetClient/Connection.cs +++ b/src/ArtemisNetClient/Connection.cs @@ -56,13 +56,13 @@ public async Task CreateTopologyManagerAsync(CancellationToken } } - public async Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken) + public async Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken, bool isBrowser = false) { CheckState(); var session = await CreateSession(cancellationToken).ConfigureAwait(false); var consumerBuilder = new ConsumerBuilder(_loggerFactory, _transactionsManager, session); - return await consumerBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false); + return await consumerBuilder.CreateAsync(configuration, cancellationToken, isBrowser).ConfigureAwait(false); } public async Task CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken) @@ -92,6 +92,16 @@ public async Task CreateRequestReplyClientAsync(RequestRepl return await requestReplyClientBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false); } + public async Task CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken) + { + CheckState(); + + var session = await CreateSession(cancellationToken).ConfigureAwait(false); + var consumerBuilder = new ConsumerBuilder(_loggerFactory, _transactionsManager, session); + var consumer = await consumerBuilder.CreateAsync(configuration, cancellationToken, isBrowser: true).ConfigureAwait(false); + return new Browser(consumer); + } + internal async Task CreateTransactionCoordinator(CancellationToken cancellationToken) { var session = await CreateSession(cancellationToken).ConfigureAwait(false); diff --git a/src/ArtemisNetClient/IBrowser.cs b/src/ArtemisNetClient/IBrowser.cs new file mode 100644 index 00000000..da4322ea --- /dev/null +++ b/src/ArtemisNetClient/IBrowser.cs @@ -0,0 +1,9 @@ +using System; +using System.Collections.Generic; + +namespace ActiveMQ.Artemis.Client +{ + public interface IBrowser : IEnumerator, IEnumerable, IAsyncDisposable + { + } +} \ No newline at end of file diff --git a/src/ArtemisNetClient/IConnection.cs b/src/ArtemisNetClient/IConnection.cs index c680e360..1afacc19 100644 --- a/src/ArtemisNetClient/IConnection.cs +++ b/src/ArtemisNetClient/IConnection.cs @@ -25,11 +25,12 @@ public interface IConnection : IAsyncDisposable /// bool IsOpened { get; } Task CreateTopologyManagerAsync(CancellationToken cancellationToken = default); - Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default); + Task CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default, bool isBrowser = false); Task CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken = default); Task CreateAnonymousProducerAsync(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default); Task CreateRequestReplyClientAsync(RequestReplyClientConfiguration configuration, CancellationToken cancellationToken = default); - + Task CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default); + /// /// Raised when the connection is closed. /// diff --git a/test/ArtemisNetClient.IntegrationTests/MessageBrowseSpec.cs b/test/ArtemisNetClient.IntegrationTests/MessageBrowseSpec.cs new file mode 100644 index 00000000..bf61c161 --- /dev/null +++ b/test/ArtemisNetClient.IntegrationTests/MessageBrowseSpec.cs @@ -0,0 +1,144 @@ +using System; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Artemis.Client.IntegrationTests +{ + public class MessageBrowseSpec : ActiveMQNetIntegrationSpec + { + public MessageBrowseSpec(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task Should_find_message_by_browsing_for_id_property_with_existing_id_and_single_message() + { + await using var connection = await CreateConnection(); + var address = Guid.NewGuid().ToString(); + await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast); + + var messageId = Guid.NewGuid().ToString(); + var message = new Message("foo"); + message.ApplicationProperties["id"] = messageId; + + await producer.SendAsync(message); + + var configuration = new ConsumerConfiguration + { + Address = address, + RoutingType = RoutingType.Anycast, + FilterExpression = $"id = '{messageId}'", + }; + + await using var browser = await connection.CreateBrowserAsync(configuration); + + + Message existingMessage = null; + if (browser.MoveNext()) + { + existingMessage = browser.Current; + } + + Assert.Equal("foo", existingMessage.GetBody()); + } + + [Fact] + public async Task Should_find_message_by_browsing_for_id_property_with_existing_id_and_multiple_messages() + { + await using var connection = await CreateConnection(); + var address = Guid.NewGuid().ToString(); + await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast); + + var messageId = Guid.NewGuid().ToString(); + var message = new Message("foo"); + message.ApplicationProperties["id"] = messageId; + + await producer.SendAsync(new Message(Guid.NewGuid().ToString())); + await producer.SendAsync(new Message(Guid.NewGuid().ToString())); + + await producer.SendAsync(message); + + await producer.SendAsync(new Message(Guid.NewGuid().ToString())); + await producer.SendAsync(new Message(Guid.NewGuid().ToString())); + + var configuration = new ConsumerConfiguration + { + Address = address, + RoutingType = RoutingType.Anycast, + FilterExpression = $"id = '{messageId}'", + }; + + await using var browser = await connection.CreateBrowserAsync(configuration); + + + Message existingMessage = null; + if (browser.MoveNext()) + { + existingMessage = browser.Current; + } + + Assert.Equal("foo", existingMessage.GetBody()); + } + + [Fact] + public async Task Should_not_find_message_by_browsing_for_id_property_with_inexisting_id() + { + await using var connection = await CreateConnection(); + var address = Guid.NewGuid().ToString(); + await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast); + + var message = new Message("foo"); + message.ApplicationProperties["id"] = Guid.NewGuid().ToString(); + + await producer.SendAsync(message); + + var configuration = new ConsumerConfiguration + { + Address = address, + RoutingType = RoutingType.Anycast, + FilterExpression = $"id = '{Guid.NewGuid()}'", // inexistent id + }; + + await using var browser = await connection.CreateBrowserAsync(configuration); + + Message maybeMessage = null; + if (browser.MoveNext()) + { + maybeMessage = browser.Current; + } + + Assert.Null(maybeMessage); + } + + [Fact] + public async Task Should_not_find_message_by_browsing_for_id_property_with_empty_id() + { + await using var connection = await CreateConnection(); + var address = Guid.NewGuid().ToString(); + await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast); + + var message = new Message("foo"); + message.ApplicationProperties["id"] = Guid.NewGuid().ToString(); + + await producer.SendAsync(message); + + var configuration = new ConsumerConfiguration + { + Address = address, + RoutingType = RoutingType.Anycast, + FilterExpression = $"id = ''", // empty id + }; + + await using var browser = await connection.CreateBrowserAsync(configuration); + + Message maybeMessage = null; + if (browser.MoveNext()) + { + maybeMessage = browser.Current; + } + + Assert.Null(maybeMessage); + } + } +} \ No newline at end of file diff --git a/test/ArtemisNetClient.UnitTests/CreateBrowserSpec.cs b/test/ArtemisNetClient.UnitTests/CreateBrowserSpec.cs new file mode 100644 index 00000000..b0fe16d6 --- /dev/null +++ b/test/ArtemisNetClient.UnitTests/CreateBrowserSpec.cs @@ -0,0 +1,339 @@ +using ActiveMQ.Artemis.Client.Exceptions; +using ActiveMQ.Artemis.Client.UnitTests.Utils; +using Amqp; +using Amqp.Framing; +using Amqp.Handler; +using Amqp.Types; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace ActiveMQ.Artemis.Client.UnitTests +{ + public class CreateBrowserSpec : ActiveMQNetSpec + { + public CreateBrowserSpec(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task Should_be_created_and_closed() + { + var endpoint = GetUniqueEndpoint(); + var consumerAttached = new ManualResetEvent(false); + var consumerClosed = new ManualResetEvent(false); + + var testHandler = new TestHandler(@event => + { + switch (@event.Id) + { + case EventId.LinkRemoteOpen when @event.Context is Attach attach && !attach.Role: + consumerAttached.Set(); + break; + case EventId.LinkRemoteClose when @event.Context is Detach detach && detach.Closed: + consumerClosed.Set(); + break; + } + }); + + using var host = CreateOpenedContainerHost(endpoint, testHandler); + await using var connection = await CreateConnection(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = "test-consumer", + RoutingType = RoutingType.Anycast + }; + + var browser = await connection.CreateBrowserAsync(configuration); + + await browser.DisposeAsync(); + + Assert.True(consumerAttached.WaitOne(Timeout)); + Assert.True(consumerClosed.WaitOne(Timeout)); + } + + [Fact] + public async Task Should_attach_to_specified_address() + { + var endpoint = GetUniqueEndpoint(); + var consumerAttached = new ManualResetEvent(false); + Attach attachFrame = null; + + var testHandler = new TestHandler(@event => + { + switch (@event.Id) + { + case EventId.LinkRemoteOpen when @event.Context is Attach attach: + attachFrame = attach; + consumerAttached.Set(); + break; + } + }); + + using var host = CreateOpenedContainerHost(endpoint, testHandler); + + await using var connection = await CreateConnection(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = "test-consumer", + RoutingType = RoutingType.Anycast + }; + + await connection.CreateBrowserAsync(configuration); + + Assert.True(consumerAttached.WaitOne(Timeout)); + Assert.IsType(attachFrame.Source); + Assert.Equal("test-consumer", ((Source)attachFrame.Source).Address); + } + + [Theory, MemberData(nameof(RoutingTypesData))] + public async Task Should_attach_to_address_with_specified_RoutingType(RoutingType routingType, Symbol routingCapability) + { + var endpoint = GetUniqueEndpoint(); + var consumerAttached = new ManualResetEvent(false); + Attach attachFrame = null; + + var testHandler = new TestHandler(@event => + { + switch (@event.Id) + { + case EventId.LinkRemoteOpen when @event.Context is Attach attach: + attachFrame = attach; + consumerAttached.Set(); + break; + } + }); + + using var host = CreateOpenedContainerHost(endpoint, testHandler); + + await using var connection = await CreateConnection(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = "test-consumer", + RoutingType = routingType + }; + + await connection.CreateBrowserAsync(configuration); + + Assert.True(consumerAttached.WaitOne(Timeout)); + Assert.NotNull(attachFrame); + Assert.IsType(attachFrame.Source); + Assert.Contains(((Source)attachFrame.Source).Capabilities, routingCapability.Equals); + } + + public static IEnumerable RoutingTypesData() + { + return new[] + { + new object[] { RoutingType.Anycast, Capabilities.Anycast }, + new object[] { RoutingType.Multicast, Capabilities.Multicast } + }; + } + + [Fact] + public async Task Should_connect_to_a_custom_queue_on_specified_address_without_routing_type() + { + var endpoint = GetUniqueEndpoint(); + var consumerAttached = new ManualResetEvent(false); + Attach attachFrame = null; + + var testHandler = new TestHandler(@event => + { + switch (@event.Id) + { + case EventId.LinkRemoteOpen when @event.Context is Attach attach: + attachFrame = attach; + consumerAttached.Set(); + break; + } + }); + + using var host = CreateOpenedContainerHost(endpoint, testHandler); + + await using var connection = await CreateConnection(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = "test-consumer", + Queue = "q1" + }; + + await using var browser = await connection.CreateBrowserAsync(configuration); + + Assert.True(consumerAttached.WaitOne(Timeout)); + Assert.NotNull(attachFrame); + Assert.IsType(attachFrame.Source); + var sourceFrame = (Source)attachFrame.Source; + Assert.Equal("test-consumer::q1", sourceFrame.Address); + Assert.Null(sourceFrame.Capabilities); + } + + [Fact] + public async Task Should_throw_exception_when_selected_queue_doesnt_exist() + { + var endpoint = GetUniqueEndpoint(); + + var testHandler = new TestHandler(@event => + { + switch (@event.Id) + { + case EventId.LinkLocalOpen when @event.Context is Attach attach: + attach.Source = null; + Task.Run(async () => + { + await Task.Delay(TimeSpan.FromMilliseconds(5)); + await @event.Link.CloseAsync(System.Threading.Timeout.InfiniteTimeSpan, new Error(ErrorCode.NotFound) + { + Description = "Queue: 'q1' does not exist" + }); + }); + break; + } + }); + + using var host = CreateOpenedContainerHost(endpoint, testHandler); + host.Open(); + + await using var connection = await CreateConnection(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = "a1", + Queue = "q1" + }; + + var exception = await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(configuration)); + Assert.Contains("Queue: 'q1' does not exist", exception.Message); + Assert.Equal(ErrorCode.NotFound, exception.ErrorCode); + } + + [Fact] + public async Task Should_cancel_CreateBrowserAsync_when_attach_frame_not_received_on_time() + { + var endpoint = GetUniqueEndpoint(); + + using var host = CreateContainerHostThatWillNeverSendAttachFrameBack(endpoint); + + await using var connection = await CreateConnection(endpoint); + var cancellationTokenSource = new CancellationTokenSource(ShortTimeout); + + var configuration = new ConsumerConfiguration + { + Address = "a1", + Queue = "q1", + }; + + await Assert.ThrowsAnyAsync(async () => await connection.CreateBrowserAsync(configuration, cancellationTokenSource.Token)); + } + + [Fact] + public async Task Throws_when_created_with_null_configuration() + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + + await using var connection = await CreateConnection(endpoint); + await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(null)); + } + + [Fact] + public async Task Throws_when_created_with_null_address() + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = null + }; + + await using var connection = await CreateConnection(endpoint); + await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(configuration)); + } + + [Fact] + public async Task Throws_when_created_with_empty_address() + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + + var configuration = new ConsumerConfiguration + { + Address = string.Empty + }; + + await using var connection = await CreateConnection(endpoint); + await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(configuration)); + } + + [Theory] + [InlineData(-1)] + [InlineData(0)] + public async Task Throws_when_created_with_credit_less_than_1(int credit) + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + await using var connection = await CreateConnection(endpoint); + + await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(new ConsumerConfiguration + { + Address = "a1", + RoutingType = RoutingType.Multicast, + Credit = credit + })); + } + + [Theory] + [InlineData(1)] + [InlineData(100)] + public async Task Should_create_Consumer_when_credit_is_greater_or_equal_to_1(int credit) + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + await using var connection = await CreateConnection(endpoint); + + await using var consumer = await connection.CreateBrowserAsync(new ConsumerConfiguration + { + Address = "a1", + RoutingType = RoutingType.Multicast, + Credit = credit + }); + } + + [Theory] + [InlineData(RoutingType.Anycast)] + [InlineData(RoutingType.Multicast)] + public async Task Throws_when_created_with_queue_name_and_routing_type(RoutingType routingType) + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + await using var connection = await CreateConnection(endpoint); + + await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(new ConsumerConfiguration + { + Address = "a1", + Queue = "q1", + RoutingType = routingType, + })); + } + + [Fact] + public async Task Throws_when_queue_name_not_provided() + { + var endpoint = GetUniqueEndpoint(); + using var host = CreateOpenedContainerHost(endpoint); + await using var connection = await CreateConnection(endpoint); + + await Assert.ThrowsAsync(() => connection.CreateBrowserAsync(new ConsumerConfiguration + { + Address = "a1", + })); + } + } +} \ No newline at end of file