Skip to content

Commit

Permalink
Don't emit RoomEvent.Reconnecting for resumes (#371)
Browse files Browse the repository at this point in the history
* Don't emit RoomEvent.Reconnecting for resumes

* spotless
  • Loading branch information
davidliu authored Feb 3, 2024
1 parent 1cc0c67 commit 265d322
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 LiveKit, Inc.
* Copyright 2023-2024 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,4 +21,5 @@ enum class ConnectionState {
CONNECTED,
DISCONNECTED,
RECONNECTING,
RESUMING,
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import io.livekit.android.room.util.createAnswer
import io.livekit.android.room.util.setLocalDescription
import io.livekit.android.util.CloseableCoroutineScope
import io.livekit.android.util.Either
import io.livekit.android.util.FlowObservable
import io.livekit.android.util.LKLog
import io.livekit.android.util.flowDelegate
import io.livekit.android.util.nullSafe
import io.livekit.android.webrtc.RTCStatsGetter
import io.livekit.android.webrtc.copy
Expand Down Expand Up @@ -73,36 +75,35 @@ internal constructor(
/**
* Reflects the combined connection state of SignalClient and primary PeerConnection.
*/
internal var connectionState: ConnectionState = ConnectionState.DISCONNECTED
set(value) {
val oldVal = field
field = value
if (value == oldVal) {
return
}
when (value) {
ConnectionState.CONNECTED -> {
if (oldVal == ConnectionState.DISCONNECTED) {
LKLog.d { "primary ICE connected" }
listener?.onEngineConnected()
} else if (oldVal == ConnectionState.RECONNECTING) {
LKLog.d { "primary ICE reconnected" }
listener?.onEngineReconnected()
}
@FlowObservable
internal var connectionState: ConnectionState by flowDelegate(ConnectionState.DISCONNECTED) { newVal, oldVal ->
if (newVal == oldVal) {
return@flowDelegate
}
when (newVal) {
ConnectionState.CONNECTED -> {
if (oldVal == ConnectionState.DISCONNECTED) {
LKLog.d { "primary ICE connected" }
listener?.onEngineConnected()
} else if (oldVal == ConnectionState.RECONNECTING) {
LKLog.d { "primary ICE reconnected" }
listener?.onEngineReconnected()
} else if (oldVal == ConnectionState.RESUMING) {
listener?.onEngineResumed()
}
}

ConnectionState.DISCONNECTED -> {
LKLog.d { "primary ICE disconnected" }
if (oldVal == ConnectionState.CONNECTED) {
reconnect()
}
ConnectionState.DISCONNECTED -> {
LKLog.d { "primary ICE disconnected" }
if (oldVal == ConnectionState.CONNECTED) {
reconnect()
}
}

else -> {
}
else -> {
}
}

}
internal var reconnectType: ReconnectType = ReconnectType.DEFAULT
private var reconnectingJob: Job? = null
private var fullReconnectOnNext = false
Expand Down Expand Up @@ -373,8 +374,8 @@ internal constructor(
val forceFullReconnect = fullReconnectOnNext
fullReconnectOnNext = false
val job = coroutineScope.launch {
connectionState = ConnectionState.RECONNECTING
listener?.onEngineReconnecting()
var hasResumedOnce = false
var hasReconnectedOnce = false

val reconnectStartTime = SystemClock.elapsedRealtime()
for (retries in 0 until MAX_RECONNECT_RETRIES) {
Expand Down Expand Up @@ -406,6 +407,12 @@ internal constructor(
val connectOptions = connectOptions ?: ConnectOptions()
if (isFullReconnect) {
LKLog.v { "Attempting full reconnect." }

if (!hasReconnectedOnce) {
hasReconnectedOnce = true
listener?.onEngineReconnecting()
}
connectionState = ConnectionState.RECONNECTING
try {
closeResources("Full Reconnecting")
listener?.onFullReconnecting()
Expand All @@ -416,6 +423,11 @@ internal constructor(
continue
}
} else {
if (!hasResumedOnce) {
hasResumedOnce = true
listener?.onEngineResuming()
}
connectionState = ConnectionState.RESUMING
LKLog.v { "Attempting soft reconnect." }
subscriber?.prepareForIceRestart()
try {
Expand Down Expand Up @@ -588,7 +600,7 @@ internal constructor(
MediaConstraintKeys.FALSE,
),
)
if (connectionState == ConnectionState.RECONNECTING) {
if (connectionState == ConnectionState.RECONNECTING || connectionState == ConnectionState.RESUMING) {
add(
MediaConstraints.KeyValuePair(
MediaConstraintKeys.ICE_RESTART,
Expand Down Expand Up @@ -678,6 +690,8 @@ internal constructor(
fun onEngineConnected()
fun onEngineReconnected()
fun onEngineReconnecting()
fun onEngineResuming() {}
fun onEngineResumed() {}
fun onEngineDisconnected(reason: DisconnectReason)
fun onFailToConnect(error: Throwable)
fun onJoinResponse(response: JoinResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ constructor(
* @suppress
*/
override fun onSignalConnected(isResume: Boolean) {
if (state == State.RECONNECTING && isResume) {
if (isResume) {
// during resume reconnection, need to send sync state upon signal connection.
sendSyncState()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ class RoomMockE2ETest : MockE2ETest() {
@Test
fun onConnectionAvailableWillReconnect() = runTest {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
val engine = component.rtcEngine()
val eventCollector = FlowCollector(engine::connectionState.flow, coroutineRule.scope)
val network = Mockito.mock(Network::class.java)

val connectivityManager = InstrumentationRegistry.getInstrumentation()
Expand All @@ -308,10 +309,16 @@ class RoomMockE2ETest : MockE2ETest() {
callback.onAvailable(network)
}

coroutineRule.dispatcher.scheduler.advanceUntilIdle()
val events = eventCollector.stopCollecting()

Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.Reconnecting)
assertEquals(
listOf(
ConnectionState.CONNECTED,
ConnectionState.RESUMING,
),
events,
)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 LiveKit, Inc.
* Copyright 2023-2024 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,45 @@ class RoomReconnectionTypesMockE2ETest(
room.setReconnectionType(reconnectType)
}

private fun expectedEventsForReconnectType(reconnectType: ReconnectType): List<Class<out RoomEvent>> {
return when (reconnectType) {
ReconnectType.FORCE_SOFT_RECONNECT -> {
emptyList()
}

ReconnectType.FORCE_FULL_RECONNECT -> {
listOf(
RoomEvent.Reconnecting::class.java,
RoomEvent.Reconnected::class.java,
)
}

else -> {
throw IllegalArgumentException()
}
}
}

private fun expectedStatesForReconnectType(reconnectType: ReconnectType): List<Room.State> {
return when (reconnectType) {
ReconnectType.FORCE_SOFT_RECONNECT -> {
listOf(Room.State.CONNECTED)
}

ReconnectType.FORCE_FULL_RECONNECT -> {
listOf(
Room.State.CONNECTED,
Room.State.RECONNECTING,
Room.State.CONNECTED,
)
}

else -> {
throw IllegalArgumentException()
}
}
}

@Test
fun reconnectFromPeerConnectionDisconnect() = runTest {
connect()
Expand All @@ -83,19 +122,12 @@ class RoomReconnectionTypesMockE2ETest(
val states = stateCollector.stopCollecting()

assertIsClassList(
listOf(
RoomEvent.Reconnecting::class.java,
RoomEvent.Reconnected::class.java,
),
expectedEventsForReconnectType(reconnectType),
events,
)

assertEquals(
listOf(
Room.State.CONNECTED,
Room.State.RECONNECTING,
Room.State.CONNECTED,
),
expectedStatesForReconnectType(reconnectType),
states,
)
}
Expand All @@ -117,19 +149,12 @@ class RoomReconnectionTypesMockE2ETest(
val states = stateCollector.stopCollecting()

assertIsClassList(
listOf(
RoomEvent.Reconnecting::class.java,
RoomEvent.Reconnected::class.java,
),
expectedEventsForReconnectType(reconnectType),
events,
)

assertEquals(
listOf(
Room.State.CONNECTED,
Room.State.RECONNECTING,
Room.State.CONNECTED,
),
expectedStatesForReconnectType(reconnectType),
states,
)
}
Expand Down

0 comments on commit 265d322

Please sign in to comment.