Subscribing

Val edited this page Sep 4, 2016 · 8 revisions

Subscriptions and channels

Subscription commands are routed to the shared subscriber pool. Because the pool is shared, multiple channels may manage their subscriptions through the same connection.

Each channel selects the subscriber connection that has the fewest subscriptions at that moment and remembers it for the rest of the channel's lifetime. The selection is lazy: if the channel never executes a subscription command (SUBSCRIBE, PSUBSCRIBE, etc.), it never selects a subscriber connection.
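
The lazy selection described above can be sketched roughly as follows. This is only an illustration of the idea, not RedisClient's actual code: `SubscriberConnection`, `LazyChannel`, `SubscriptionCount` and `HasSelected` are hypothetical names made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a pooled subscriber connection.
public sealed class SubscriberConnection
{
    public string Name { get; }
    public int SubscriptionCount { get; set; }

    public SubscriberConnection(string name) { Name = name; }
}

// Hypothetical channel that picks its subscriber connection on first use.
public sealed class LazyChannel
{
    readonly Lazy<SubscriberConnection> _subscriber;

    public LazyChannel(IReadOnlyList<SubscriberConnection> pool)
    {
        // The factory runs only when Subscriber is read for the first time.
        // It picks the connection with the fewest subscriptions at that moment.
        _subscriber = new Lazy<SubscriberConnection>(
            () => pool.OrderBy(c => c.SubscriptionCount).First());
    }

    // False until the channel needs a subscriber connection.
    public bool HasSelected => _subscriber.IsValueCreated;

    // Always returns the same connection once it has been selected.
    public SubscriberConnection Subscriber => _subscriber.Value;
}
```

In this sketch a channel that only runs ordinary commands never touches `Subscriber`, so it never occupies a slot in the pool. The choice is also sticky: later changes in load do not move the channel to another connection.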

Aggregation

One important feature of RedisClient's subscriber connections is that they aggregate the topics to which their channels are subscribed. Each connection aggregates the Redis topics and keeps track of how many channels are subscribed to each topic.

On channel disposal, any subscriptions the channel still holds are cleaned up. Make sure you always dispose of your channels.

Consider this example:

  • Channel1 executes subscribe post:123:notifications, through Connection1.
  • Connection1 sends the command to Redis.
  • Channel2 also executes subscribe post:123:notifications, through Connection1.
  • Connection1 is aware that it is already subscribed to that topic, so it does not send any command to Redis.
  • A message for the topic post:123:notifications arrives at Connection1.
  • Both Channel1 and Channel2 are notified.
  • Channel1 is disposed or executes unsubscribe post:123:notifications.
  • This also goes through Connection1, because the channel remembers the connection that was assigned to it. Since Channel2 is still subscribed to that topic, the connection does not send any command to Redis.
  • Channel2 is disposed or executes unsubscribe post:123:notifications.
  • This also goes through Connection1, for the same reason. This time no channels remain subscribed to that topic, so the connection sends the unsubscribe post:123:notifications command to Redis.
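
The bookkeeping in the steps above can be sketched as a small per-connection table of topics and subscribed channels. This is a simplified model under my own assumptions, not the library's implementation: `SubscriptionAggregator`, `SentCommands` and `ListenerCount` are hypothetical names, channels are identified by plain strings, and thread safety is ignored.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the bookkeeping done by one subscriber connection.
public sealed class SubscriptionAggregator
{
    // topic -> ids of the channels currently subscribed to it
    readonly Dictionary<string, HashSet<string>> _channelsByTopic =
        new Dictionary<string, HashSet<string>>();

    // Commands that would actually be sent to Redis.
    public List<string> SentCommands { get; } = new List<string>();

    public void Subscribe(string channelId, string topic)
    {
        if (!_channelsByTopic.TryGetValue(topic, out var channels))
        {
            // First subscriber for this topic: Redis has to be told.
            channels = new HashSet<string>();
            _channelsByTopic[topic] = channels;
            SentCommands.Add("subscribe " + topic);
        }
        channels.Add(channelId);
    }

    public void Unsubscribe(string channelId, string topic)
    {
        if (!_channelsByTopic.TryGetValue(topic, out var channels)) return;
        if (!channels.Remove(channelId)) return;

        if (channels.Count == 0)
        {
            // Last subscriber is gone: only now is Redis told to unsubscribe.
            _channelsByTopic.Remove(topic);
            SentCommands.Add("unsubscribe " + topic);
        }
    }

    // Number of channels that would be notified of a message on this topic.
    public int ListenerCount(string topic)
    {
        return _channelsByTopic.TryGetValue(topic, out var channels) ? channels.Count : 0;
    }
}
```

Replaying the example with this model, two `Subscribe` calls followed by two `Unsubscribe` calls for post:123:notifications leave exactly two entries in `SentCommands`: one subscribe and one unsubscribe.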

This subscription aggregation comes in very handy when you have many mostly-idle listeners, for example when using WebSockets to notify clients of changes in resources.

Subscribing channels to topics

IRedisChannel exposes a NotificationHandler property that can be used to get or set a handler for messages received by this channel. The handler will receive RedisNotification objects containing the message data.

using (var channel = Client.CreateChannel())
{
    channel.NotificationHandler = msg => Console.WriteLine(msg.Content); // will print 'whatever'
    channel.Execute("psubscribe h?llo");
    channel.Execute("publish hello whatever");
}

Note: You may feel tempted to put the SUBSCRIBE and PUBLISH statements in the same command. That probably won't work, because they would be executed in parallel on the subscriber and commander connections respectively. Although it would be technically possible to support this, I considered it a very unlikely scenario, so parallel execution is used for the sake of better performance.

This handler is not supposed to hold much logic; it should just integrate with your development preferences. For example, I like TPL Dataflow, so in the SimpleQA application I use it to provide a more complete solution:

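using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // BufferBlock<T>
// The RedisClient and SimpleQA namespaces are also required; they are omitted here.

public sealed class RedisClientMessaging : IMessaging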
{
    readonly IRedisChannel _channel;
    readonly BufferBlock<RedisNotification> _notifications;

    public RedisClientMessaging(IRedisChannel channel)
    {
        _channel = channel;
        _notifications = new BufferBlock<RedisNotification>();
        _channel.NotificationHandler = n => _notifications.Post(n);
    }

    public async Task<PushMessage> ReceiveMessage(CancellationToken cancel)
    {
        var notification = await _notifications.ReceiveAsync(cancel).ConfigureAwait(false);
        return new PushMessage() { Topic = notification.PublishedKey, Change = notification.Content };
    }

    public Task SendMessageAsync(PushSubscriptionRequest request, CancellationToken cancel)
    {
        _channel.Dispatch("subscribe @key", new { key = request.Topic });
        return Task.FromResult<Object>(null);
    }

    public void Dispose()
    {
        _channel.Dispose();
    }
}
