diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go
index 800a6dda..b173aea6 100644
--- a/pkg/sip/inbound.go
+++ b/pkg/sip/inbound.go
@@ -896,6 +896,8 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
 		c.media.HandleDTMF(c.handleDTMF)
 	}
 
+	c.stats.startRateLogger(c.ctx, c.log, defaultRateLoggerInterval, RoomSampleRate)
+
 	// Must be set earlier to send the pin prompts.
 	if w := c.lkRoom.SwapOutput(c.media.GetAudioWriter()); w != nil {
 		_ = w.Close()
diff --git a/pkg/sip/media.go b/pkg/sip/media.go
index 90aedd77..0d24cd7c 100644
--- a/pkg/sip/media.go
+++ b/pkg/sip/media.go
@@ -15,10 +15,12 @@
 package sip
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"strconv"
 	"sync/atomic"
+	"time"
 
 	"github.com/pion/interceptor"
 	prtp "github.com/pion/rtp"
@@ -27,15 +29,22 @@ import (
 
 	"github.com/livekit/media-sdk/rtp"
 
+	"github.com/livekit/protocol/logger"
 	"github.com/livekit/sip/pkg/stats"
 )
 
+const (
+	defaultRateLoggerInterval = 10 * time.Second
+)
+
 var _ json.Marshaler = (*Stats)(nil)
 
 type Stats struct {
 	Port   PortStats
 	Room   RoomStats
 	Closed atomic.Bool
+
+	rateLoggerStarted atomic.Bool
 }
 
 type StatsSnapshot struct {
@@ -57,6 +66,11 @@ type PortStatsSnapshot struct {
 	AudioPackets uint64 `json:"audio_packets"`
 	AudioBytes   uint64 `json:"audio_bytes"`
 
+	AudioInFrames   uint64 `json:"audio_in_frames"`
+	AudioInSamples  uint64 `json:"audio_in_samples"`
+	AudioOutFrames  uint64 `json:"audio_out_frames"`
+	AudioOutSamples uint64 `json:"audio_out_samples"`
+
 	DTMFPackets uint64 `json:"dtmf_packets"`
 	DTMFBytes   uint64 `json:"dtmf_bytes"`
 
@@ -67,6 +81,9 @@ type RoomStatsSnapshot struct {
 	InputPackets uint64 `json:"input_packets"`
 	InputBytes   uint64 `json:"input_bytes"`
 
+	PublishedFrames  uint64 `json:"published_frames"`
+	PublishedSamples uint64 `json:"published_samples"`
+
 	MixerSamples uint64 `json:"mixer_samples"`
 	MixerFrames  uint64 `json:"mixer_frames"`
 
@@ -102,26 +119,32 @@ func (s *Stats) Load() StatsSnapshot {
 	m := &r.Mixer
 	return StatsSnapshot{
 		Port: PortStatsSnapshot{
-			Streams:        p.Streams.Load(),
-			Packets:        p.Packets.Load(),
-			IgnoredPackets: p.IgnoredPackets.Load(),
-			InputPackets:   p.InputPackets.Load(),
-			MuxPackets:     p.MuxPackets.Load(),
-			MuxBytes:       p.MuxBytes.Load(),
-			AudioPackets:   p.AudioPackets.Load(),
-			AudioBytes:     p.AudioBytes.Load(),
-			DTMFPackets:    p.DTMFPackets.Load(),
-			DTMFBytes:      p.DTMFBytes.Load(),
-			Closed:         p.Closed.Load(),
+			Streams:         p.Streams.Load(),
+			Packets:         p.Packets.Load(),
+			IgnoredPackets:  p.IgnoredPackets.Load(),
+			InputPackets:    p.InputPackets.Load(),
+			MuxPackets:      p.MuxPackets.Load(),
+			MuxBytes:        p.MuxBytes.Load(),
+			AudioPackets:    p.AudioPackets.Load(),
+			AudioBytes:      p.AudioBytes.Load(),
+			AudioInFrames:   p.AudioInFrames.Load(),
+			AudioInSamples:  p.AudioInSamples.Load(),
+			AudioOutFrames:  p.AudioOutFrames.Load(),
+			AudioOutSamples: p.AudioOutSamples.Load(),
+			DTMFPackets:     p.DTMFPackets.Load(),
+			DTMFBytes:       p.DTMFBytes.Load(),
+			Closed:          p.Closed.Load(),
 		},
 		Room: RoomStatsSnapshot{
-			InputPackets:  r.InputPackets.Load(),
-			InputBytes:    r.InputBytes.Load(),
-			MixerSamples:  r.MixerSamples.Load(),
-			MixerFrames:   r.MixerFrames.Load(),
-			OutputSamples: r.OutputSamples.Load(),
-			OutputFrames:  r.OutputFrames.Load(),
-			Closed:        r.Closed.Load(),
+			InputPackets:     r.InputPackets.Load(),
+			InputBytes:       r.InputBytes.Load(),
+			PublishedFrames:  r.PublishedFrames.Load(),
+			PublishedSamples: r.PublishedSamples.Load(),
+			MixerSamples:     r.MixerSamples.Load(),
+			MixerFrames:      r.MixerFrames.Load(),
+			OutputSamples:    r.OutputSamples.Load(),
+			OutputFrames:     r.OutputFrames.Load(),
+			Closed:           r.Closed.Load(),
 		},
 		Mixer: MixerStatsSnapshot{
 			Tracks: m.Tracks.Load(),
@@ -146,6 +169,91 @@ func (s *Stats) MarshalJSON() ([]byte, error) {
 	return json.Marshal(s.Load())
 }
 
+// startRateLogger launches, at most once per Stats, a goroutine that
+// periodically logs the observed PCM sample rates. A non-positive interval
+// or expectedSampleRate falls back to defaultRateLoggerInterval or
+// RoomSampleRate respectively. The goroutine stops when ctx is done or once
+// Closed is observed on a tick.
+func (s *Stats) startRateLogger(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) {
+	if interval <= 0 {
+		interval = defaultRateLoggerInterval
+	}
+	if expectedSampleRate <= 0 {
+		expectedSampleRate = RoomSampleRate
+	}
+	// Only the first caller starts the loop; later calls are no-ops.
+	if !s.rateLoggerStarted.CompareAndSwap(false, true) {
+		return
+	}
+	go s.rateLoggerLoop(ctx, log, interval, expectedSampleRate)
+}
+
+// rateLoggerLoop logs, once per interval, the SIP receive, SIP transmit and
+// LiveKit publish sample rates over the elapsed window together with their
+// deviation from expectedSampleRate in parts per million. Windows without
+// audio in any direction are not logged. It returns when ctx is done or
+// after the first tick on which Closed is set.
+func (s *Stats) rateLoggerLoop(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	last := s.Load()
+	lastTime := time.Now()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C: // proceed to the sampling below
+		}
+
+		now := time.Now()
+		cur := s.Load()
+		elapsed := now.Sub(lastTime)
+
+		rxSamples := cur.Port.AudioInSamples - last.Port.AudioInSamples
+		txSamples := cur.Port.AudioOutSamples - last.Port.AudioOutSamples
+		pubSamples := cur.Room.PublishedSamples - last.Room.PublishedSamples
+
+		// Skip degenerate windows and windows with no audio at all.
+		if dt := elapsed.Seconds(); dt > 0 && (rxSamples != 0 || txSamples != 0 || pubSamples != 0) {
+			rxRate := float64(rxSamples) / dt
+			txRate := float64(txSamples) / dt
+			pubRate := float64(pubSamples) / dt
+
+			log.Infow("media sample rate",
+				"sip_rx_pcm_hz", rxRate,
+				"sip_rx_ppm", ratePPM(rxRate, expectedSampleRate),
+				"sip_tx_pcm_hz", txRate,
+				"sip_tx_ppm", ratePPM(txRate, expectedSampleRate),
+				"lk_publish_pcm_hz", pubRate,
+				"lk_publish_ppm", ratePPM(pubRate, expectedSampleRate),
+				"expected_pcm_hz", expectedSampleRate,
+				"interval", elapsed,
+				"sip_rx_samples", rxSamples,
+				"sip_tx_samples", txSamples,
+				"lk_publish_samples", pubSamples,
+			)
+		}
+
+		// Re-baseline on every tick, then exit on the first tick that
+		// observes Closed regardless of whether anything was logged.
+		last, lastTime = cur, now
+		if cur.Closed {
+			return
+		}
+	}
+}
+
+// ratePPM returns how far rate deviates from expected, in parts per million.
+// It returns 0 when expected is not positive.
+func ratePPM(rate float64, expected int) float64 {
+	if expected <= 0 {
+		return 0
+	}
+	return (rate - float64(expected)) / float64(expected) * 1_000_000
+}
+
 const (
 	channels       = 1
 	RoomSampleRate = 48000
diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go
index 0d623d3b..8d44b058 100644
--- a/pkg/sip/media_port.go
+++ b/pkg/sip/media_port.go
@@ -56,6 +56,11 @@ type PortStats struct {
 	AudioPackets atomic.Uint64
 	AudioBytes   atomic.Uint64
 
+	AudioInFrames   atomic.Uint64
+	AudioInSamples  atomic.Uint64
+	AudioOutFrames  atomic.Uint64
+	AudioOutSamples atomic.Uint64
+
 	DTMFPackets atomic.Uint64
 	DTMFBytes   atomic.Uint64
 
@@ -562,6 +567,9 @@ func (p *MediaPort) setupOutput() error {
 
 	// Encoding pipeline (LK PCM -> SIP RTP)
 	audioOut := p.conf.Audio.Codec.EncodeRTP(p.audioOutRTP)
+	if p.stats != nil {
+		audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples)
+	}
 
 	if p.conf.Audio.DTMFType != 0 {
 		p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate)
@@ -585,7 +593,11 @@ func (p *MediaPort) setupOutput() error {
 
 func (p *MediaPort) setupInput() {
 	// Decoding pipeline (SIP RTP -> LK PCM)
-	audioHandler := p.conf.Audio.Codec.DecodeRTP(p.audioIn, p.conf.Audio.Type)
+	var audioWriter msdk.PCM16Writer = p.audioIn
+	if p.stats != nil {
+		audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
+	}
+	audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
 	p.audioInHandler = audioHandler
 
 	mux := rtp.NewMux(nil)
diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go
index 7acb6579..c9c0d2c6 100644
--- a/pkg/sip/outbound.go
+++ b/pkg/sip/outbound.go
@@ -575,6 +575,9 @@ func (c *outboundCall) sipSignal(ctx context.Context) error {
 	if err = c.media.SetConfig(mc); err != nil {
 		return err
 	}
+	// NOTE(review): context.Background() never cancels, so this logger stops
+	// only once Stats.Closed is observed — confirm every teardown path sets it.
+	c.stats.startRateLogger(context.Background(), c.log, defaultRateLoggerInterval, RoomSampleRate)
 
 	c.c.cmu.Lock()
 	c.c.byRemote[c.cc.Tag()] = c
diff --git a/pkg/sip/room.go b/pkg/sip/room.go
index 8e003828..e52c6bcc 100644
--- a/pkg/sip/room.go
+++ b/pkg/sip/room.go
@@ -41,6 +41,9 @@ type RoomStats struct {
 	InputPackets atomic.Uint64
 	InputBytes   atomic.Uint64
 
+	PublishedFrames  atomic.Uint64
+	PublishedSamples atomic.Uint64
+
 	MixerFrames  atomic.Uint64
 	MixerSamples atomic.Uint64
 
@@ -424,7 +427,7 @@ func (r *Room) NewParticipantTrack(sampleRate int) (msdk.WriteCloser[msdk.PCM16S
 	if err != nil {
 		return nil, err
 	}
-	return pw, nil
+	return newMediaWriterCount(pw, &r.stats.PublishedFrames, &r.stats.PublishedSamples), nil
 }
 
 func (r *Room) SendData(data lksdk.DataPacket, opts ...lksdk.DataPublishOption) error {