Skip to content

Commit d5b445a

Browse files
authored
Fix sharded pub/sub handling over slot migrations (#2969)
* add failing test and mitigation for OSS sharded sunbscribe behavior * fix unsolicited SUNSUBSCRIBE * remove redundant code * ssh test * SUNSUBSCRIBE handling; if possible, use the active connection to find where we should be subscribing * PR nits * more PR nits
1 parent 8a6ad4a commit d5b445a

File tree

6 files changed

+426
-78
lines changed

6 files changed

+426
-78
lines changed

docs/ReleaseNotes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ Current package versions:
88

99
## Unreleased
1010

11+
- Fix `SSUBSCRIBE` routing during slot migrations ([#2969 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2969))
12+
1113
## 2.9.25
1214

1315
- (build) Fix SNK on non-Windows builds ([#2963 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2963))

src/StackExchange.Redis/Message.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ protected override void WriteImpl(PhysicalConnection physical)
856856

857857
internal abstract class CommandChannelBase : Message
858858
{
859-
protected readonly RedisChannel Channel;
859+
internal readonly RedisChannel Channel;
860860

861861
protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, in RedisChannel channel) : base(db, flags, command)
862862
{

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 204 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ internal sealed partial class PhysicalConnection : IDisposable
2929

3030
private const int DefaultRedisDatabaseCount = 16;
3131

32-
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";
33-
3432
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
3533
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
3634

@@ -1669,6 +1667,130 @@ internal async ValueTask<bool> ConnectedAsync(Socket? socket, ILogger? log, Sock
16691667
}
16701668
}
16711669

1670+
private enum PushKind
1671+
{
1672+
None,
1673+
Message,
1674+
PMessage,
1675+
SMessage,
1676+
Subscribe,
1677+
PSubscribe,
1678+
SSubscribe,
1679+
Unsubscribe,
1680+
PUnsubscribe,
1681+
SUnsubscribe,
1682+
}
1683+
private PushKind GetPushKind(in Sequence<RawResult> result, out RedisChannel channel)
1684+
{
1685+
var len = result.Length;
1686+
if (len < 2)
1687+
{
1688+
// for supported cases, we demand at least the kind and the subscription channel
1689+
channel = default;
1690+
return PushKind.None;
1691+
}
1692+
1693+
const int MAX_LEN = 16;
1694+
Debug.Assert(MAX_LEN >= Enumerable.Max(
1695+
[
1696+
PushMessage.Length, PushPMessage.Length, PushSMessage.Length,
1697+
PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length,
1698+
PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length,
1699+
]));
1700+
ref readonly RawResult pushKind = ref result[0];
1701+
var multiSegmentPayload = pushKind.Payload;
1702+
if (multiSegmentPayload.Length <= MAX_LEN)
1703+
{
1704+
var span = multiSegmentPayload.IsSingleSegment
1705+
? multiSegmentPayload.First.Span
1706+
: CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload);
1707+
1708+
var hash = FastHash.Hash64(span);
1709+
RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None;
1710+
PushKind kind;
1711+
switch (hash)
1712+
{
1713+
case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3:
1714+
kind = PushKind.Message;
1715+
break;
1716+
case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4:
1717+
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
1718+
kind = PushKind.PMessage;
1719+
break;
1720+
case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3:
1721+
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
1722+
kind = PushKind.SMessage;
1723+
break;
1724+
case PushSubscribe.Hash when PushSubscribe.Is(hash, span):
1725+
kind = PushKind.Subscribe;
1726+
break;
1727+
case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span):
1728+
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
1729+
kind = PushKind.PSubscribe;
1730+
break;
1731+
case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span):
1732+
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
1733+
kind = PushKind.SSubscribe;
1734+
break;
1735+
case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span):
1736+
kind = PushKind.Unsubscribe;
1737+
break;
1738+
case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span):
1739+
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
1740+
kind = PushKind.PUnsubscribe;
1741+
break;
1742+
case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span):
1743+
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
1744+
kind = PushKind.SUnsubscribe;
1745+
break;
1746+
default:
1747+
kind = PushKind.None;
1748+
break;
1749+
}
1750+
if (kind != PushKind.None)
1751+
{
1752+
// the channel is always the second element
1753+
channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions);
1754+
return kind;
1755+
}
1756+
}
1757+
channel = default;
1758+
return PushKind.None;
1759+
1760+
static ReadOnlySpan<byte> CopyTo(Span<byte> target, in ReadOnlySequence<byte> source)
1761+
{
1762+
source.CopyTo(target);
1763+
return target.Slice(0, (int)source.Length);
1764+
}
1765+
}
1766+
1767+
[FastHash("message")]
1768+
private static partial class PushMessage { }
1769+
1770+
[FastHash("pmessage")]
1771+
private static partial class PushPMessage { }
1772+
1773+
[FastHash("smessage")]
1774+
private static partial class PushSMessage { }
1775+
1776+
[FastHash("subscribe")]
1777+
private static partial class PushSubscribe { }
1778+
1779+
[FastHash("psubscribe")]
1780+
private static partial class PushPSubscribe { }
1781+
1782+
[FastHash("ssubscribe")]
1783+
private static partial class PushSSubscribe { }
1784+
1785+
[FastHash("unsubscribe")]
1786+
private static partial class PushUnsubscribe { }
1787+
1788+
[FastHash("punsubscribe")]
1789+
private static partial class PushPUnsubscribe { }
1790+
1791+
[FastHash("sunsubscribe")]
1792+
private static partial class PushSUnsubscribe { }
1793+
16721794
private void MatchResult(in RawResult result)
16731795
{
16741796
// check to see if it could be an out-of-band pubsub message
@@ -1679,85 +1801,87 @@ private void MatchResult(in RawResult result)
16791801

16801802
// out of band message does not match to a queued message
16811803
var items = result.GetItems();
1682-
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
1804+
var kind = GetPushKind(items, out var subscriptionChannel);
1805+
switch (kind)
16831806
{
1684-
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
1807+
case PushKind.Message:
1808+
case PushKind.SMessage:
1809+
_readStatus = kind is PushKind.Message ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
16851810

1686-
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
1687-
var configChanged = muxer.ConfigurationChangedChannel;
1688-
if (configChanged != null && items[1].IsEqual(configChanged))
1689-
{
1690-
EndPoint? blame = null;
1691-
try
1811+
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
1812+
var configChanged = muxer.ConfigurationChangedChannel;
1813+
if (configChanged != null && items[1].IsEqual(configChanged))
16921814
{
1693-
if (!items[2].IsEqual(CommonReplies.wildcard))
1815+
EndPoint? blame = null;
1816+
try
16941817
{
1695-
// We don't want to fail here, just trying to identify
1696-
_ = Format.TryParseEndPoint(items[2].GetString(), out blame);
1818+
if (!items[2].IsEqual(CommonReplies.wildcard))
1819+
{
1820+
// We don't want to fail here, just trying to identify
1821+
_ = Format.TryParseEndPoint(items[2].GetString(), out blame);
1822+
}
1823+
}
1824+
catch
1825+
{
1826+
/* no biggie */
16971827
}
1698-
}
1699-
catch { /* no biggie */ }
1700-
Trace("Configuration changed: " + Format.ToString(blame));
1701-
_readStatus = ReadStatus.Reconfigure;
1702-
muxer.ReconfigureIfNeeded(blame, true, "broadcast");
1703-
}
17041828

1705-
// invoke the handlers
1706-
RedisChannel channel;
1707-
if (items[0].IsEqual(message))
1708-
{
1709-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
1710-
Trace("MESSAGE: " + channel);
1711-
}
1712-
else // see check on outer-if that restricts to message / smessage
1713-
{
1714-
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
1715-
Trace("SMESSAGE: " + channel);
1716-
}
1717-
if (!channel.IsNull)
1718-
{
1719-
if (TryGetPubSubPayload(items[2], out var payload))
1720-
{
1721-
_readStatus = ReadStatus.InvokePubSub;
1722-
muxer.OnMessage(channel, channel, payload);
1829+
Trace("Configuration changed: " + Format.ToString(blame));
1830+
_readStatus = ReadStatus.Reconfigure;
1831+
muxer.ReconfigureIfNeeded(blame, true, "broadcast");
17231832
}
1724-
// could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507
1725-
else if (TryGetMultiPubSubPayload(items[2], out var payloads))
1833+
1834+
// invoke the handlers
1835+
if (!subscriptionChannel.IsNull)
17261836
{
1727-
_readStatus = ReadStatus.InvokePubSub;
1728-
muxer.OnMessage(channel, channel, payloads);
1837+
Trace($"{kind}: {subscriptionChannel}");
1838+
if (TryGetPubSubPayload(items[2], out var payload))
1839+
{
1840+
_readStatus = ReadStatus.InvokePubSub;
1841+
muxer.OnMessage(subscriptionChannel, subscriptionChannel, payload);
1842+
}
1843+
// could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507
1844+
else if (TryGetMultiPubSubPayload(items[2], out var payloads))
1845+
{
1846+
_readStatus = ReadStatus.InvokePubSub;
1847+
muxer.OnMessage(subscriptionChannel, subscriptionChannel, payloads);
1848+
}
17291849
}
1730-
}
1731-
return; // AND STOP PROCESSING!
1732-
}
1733-
else if (items.Length >= 4 && items[0].IsEqual(pmessage))
1734-
{
1735-
_readStatus = ReadStatus.PubSubPMessage;
1850+
return; // and stop processing
1851+
case PushKind.PMessage:
1852+
_readStatus = ReadStatus.PubSubPMessage;
17361853

1737-
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1738-
1739-
Trace("PMESSAGE: " + channel);
1740-
if (!channel.IsNull)
1741-
{
1742-
if (TryGetPubSubPayload(items[3], out var payload))
1854+
var messageChannel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
1855+
if (!messageChannel.IsNull)
17431856
{
1744-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1745-
1746-
_readStatus = ReadStatus.InvokePubSub;
1747-
muxer.OnMessage(sub, channel, payload);
1857+
Trace($"{kind}: {messageChannel} via {subscriptionChannel}");
1858+
if (TryGetPubSubPayload(items[3], out var payload))
1859+
{
1860+
_readStatus = ReadStatus.InvokePubSub;
1861+
muxer.OnMessage(subscriptionChannel, messageChannel, payload);
1862+
}
1863+
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
1864+
{
1865+
_readStatus = ReadStatus.InvokePubSub;
1866+
muxer.OnMessage(subscriptionChannel, messageChannel, payloads);
1867+
}
17481868
}
1749-
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
1869+
return; // and stop processing
1870+
case PushKind.SUnsubscribe when !PeekChannelMessage(RedisCommand.SUNSUBSCRIBE, subscriptionChannel):
1871+
// then it was *unsolicited* - this probably means the slot was migrated
1872+
// (otherwise, we'll let the command-processor deal with it)
1873+
_readStatus = ReadStatus.PubSubUnsubscribe;
1874+
var server = BridgeCouldBeNull?.ServerEndPoint;
1875+
if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription))
17501876
{
1751-
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
1752-
1753-
_readStatus = ReadStatus.InvokePubSub;
1754-
muxer.OnMessage(sub, channel, payloads);
1877+
// wipe and reconnect; but: to where?
1878+
// counter-intuitively, the only server we *know* already knows the new route is:
1879+
// the outgoing server, since it had to change to MIGRATING etc; the new INCOMING server
1880+
// knows, but *we don't know who that is*, and other nodes: aren't guaranteed to know (yet)
1881+
muxer.DefaultSubscriber.ResubscribeToServer(subscription, subscriptionChannel, server, cause: PushSUnsubscribe.Text);
17551882
}
1756-
}
1757-
return; // AND STOP PROCESSING!
1883+
return; // and STOP PROCESSING; unsolicited
17581884
}
1759-
1760-
// if it didn't look like "[p|s]message", then we still need to process the pending queue
17611885
}
17621886
Trace("Matching result...");
17631887

@@ -1875,6 +1999,19 @@ static bool TryGetMultiPubSubPayload(in RawResult value, out Sequence<RawResult>
18751999
}
18762000
}
18772001

2002+
private bool PeekChannelMessage(RedisCommand command, RedisChannel channel)
2003+
{
2004+
Message? msg;
2005+
bool haveMsg;
2006+
lock (_writtenAwaitingResponse)
2007+
{
2008+
haveMsg = _writtenAwaitingResponse.TryPeek(out msg);
2009+
}
2010+
2011+
return haveMsg && msg is CommandChannelBase typed
2012+
&& typed.Command == command && typed.Channel == channel;
2013+
}
2014+
18782015
private volatile Message? _activeMessage;
18792016

18802017
internal void GetHeadMessages(out Message? now, out Message? next)
@@ -2168,6 +2305,7 @@ internal enum ReadStatus
21682305
MatchResultComplete,
21692306
ResetArena,
21702307
ProcessBufferComplete,
2308+
PubSubUnsubscribe,
21712309
NA = -1,
21722310
}
21732311
private volatile ReadStatus _readStatus;

0 commit comments

Comments
 (0)