Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ public async Task<ITopologyManager> CreateTopologyManagerAsync(CancellationToken
return new TopologyManager(configuration.Address, rpcClient);
}

public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken)
public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken, bool isBrowser = false)
{
var autoRecoveringConsumer = new AutoRecoveringConsumer(_loggerFactory, configuration);
var autoRecoveringConsumer = new AutoRecoveringConsumer(_loggerFactory, configuration, isBrowser);
await PrepareRecoverable(autoRecoveringConsumer, cancellationToken).ConfigureAwait(false);
return autoRecoveringConsumer;
}
Expand All @@ -225,6 +225,13 @@ public async Task<IRequestReplyClient> CreateRequestReplyClientAsync(RequestRepl
return autoRecoveringRpcClient;
}

public async Task<IBrowser> CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken)
{
var autoRecoveringConsumer = new AutoRecoveringConsumer(_loggerFactory, configuration, isBrowser: true);
await PrepareRecoverable(autoRecoveringConsumer, cancellationToken).ConfigureAwait(false);
return new Browser(autoRecoveringConsumer);
}

public event EventHandler<ConnectionClosedEventArgs> ConnectionClosed;
public event EventHandler<ConnectionRecoveredEventArgs> ConnectionRecovered;
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
Expand Down
6 changes: 4 additions & 2 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ internal class AutoRecoveringConsumer : IConsumer, IRecoverable
private readonly ILogger<AutoRecoveringConsumer> _logger;
private readonly ConsumerConfiguration _configuration;
private readonly AsyncManualResetEvent _manualResetEvent = new AsyncManualResetEvent(true);
private readonly bool _isBrowser;
private bool _closed;
private volatile Exception _failureCause;
private volatile IConsumer _consumer;

public AutoRecoveringConsumer(ILoggerFactory loggerFactory, ConsumerConfiguration configuration)
public AutoRecoveringConsumer(ILoggerFactory loggerFactory, ConsumerConfiguration configuration, bool isBrowser = false)
{
_logger = loggerFactory.CreateLogger<AutoRecoveringConsumer>();
_configuration = configuration;
_isBrowser = isBrowser;
}

public async ValueTask<Message> ReceiveAsync(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -115,7 +117,7 @@ private bool IsSuspended()
public async Task RecoverAsync(IConnection connection, CancellationToken cancellationToken)
{
var oldConsumer = _consumer;
_consumer = await connection.CreateConsumerAsync(_configuration, cancellationToken).ConfigureAwait(false);
_consumer = await connection.CreateConsumerAsync(_configuration, cancellationToken, isBrowser: _isBrowser).ConfigureAwait(false);
await DisposeUnderlyingConsumerSafe(oldConsumer).ConfigureAwait(false);
Log.ConsumerRecovered(_logger);
}
Expand Down
81 changes: 81 additions & 0 deletions src/ArtemisNetClient/Browser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ActiveMQ.Artemis.Client
{
internal class Browser : IBrowser
{

private IConsumer _consumer;
private Message _current;

public Browser(IConsumer consumer)
{
_consumer = consumer;
}

public Message Current => _current;

object IEnumerator.Current => _current;

public void Dispose()
{
throw new NotImplementedException();
}

public async ValueTask DisposeAsync()
{
if (_consumer != null)
await _consumer.DisposeAsync();
}


public IEnumerator<Message> GetEnumerator()
{
return this;
}

public bool MoveNext()
{
_current = Next();

return _current != null;
}

private Message Next()
{
var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
var token = cts.Token;

try
{
_current = null;
_current = _consumer.ReceiveAsync(token).Result;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want blocking on async call.


return _current;
}
catch(OperationCanceledException ex)
{
var error = ex.Message;
return null;
}
catch(Exception)
{
throw;
}
}

public void Reset()
{
throw new NotImplementedException();
}

IEnumerator IEnumerable.GetEnumerator()
{
return this;
}
}
}
10 changes: 9 additions & 1 deletion src/ArtemisNetClient/Builders/ConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace ActiveMQ.Artemis.Client.Builders
{
internal class ConsumerBuilder
{
private readonly Symbol ATTACH_DISTRIBUTION_MODE_COPY = new Symbol("copy");
private readonly ILoggerFactory _loggerFactory;
private readonly TransactionsManager _transactionsManager;
private readonly Session _session;
Expand All @@ -25,7 +26,7 @@ public ConsumerBuilder(ILoggerFactory loggerFactory, TransactionsManager transac
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public async Task<IConsumer> CreateAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken)
public async Task<IConsumer> CreateAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken, bool isBrowser = false)
{
CheckConfiguration(configuration);
cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -39,6 +40,13 @@ public async Task<IConsumer> CreateAsync(ConsumerConfiguration configuration, Ca
Durable = GetTerminusDurability(configuration)
};

if (isBrowser)
{
// set distribution mode to copy
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#doc-idp328592
source.DistributionMode = ATTACH_DISTRIBUTION_MODE_COPY;
}

var receiverLink = new ReceiverLink(_session, GetReceiverName(configuration), source, OnAttached);
receiverLink.AddClosedCallback(OnClosed);
await _tcs.Task.ConfigureAwait(false);
Expand Down
14 changes: 12 additions & 2 deletions src/ArtemisNetClient/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ public async Task<ITopologyManager> CreateTopologyManagerAsync(CancellationToken
}
}

public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken)
public async Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken, bool isBrowser = false)
{
CheckState();

var session = await CreateSession(cancellationToken).ConfigureAwait(false);
var consumerBuilder = new ConsumerBuilder(_loggerFactory, _transactionsManager, session);
return await consumerBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false);
return await consumerBuilder.CreateAsync(configuration, cancellationToken, isBrowser).ConfigureAwait(false);
}

public async Task<IProducer> CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken)
Expand Down Expand Up @@ -92,6 +92,16 @@ public async Task<IRequestReplyClient> CreateRequestReplyClientAsync(RequestRepl
return await requestReplyClientBuilder.CreateAsync(configuration, cancellationToken).ConfigureAwait(false);
}

public async Task<IBrowser> CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken)
{
CheckState();

var session = await CreateSession(cancellationToken).ConfigureAwait(false);
var consumerBuilder = new ConsumerBuilder(_loggerFactory, _transactionsManager, session);
var consumer = await consumerBuilder.CreateAsync(configuration, cancellationToken, isBrowser: true).ConfigureAwait(false);
return new Browser(consumer);
}

internal async Task<TransactionCoordinator> CreateTransactionCoordinator(CancellationToken cancellationToken)
{
var session = await CreateSession(cancellationToken).ConfigureAwait(false);
Expand Down
9 changes: 9 additions & 0 deletions src/ArtemisNetClient/IBrowser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;
using System.Collections.Generic;

namespace ActiveMQ.Artemis.Client
{
public interface IBrowser : IEnumerator<Message>, IEnumerable<Message>, IAsyncDisposable
{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather expect this interface to expose a method that returns IAsyncEnumerable:

IAsyncEnumerable<Message> ReceiveAllAsync(CancellationToken cancellationToken);

Than implement IEnumerable itself.

As a side note, you cannot implement IEnumerable without blocking.

A naive implementation might look as follows:

public async IAsyncEnumerable<Message> ReceiveAllAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        yield return await _consumer.ReceiveAsync(cancellationToken).ConfigureAwait(false);
        // we probably need to ack message, so the browser won't block after we run out of the consumer credit
    }
}

But I'm not sure that reusing IConsumer to do all the heavy-lifting is the right call. I'd be more inclined to do sth similar to what was done in RequestReplyClient.

}
}
5 changes: 3 additions & 2 deletions src/ArtemisNetClient/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ public interface IConnection : IAsyncDisposable
/// </summary>
bool IsOpened { get; }
Task<ITopologyManager> CreateTopologyManagerAsync(CancellationToken cancellationToken = default);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IConsumer> CreateConsumerAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default, bool isBrowser = false);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IConsumer != IBrowser. They have different characteristics. I wouldn't pretend they represent the same thing and try to shoehorn them under a single interface umbrella.

Task<IProducer> CreateProducerAsync(ProducerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IAnonymousProducer> CreateAnonymousProducerAsync(AnonymousProducerConfiguration configuration, CancellationToken cancellationToken = default);
Task<IRequestReplyClient> CreateRequestReplyClientAsync(RequestReplyClientConfiguration configuration, CancellationToken cancellationToken = default);

Task<IBrowser> CreateBrowserAsync(ConsumerConfiguration configuration, CancellationToken cancellationToken = default);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need a special BrowserConfiguration type.

A big chunk of properties from ConsumerConfiguration doesn't make sense in a browser scenario:

I think we don't need RoutingType as the queue should already exist, Credit also seems redundant as there is no api to acknowledge message, Shared, Durable.

I'm not sure about NoLocalFilter. It definitely works for message browser scenario, but I'm not sure if it makes sense to expose it as part of the public api.


/// <summary>
/// Raised when the connection is closed.
/// </summary>
Expand Down
144 changes: 144 additions & 0 deletions test/ArtemisNetClient.IntegrationTests/MessageBrowseSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Client.IntegrationTests
{
public class MessageBrowseSpec : ActiveMQNetIntegrationSpec
{
public MessageBrowseSpec(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task Should_find_message_by_browsing_for_id_property_with_existing_id_and_single_message()
{
await using var connection = await CreateConnection();
var address = Guid.NewGuid().ToString();
await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast);

var messageId = Guid.NewGuid().ToString();
var message = new Message("foo");
message.ApplicationProperties["id"] = messageId;

await producer.SendAsync(message);

var configuration = new ConsumerConfiguration
{
Address = address,
RoutingType = RoutingType.Anycast,
FilterExpression = $"id = '{messageId}'",
};

await using var browser = await connection.CreateBrowserAsync(configuration);


Message existingMessage = null;
if (browser.MoveNext())
{
existingMessage = browser.Current;
}

Assert.Equal("foo", existingMessage.GetBody<string>());
}

[Fact]
public async Task Should_find_message_by_browsing_for_id_property_with_existing_id_and_multiple_messages()
{
await using var connection = await CreateConnection();
var address = Guid.NewGuid().ToString();
await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast);

var messageId = Guid.NewGuid().ToString();
var message = new Message("foo");
message.ApplicationProperties["id"] = messageId;

await producer.SendAsync(new Message(Guid.NewGuid().ToString()));
await producer.SendAsync(new Message(Guid.NewGuid().ToString()));

await producer.SendAsync(message);

await producer.SendAsync(new Message(Guid.NewGuid().ToString()));
await producer.SendAsync(new Message(Guid.NewGuid().ToString()));

var configuration = new ConsumerConfiguration
{
Address = address,
RoutingType = RoutingType.Anycast,
FilterExpression = $"id = '{messageId}'",
};

await using var browser = await connection.CreateBrowserAsync(configuration);


Message existingMessage = null;
if (browser.MoveNext())
{
existingMessage = browser.Current;
}

Assert.Equal("foo", existingMessage.GetBody<string>());
}

[Fact]
public async Task Should_not_find_message_by_browsing_for_id_property_with_inexisting_id()
{
await using var connection = await CreateConnection();
var address = Guid.NewGuid().ToString();
await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast);

var message = new Message("foo");
message.ApplicationProperties["id"] = Guid.NewGuid().ToString();

await producer.SendAsync(message);

var configuration = new ConsumerConfiguration
{
Address = address,
RoutingType = RoutingType.Anycast,
FilterExpression = $"id = '{Guid.NewGuid()}'", // inexistent id
};

await using var browser = await connection.CreateBrowserAsync(configuration);

Message maybeMessage = null;
if (browser.MoveNext())
{
maybeMessage = browser.Current;
}

Assert.Null(maybeMessage);
}

[Fact]
public async Task Should_not_find_message_by_browsing_for_id_property_with_empty_id()
{
await using var connection = await CreateConnection();
var address = Guid.NewGuid().ToString();
await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast);

var message = new Message("foo");
message.ApplicationProperties["id"] = Guid.NewGuid().ToString();

await producer.SendAsync(message);

var configuration = new ConsumerConfiguration
{
Address = address,
RoutingType = RoutingType.Anycast,
FilterExpression = $"id = ''", // empty id
};

await using var browser = await connection.CreateBrowserAsync(configuration);

Message maybeMessage = null;
if (browser.MoveNext())
{
maybeMessage = browser.Current;
}

Assert.Null(maybeMessage);
}
}
}
Loading