diff --git a/.changeset/curly-trains-fix.md b/.changeset/curly-trains-fix.md new file mode 100644 index 00000000..dc7dac8f --- /dev/null +++ b/.changeset/curly-trains-fix.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Implement client metrics diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 581af546..6171c25d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -1028,6 +1028,20 @@ internal constructor( listener?.onTranscriptionReceived(dp.transcription) } + LivekitModels.DataPacket.ValueCase.METRICS -> { + // TODO + } + + LivekitModels.DataPacket.ValueCase.CHAT_MESSAGE -> { + // TODO + } + + LivekitModels.DataPacket.ValueCase.RPC_REQUEST, + LivekitModels.DataPacket.ValueCase.RPC_ACK, + LivekitModels.DataPacket.ValueCase.RPC_RESPONSE, + -> { + // TODO + } LivekitModels.DataPacket.ValueCase.VALUE_NOT_SET, null, -> { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt index b186267e..fc54b815 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt @@ -38,6 +38,7 @@ import io.livekit.android.e2ee.E2EEOptions import io.livekit.android.events.* import io.livekit.android.memory.CloseableManager import io.livekit.android.renderer.TextureViewRenderer +import io.livekit.android.room.metrics.collectMetrics import io.livekit.android.room.network.NetworkCallbackManagerFactory import io.livekit.android.room.participant.* import io.livekit.android.room.provisions.LKObjects @@ -180,6 +181,12 @@ constructor( var isRecording: Boolean by flowDelegate(false) private set + /** + * @suppress + */ + @VisibleForTesting + var enableMetrics: Boolean = true + /** * end-to-end encryption manager */ @@ -441,6 +448,12 @@ constructor( val videoTrack = localParticipant.createVideoTrack() localParticipant.publishVideoTrack(videoTrack) } + + coroutineScope.launch { + if (enableMetrics) { + collectMetrics(room = this@Room, rtcEngine = engine) + } + } } val outerHandler = coroutineContext.job.invokeOnCompletion { cause -> diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt new file mode 100644 index 00000000..1324ec93 --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/metrics/RTCMetricsManager.kt @@ -0,0 +1,320 @@ +/* + * Copyright 2024 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.metrics + +import io.livekit.android.room.RTCEngine +import io.livekit.android.room.Room +import io.livekit.android.room.participant.Participant +import io.livekit.android.util.LKLog +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import livekit.LivekitMetrics.MetricLabel +import livekit.LivekitMetrics.MetricSample +import livekit.LivekitMetrics.MetricsBatch +import livekit.LivekitMetrics.TimeSeriesMetric +import livekit.LivekitModels.DataPacket +import livekit.org.webrtc.RTCStats +import livekit.org.webrtc.RTCStatsReport +import java.util.concurrent.TimeUnit +import kotlin.coroutines.resume + +/** + * Handles getting the WebRTC metrics and sending them through the data channels. + * + * See [RTCMetric] for the related metrics we send. + */ +internal suspend fun collectMetrics(room: Room, rtcEngine: RTCEngine) = coroutineScope { + launch { collectPublisherMetrics(room, rtcEngine) } + launch { collectSubscriberMetrics(room, rtcEngine) } +} + +private suspend fun collectPublisherMetrics(room: Room, rtcEngine: RTCEngine) { + while (currentCoroutineContext().isActive) { + delay(1000) + val report = suspendCancellableCoroutine { cont -> + room.getPublisherRTCStats { cont.resume(it) } + } + + val strings = mutableListOf() + val stats = findPublisherVideoStats(strings, room, report, room.localParticipant.identity) + + val dataPacket = with(DataPacket.newBuilder()) { + metrics = with(MetricsBatch.newBuilder()) { + timestampMs = report.timestampUs.microToMilli() + addAllStrData(strings) + addAllTimeSeries(stats) + build() + } + kind = DataPacket.Kind.RELIABLE + build() + } + + try { + rtcEngine.sendData(dataPacket) + } catch (e: Exception) { + LKLog.i(e) { "Error sending metrics: " } + } + } +} + +private suspend fun collectSubscriberMetrics(room: Room, rtcEngine: RTCEngine) { + while (currentCoroutineContext().isActive) { + delay(1000) + val report = suspendCancellableCoroutine { cont -> + room.getSubscriberRTCStats { cont.resume(it) } + } + + val strings = mutableListOf() + val stats = findSubscriberAudioStats(strings, report, room.localParticipant.identity) + + findSubscriberVideoStats(strings, report, room.localParticipant.identity) + + val dataPacket = with(DataPacket.newBuilder()) { + metrics = with(MetricsBatch.newBuilder()) { + timestampMs = report.timestampUs.microToMilli() + addAllStrData(strings) + addAllTimeSeries(stats) + build() + } + kind = DataPacket.Kind.RELIABLE + build() + } + + try { + rtcEngine.sendData(dataPacket) + } catch (e: Exception) { + LKLog.i(e) { "Error sending metrics: " } + } + } +} + +private fun findPublisherVideoStats(strings: MutableList, room: Room, report: RTCStatsReport, participantIdentity: Participant.Identity?): List { + val mediaSources = report.statsMap + .values + .filter { stat -> stat.type == "media-source" && stat.members["kind"] == "video" } + val videoTracks = report.statsMap + .values + .filter { stat -> stat.type == "outbound-rtp" && stat.members["kind"] == "video" } + .mapNotNull { stat -> stat to getPublishVideoTrackSid(room, mediaSources, stat) } + + val metrics = videoTracks + .flatMap { (stat, trackSid) -> + val durations = stat.members["qualityLimitationDurations"] as? Map<*, *> ?: return emptyList() + val rid = stat.members["rid"] as? String + qualityLimitations.mapNotNull { (label, key) -> + val duration = durations[key] as? Number ?: return@mapNotNull null + val sample = createMetricSample(stat.timestampUs.microToMilli(), duration) + createTimeSeries( + label = label.protoLabel, + strings = strings, + samples = listOf(sample), + identity = participantIdentity, + trackSid = trackSid, + rid = rid, + ) + } + } + + return metrics +} + +/** + * The track sid isn't available on outbound-rtp stats, so we cross-reference against + * the MediaSource trackIdentifier (which is a locally generated id), and then look up + * the local published track for the sid. + */ +private fun getPublishVideoTrackSid(room: Room, mediaSources: List, videoTrack: RTCStats): String? { + val mediaSourceId = videoTrack.members["mediaSourceId"] ?: return null + val mediaSource = mediaSources.firstOrNull { m -> m.id == mediaSourceId } ?: return null + val trackIdentifier = mediaSource.members["trackIdentifier"] ?: return null + + val trackPubPair = room.localParticipant.videoTrackPublications + .firstOrNull { (_, track) -> track?.rtcTrack?.id() == trackIdentifier } ?: return null + + val (publication) = trackPubPair + + return publication.sid +} + +private fun findSubscriberAudioStats(strings: MutableList, report: RTCStatsReport, participantIdentity: Participant.Identity?): List { + val audioTracks = report.statsMap.filterValues { stat -> + stat.type == "inbound-rtp" && stat.members["kind"] == "audio" + } + + val metrics = audioTracks.values + .flatMap { stat -> + listOf( + RTCMetric.CONCEALED_SAMPLES, + RTCMetric.CONCEALMENT_EVENTS, + RTCMetric.SILENT_CONCEALED_SAMPLES, + RTCMetric.JITTER_BUFFER_DELAY, + RTCMetric.JITTER_BUFFER_EMITTED_COUNT, + ).mapNotNull { metric -> + createTimeSeriesForMetric( + stat = stat, + metric = metric, + strings = strings, + identity = participantIdentity, + ) + } + } + + return metrics +} + +private fun findSubscriberVideoStats(strings: MutableList, report: RTCStatsReport, participantIdentity: Participant.Identity?): List { + val videoTracks = report.statsMap.filterValues { stat -> + stat.type == "inbound-rtp" && stat.members["kind"] == "video" + } + + val metrics = videoTracks.values + .flatMap { stat -> + listOf( + RTCMetric.FREEZE_COUNT, + RTCMetric.TOTAL_FREEZES_DURATION, + RTCMetric.PAUSE_COUNT, + RTCMetric.TOTAL_PAUSES_DURATION, + RTCMetric.JITTER_BUFFER_DELAY, + RTCMetric.JITTER_BUFFER_EMITTED_COUNT, + ).mapNotNull { metric -> + createTimeSeriesForMetric( + stat = stat, + metric = metric, + strings = strings, + identity = participantIdentity, + ) + } + } + + return metrics +} + +// Utility methods + +/** + * Gets the final index to use for indexes pointing at the MetricsBatch.str_data. + * Index starts at [MetricLabel.METRIC_LABEL_PREDEFINED_MAX_VALUE]. + * + * Receivers should parse index values like so: + * ``` + * if index < LABEL_MAX_VALUE + * MetricLabel[index] + * else + * str_data[index - 4096] + * ``` + */ +private fun MutableList.getOrCreateIndex(string: String): Int { + var index = indexOf(string) + + if (index == -1) { + // Doesn't exist, create. + add(string) + index = size - 1 + } + + return index + MetricLabel.METRIC_LABEL_PREDEFINED_MAX_VALUE.number +} + +private fun createMetricSample( + timestampMs: Long, + value: Number, +): MetricSample { + return with(MetricSample.newBuilder()) { + this.timestampMs = timestampMs + this.value = value.toFloat() + build() + } +} + +private fun createTimeSeriesForMetric( + stat: RTCStats, + metric: RTCMetric, + strings: MutableList, + identity: Participant.Identity? = null, +): TimeSeriesMetric? { + val value = stat.members[metric.statKey] as? Number ?: return null + val trackSid = stat.members["trackIdentifier"] as? String ?: return null + val rid = stat.members["rid"] as? String + + val sample = createMetricSample(stat.timestampUs.microToMilli(), value) + + return createTimeSeries( + label = metric.protoLabel, + strings = strings, + samples = listOf(sample), + identity = identity, + trackSid = trackSid, + rid = rid, + ) +} + +private fun createTimeSeries( + label: MetricLabel, + strings: MutableList, + samples: List, + identity: Participant.Identity? = null, + trackSid: String? = null, + rid: String? = null, +): TimeSeriesMetric { + return with(TimeSeriesMetric.newBuilder()) { + this.label = label.number + + if (identity != null) { + this.participantIdentity = strings.getOrCreateIndex(identity.value) + } + if (trackSid != null) { + this.trackSid = strings.getOrCreateIndex(trackSid) + } + + if (rid != null) { + this.rid = strings.getOrCreateIndex(rid) + } + this.addAllSamples(samples) + build() + } +} + +private fun Number.microToMilli(): Long { + return TimeUnit.MILLISECONDS.convert(this.toLong(), TimeUnit.MILLISECONDS) +} + +private enum class RTCMetric(val protoLabel: MetricLabel, val statKey: String) { + FREEZE_COUNT(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_FREEZE_COUNT, "freezeCount"), + TOTAL_FREEZES_DURATION(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_TOTAL_FREEZE_DURATION, "totalFreezesDuration"), + PAUSE_COUNT(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_PAUSE_COUNT, "pauseCount"), + TOTAL_PAUSES_DURATION(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_TOTAL_PAUSES_DURATION, "totalPausesDuration"), + + CONCEALED_SAMPLES(MetricLabel.CLIENT_AUDIO_SUBSCRIBER_CONCEALED_SAMPLES, "concealedSamples"), + SILENT_CONCEALED_SAMPLES(MetricLabel.CLIENT_AUDIO_SUBSCRIBER_SILENT_CONCEALED_SAMPLES, "silentConcealedSamples"), + CONCEALMENT_EVENTS(MetricLabel.CLIENT_AUDIO_SUBSCRIBER_CONCEALMENT_EVENTS, "concealmentEvents"), + + JITTER_BUFFER_DELAY(MetricLabel.CLIENT_SUBSCRIBER_JITTER_BUFFER_DELAY, "jitterBufferDelay"), + JITTER_BUFFER_EMITTED_COUNT(MetricLabel.CLIENT_SUBSCRIBER_JITTER_BUFFER_EMITTED_COUNT, "jitterBufferEmittedCount"), + + QUALITY_LIMITATION_DURATION_BANDWIDTH(MetricLabel.CLIENT_VIDEO_PUBLISHER_QUALITY_LIMITATION_DURATION_BANDWIDTH, "qualityLimitationDurations"), + QUALITY_LIMITATION_DURATION_CPU(MetricLabel.CLIENT_VIDEO_PUBLISHER_QUALITY_LIMITATION_DURATION_CPU, "qualityLimitationDurations"), + QUALITY_LIMITATION_DURATION_OTHER(MetricLabel.CLIENT_VIDEO_PUBLISHER_QUALITY_LIMITATION_DURATION_OTHER, "qualityLimitationDurations"), +} + +private val qualityLimitations = listOf( + RTCMetric.QUALITY_LIMITATION_DURATION_CPU to "cpu", + RTCMetric.QUALITY_LIMITATION_DURATION_BANDWIDTH to "bandwidth", + RTCMetric.QUALITY_LIMITATION_DURATION_OTHER to "other", +) diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt b/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt index 47c0ed71..9bbb47df 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt @@ -25,6 +25,7 @@ import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.junit.Before import org.junit.Rule +import org.junit.rules.Timeout import org.mockito.junit.MockitoJUnit @OptIn(ExperimentalCoroutinesApi::class) @@ -39,6 +40,9 @@ abstract class BaseTest { @get:Rule var coroutineRule = TestCoroutineRule() + @get:Rule + var globalTimeout: Timeout = Timeout.seconds(60) + @Before fun setupRTCThread() { overrideExecutorAndDispatcher( diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt b/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt index 65480613..97252fb7 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt @@ -58,6 +58,9 @@ abstract class MockE2ETest : BaseTest() { room = component.roomFactory() .create(context) + .apply { + enableMetrics = false + } wsFactory = component.websocketFactory() } diff --git a/protocol b/protocol index 5c7350d2..a601adc5 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 5c7350d25904ed8fd8163e91ff47f0577ca6afad +Subproject commit a601adc5e9027820857a6d445b32a868b19d4184 diff --git a/sample-app-compose/src/main/java/io/livekit/android/composesample/MainActivity.kt b/sample-app-compose/src/main/java/io/livekit/android/composesample/MainActivity.kt index 677aaa9d..f0e4a506 100644 --- a/sample-app-compose/src/main/java/io/livekit/android/composesample/MainActivity.kt +++ b/sample-app-compose/src/main/java/io/livekit/android/composesample/MainActivity.kt @@ -17,8 +17,6 @@ package io.livekit.android.composesample import android.content.Intent -import android.net.ConnectivityManager -import android.net.Network import android.os.Bundle import android.widget.Toast import androidx.activity.ComponentActivity @@ -58,26 +56,10 @@ import io.livekit.android.sample.MainViewModel import io.livekit.android.sample.common.R import io.livekit.android.sample.model.StressTest import io.livekit.android.sample.util.requestNeededPermissions -import io.livekit.android.util.LKLog @ExperimentalPagerApi class MainActivity : ComponentActivity() { - private val networkCallback = object : ConnectivityManager.NetworkCallback() { - /** - * @suppress - */ - override fun onLost(network: Network) { - LKLog.i { "network connection lost" } - } - - /** - * @suppress - */ - override fun onAvailable(network: Network) { - LKLog.i { "network connection available, reconnecting" } - } - } private val viewModel by viewModels() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState)