Skip to content

Commit fe249d6

Browse files
committed
When subscribing to already subscribed channel or group should not resubscribe just emit SubscriptionChanged status.
1 parent ca83e11 commit fe249d6

File tree

4 files changed

+200
-13
lines changed

4 files changed

+200
-13
lines changed

pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,138 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {
13651365
}
13661366
}
13671367

1368+
@Test
1369+
fun whenSubscribingToAlreadySubscribedChannelShouldNotResubscribeButShouldEmitSubscriptionChangedStatus() {
1370+
val channelName = randomChannel()
1371+
val channelName02 = randomChannel()
1372+
val expectedMessage = "test_${randomValue()}"
1373+
1374+
val connectedLatch = CountDownLatch(1) // Wait for first connection
1375+
val subscriptionChangedLatch = CountDownLatch(1) // Wait for subscription changed event
1376+
val messagesLatch = CountDownLatch(2) // Wait for both listeners to receive message
1377+
1378+
val connectedEventCount = AtomicInteger(0)
1379+
val subscriptionChangedCount =
1380+
AtomicInteger(0) // status event emitted but no actual resubscribe when channels unchanged
1381+
val subscribeHttpRequestCount = AtomicInteger(0) // Count actual HTTP subscribe requests
1382+
1383+
// Track messages received by each listener
1384+
val listenerAMessageCount = AtomicInteger(0)
1385+
val listenerBMessageCount = AtomicInteger(0)
1386+
1387+
// Custom logger to count HTTP subscribe requests
1388+
val customLogger = object : CustomLogger {
1389+
override fun debug(logMessage: LogMessage) {
1390+
if (logMessage.type == LogMessageType.NETWORK_REQUEST) {
1391+
val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest
1392+
if (networkRequestDetails.path.contains("/v2/subscribe/")) {
1393+
subscribeHttpRequestCount.incrementAndGet()
1394+
println("HTTP Subscribe request #${subscribeHttpRequestCount.get()}: ${networkRequestDetails.path}")
1395+
}
1396+
}
1397+
}
1398+
}
1399+
1400+
clientConfig = {
1401+
customLoggers = listOf(customLogger)
1402+
}
1403+
1404+
pubnub.addListener(
1405+
object : StatusListener {
1406+
override fun status(
1407+
pubnub: PubNub,
1408+
status: PNStatus,
1409+
) {
1410+
when (status.category) {
1411+
PNStatusCategory.PNConnectedCategory -> {
1412+
connectedEventCount.incrementAndGet()
1413+
connectedLatch.countDown()
1414+
}
1415+
1416+
PNStatusCategory.PNSubscriptionChanged -> {
1417+
subscriptionChangedCount.incrementAndGet()
1418+
subscriptionChangedLatch.countDown()
1419+
}
1420+
1421+
else -> {}
1422+
}
1423+
}
1424+
}
1425+
)
1426+
1427+
// First subscription
1428+
val subscriptionSet1 = pubnub.subscriptionSetOf(setOf(channelName, channelName02))
1429+
subscriptionSet1.addListener(
1430+
object : EventListener {
1431+
override fun message(
1432+
pubnub: PubNub,
1433+
result: PNMessageResult,
1434+
) {
1435+
println("ListenerA received: ${result.message.asString}")
1436+
listenerAMessageCount.incrementAndGet()
1437+
messagesLatch.countDown()
1438+
}
1439+
}
1440+
)
1441+
1442+
// Second subscription (SAME channel, different listener)
1443+
val subscriptionSet2 = pubnub.subscriptionSetOf(setOf(channelName))
1444+
subscriptionSet2.addListener(
1445+
object : EventListener {
1446+
override fun message(
1447+
pubnub: PubNub,
1448+
result: PNMessageResult,
1449+
) {
1450+
println("ListenerB received: ${result.message.asString}")
1451+
listenerBMessageCount.incrementAndGet()
1452+
messagesLatch.countDown()
1453+
}
1454+
}
1455+
)
1456+
1457+
try {
1458+
subscriptionSet1.subscribe()
1459+
assertTrue("Failed to receive PNConnectedCategory", connectedLatch.await(5, TimeUnit.SECONDS))
1460+
1461+
subscriptionSet2.subscribe()
1462+
assertTrue("Failed to receive PNSubscriptionChanged", subscriptionChangedLatch.await(5, TimeUnit.SECONDS))
1463+
1464+
// Give a moment for any potential resubscribe to occur
1465+
Thread.sleep(500)
1466+
1467+
// Publish a message - both listeners should receive it
1468+
pubnub.publish(channelName, expectedMessage).sync()
1469+
assertTrue("Failed to receive messages on both listeners", messagesLatch.await(5, TimeUnit.SECONDS))
1470+
1471+
// Verify both listeners received the message (this works regardless of the bug)
1472+
assertEquals("ListenerA should receive message", 1, listenerAMessageCount.get())
1473+
assertEquals("ListenerB should receive message", 1, listenerBMessageCount.get())
1474+
1475+
assertEquals(
1476+
"Should emit PNSubscriptionChanged status but not resubscribe when channels unchanged",
1477+
1, // Status emitted but no actual resubscribe (no cancel/new receive)
1478+
subscriptionChangedCount.get()
1479+
)
1480+
1481+
assertEquals(
1482+
"Should have exactly 1 PNConnectedCategory (initial handshake only)",
1483+
1,
1484+
connectedEventCount.get()
1485+
)
1486+
1487+
// Should only have 2 HTTP "/subscribe" requests: handshake + subscribe
1488+
assertEquals(
1489+
"Should have exactly 2 HTTP subscribe requests (handshake + subscribe, NO resubscribe)",
1490+
2,
1491+
subscribeHttpRequestCount.get()
1492+
)
1493+
} finally {
1494+
// Ensure cleanup happens even if assertions fail
1495+
subscriptionSet1.close()
1496+
subscriptionSet2.close()
1497+
}
1498+
}
1499+
13681500
@Test
13691501
fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToListOfTheSameChannels() {
13701502
// given

pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/eventengine/State.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ internal interface State<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>
1010

1111
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransition(): Pair<S, Set<Ei>> = Pair(this, emptySet())
1212

13+
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransitionWithEffects(
14+
vararg invocations: Ei,
15+
): Pair<S, Set<Ei>> = Pair(this, invocations.toSet())
16+
1317
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.transitionTo(
1418
state: S,
1519
vararg invocations: Ei,

pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/subscribe/eventengine/state/SubscribeState.kt

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.pubnub.api.enums.PNStatusCategory
55
import com.pubnub.api.models.consumer.PNStatus
66
import com.pubnub.internal.eventengine.State
77
import com.pubnub.internal.eventengine.noTransition
8+
import com.pubnub.internal.eventengine.noTransitionWithEffects
89
import com.pubnub.internal.eventengine.transitionTo
910
import com.pubnub.internal.subscribe.eventengine.effect.SubscribeEffectInvocation
1011
import com.pubnub.internal.subscribe.eventengine.event.SubscribeEvent
@@ -227,17 +228,32 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
227228
}
228229

229230
is SubscribeEvent.SubscriptionChanged -> {
230-
transitionTo(
231-
Receiving(event.channels, event.channelGroups, subscriptionCursor),
232-
SubscribeEffectInvocation.EmitStatus(
233-
PNStatus(
234-
PNStatusCategory.PNSubscriptionChanged,
235-
currentTimetoken = subscriptionCursor.timetoken,
236-
affectedChannels = event.channels,
237-
affectedChannelGroups = event.channelGroups,
231+
// If channels and channelGroups haven't changed, emit status without resubscribing
232+
if (event.channels == channels && event.channelGroups == channelGroups) {
233+
noTransitionWithEffects(
234+
SubscribeEffectInvocation.EmitStatus(
235+
PNStatus(
236+
PNStatusCategory.PNSubscriptionChanged,
237+
currentTimetoken = subscriptionCursor.timetoken,
238+
affectedChannels = event.channels,
239+
affectedChannelGroups = event.channelGroups,
240+
),
238241
),
239-
),
240-
)
242+
)
243+
} else {
244+
// Channels changed, need to resubscribe
245+
transitionTo(
246+
Receiving(event.channels, event.channelGroups, subscriptionCursor),
247+
SubscribeEffectInvocation.EmitStatus(
248+
PNStatus(
249+
PNStatusCategory.PNSubscriptionChanged,
250+
currentTimetoken = subscriptionCursor.timetoken,
251+
affectedChannels = event.channels,
252+
affectedChannelGroups = event.channelGroups,
253+
),
254+
),
255+
)
256+
}
241257
}
242258

243259
is SubscribeEvent.SubscriptionRestored -> {

pubnub-kotlin/pubnub-kotlin-impl/src/test/kotlin/com/pubnub/internal/subscribe/eventengine/worker/TransitionFromReceivingStateTest.kt

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,44 @@ class TransitionFromReceivingStateTest {
8484
}
8585

8686
@Test
87-
fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event() {
87+
fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event_with_different_channels_and_groups() {
8888
// given
89+
val newChannels = setOf("Channel2", "Channel3")
90+
val newChannelGroups = setOf("ChannelGroup2")
91+
92+
// when
93+
val (state, invocations) =
94+
transition(
95+
SubscribeState.Receiving(channels, channelGroups, subscriptionCursor),
96+
SubscribeEvent.SubscriptionChanged(newChannels, newChannelGroups),
97+
)
98+
99+
// then
100+
Assertions.assertTrue(state is SubscribeState.Receiving)
101+
state as SubscribeState.Receiving
102+
103+
assertEquals(newChannels, state.channels)
104+
assertEquals(newChannelGroups, state.channelGroups)
105+
assertEquals(subscriptionCursor, state.subscriptionCursor)
106+
assertEquals(
107+
setOf(
108+
SubscribeEffectInvocation.CancelReceiveMessages,
109+
SubscribeEffectInvocation.EmitStatus(
110+
createSubscriptionChangedStatus(
111+
state.subscriptionCursor,
112+
newChannels,
113+
newChannelGroups,
114+
),
115+
),
116+
SubscribeEffectInvocation.ReceiveMessages(newChannels, newChannelGroups, subscriptionCursor),
117+
),
118+
invocations,
119+
)
120+
}
121+
122+
@Test
123+
fun stays_in_RECEIVING_and_emits_status_without_resubscribing_when_SUBSCRIPTION_CHANGED_event_has_same_channels_and_groups() {
124+
// given - channels and channelGroups are the same
89125
// when
90126
val (state, invocations) =
91127
transition(
@@ -100,17 +136,16 @@ class TransitionFromReceivingStateTest {
100136
assertEquals(channels, state.channels)
101137
assertEquals(channelGroups, state.channelGroups)
102138
assertEquals(subscriptionCursor, state.subscriptionCursor)
139+
// Should only emit status - no cancel or new receive
103140
assertEquals(
104141
setOf(
105-
SubscribeEffectInvocation.CancelReceiveMessages,
106142
SubscribeEffectInvocation.EmitStatus(
107143
createSubscriptionChangedStatus(
108144
state.subscriptionCursor,
109145
channels,
110146
channelGroups,
111147
),
112148
),
113-
SubscribeEffectInvocation.ReceiveMessages(channels, channelGroups, subscriptionCursor),
114149
),
115150
invocations,
116151
)

0 commit comments

Comments
 (0)