Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
c.media.HandleDTMF(c.handleDTMF)
}

c.stats.startRateLogger(c.ctx, c.log, defaultRateLoggerInterval, RoomSampleRate)

// Must be set earlier to send the pin prompts.
if w := c.lkRoom.SwapOutput(c.media.GetAudioWriter()); w != nil {
_ = w.Close()
Expand Down
140 changes: 122 additions & 18 deletions pkg/sip/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package sip

import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync/atomic"
"time"

"github.com/pion/interceptor"
prtp "github.com/pion/rtp"
Expand All @@ -27,15 +29,22 @@ import (

"github.com/livekit/media-sdk/rtp"

"github.com/livekit/protocol/logger"
"github.com/livekit/sip/pkg/stats"
)

const (
defaultRateLoggerInterval = 10 * time.Second
)

var _ json.Marshaler = (*Stats)(nil)

type Stats struct {
Port PortStats
Room RoomStats
Closed atomic.Bool

rateLoggerStarted atomic.Bool
}

type StatsSnapshot struct {
Expand All @@ -57,6 +66,11 @@ type PortStatsSnapshot struct {
AudioPackets uint64 `json:"audio_packets"`
AudioBytes uint64 `json:"audio_bytes"`

AudioInFrames uint64 `json:"audio_in_frames"`
AudioInSamples uint64 `json:"audio_in_samples"`
AudioOutFrames uint64 `json:"audio_out_frames"`
AudioOutSamples uint64 `json:"audio_out_samples"`

DTMFPackets uint64 `json:"dtmf_packets"`
DTMFBytes uint64 `json:"dtmf_bytes"`

Expand All @@ -67,6 +81,9 @@ type RoomStatsSnapshot struct {
InputPackets uint64 `json:"input_packets"`
InputBytes uint64 `json:"input_bytes"`

PublishedFrames uint64 `json:"published_frames"`
PublishedSamples uint64 `json:"published_samples"`

MixerSamples uint64 `json:"mixer_samples"`
MixerFrames uint64 `json:"mixer_frames"`

Expand Down Expand Up @@ -102,26 +119,32 @@ func (s *Stats) Load() StatsSnapshot {
m := &r.Mixer
return StatsSnapshot{
Port: PortStatsSnapshot{
Streams: p.Streams.Load(),
Packets: p.Packets.Load(),
IgnoredPackets: p.IgnoredPackets.Load(),
InputPackets: p.InputPackets.Load(),
MuxPackets: p.MuxPackets.Load(),
MuxBytes: p.MuxBytes.Load(),
AudioPackets: p.AudioPackets.Load(),
AudioBytes: p.AudioBytes.Load(),
DTMFPackets: p.DTMFPackets.Load(),
DTMFBytes: p.DTMFBytes.Load(),
Closed: p.Closed.Load(),
Streams: p.Streams.Load(),
Packets: p.Packets.Load(),
IgnoredPackets: p.IgnoredPackets.Load(),
InputPackets: p.InputPackets.Load(),
MuxPackets: p.MuxPackets.Load(),
MuxBytes: p.MuxBytes.Load(),
AudioPackets: p.AudioPackets.Load(),
AudioBytes: p.AudioBytes.Load(),
AudioInFrames: p.AudioInFrames.Load(),
AudioInSamples: p.AudioInSamples.Load(),
AudioOutFrames: p.AudioOutFrames.Load(),
AudioOutSamples: p.AudioOutSamples.Load(),
DTMFPackets: p.DTMFPackets.Load(),
DTMFBytes: p.DTMFBytes.Load(),
Closed: p.Closed.Load(),
},
Room: RoomStatsSnapshot{
InputPackets: r.InputPackets.Load(),
InputBytes: r.InputBytes.Load(),
MixerSamples: r.MixerSamples.Load(),
MixerFrames: r.MixerFrames.Load(),
OutputSamples: r.OutputSamples.Load(),
OutputFrames: r.OutputFrames.Load(),
Closed: r.Closed.Load(),
InputPackets: r.InputPackets.Load(),
InputBytes: r.InputBytes.Load(),
PublishedFrames: r.PublishedFrames.Load(),
PublishedSamples: r.PublishedSamples.Load(),
MixerSamples: r.MixerSamples.Load(),
MixerFrames: r.MixerFrames.Load(),
OutputSamples: r.OutputSamples.Load(),
OutputFrames: r.OutputFrames.Load(),
Closed: r.Closed.Load(),
},
Mixer: MixerStatsSnapshot{
Tracks: m.Tracks.Load(),
Expand All @@ -146,6 +169,87 @@ func (s *Stats) MarshalJSON() ([]byte, error) {
return json.Marshal(s.Load())
}

func (s *Stats) startRateLogger(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) {
if interval <= 0 {
interval = defaultRateLoggerInterval
}
if expectedSampleRate <= 0 {
expectedSampleRate = RoomSampleRate
}
if !s.rateLoggerStarted.CompareAndSwap(false, true) {
return
}
go s.rateLoggerLoop(ctx, log, interval, expectedSampleRate)
}

func (s *Stats) rateLoggerLoop(ctx context.Context, log logger.Logger, interval time.Duration, expectedSampleRate int) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

last := s.Load()
lastTime := time.Now()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
now := time.Now()
cur := s.Load()
dt := now.Sub(lastTime).Seconds()
if dt <= 0 {
last = cur
lastTime = now
continue
}

rxSamples := cur.Port.AudioInSamples - last.Port.AudioInSamples
txSamples := cur.Port.AudioOutSamples - last.Port.AudioOutSamples
pubSamples := cur.Room.PublishedSamples - last.Room.PublishedSamples

if rxSamples == 0 && txSamples == 0 && pubSamples == 0 {
last = cur
lastTime = now
if cur.Closed {
return
}
continue
}

rxRate := float64(rxSamples) / dt
txRate := float64(txSamples) / dt
pubRate := float64(pubSamples) / dt

log.Infow("media sample rate",
"sip_rx_pcm_hz", rxRate,
"sip_rx_ppm", ratePPM(rxRate, expectedSampleRate),
"sip_tx_pcm_hz", txRate,
"sip_tx_ppm", ratePPM(txRate, expectedSampleRate),
"lk_publish_pcm_hz", pubRate,
"lk_publish_ppm", ratePPM(pubRate, expectedSampleRate),
"expected_pcm_hz", expectedSampleRate,
"interval", now.Sub(lastTime),
"sip_rx_samples", rxSamples,
"sip_tx_samples", txSamples,
"lk_publish_samples", pubSamples,
)

last = cur
lastTime = now
if cur.Closed {
return
}
}
}
}

func ratePPM(rate float64, expected int) float64 {
if expected <= 0 {
return 0
}
return (rate - float64(expected)) / float64(expected) * 1_000_000
}

const (
channels = 1
RoomSampleRate = 48000
Expand Down
14 changes: 13 additions & 1 deletion pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type PortStats struct {
AudioPackets atomic.Uint64
AudioBytes atomic.Uint64

AudioInFrames atomic.Uint64
AudioInSamples atomic.Uint64
AudioOutFrames atomic.Uint64
AudioOutSamples atomic.Uint64

DTMFPackets atomic.Uint64
DTMFBytes atomic.Uint64

Expand Down Expand Up @@ -562,6 +567,9 @@ func (p *MediaPort) setupOutput() error {

// Encoding pipeline (LK PCM -> SIP RTP)
audioOut := p.conf.Audio.Codec.EncodeRTP(p.audioOutRTP)
if p.stats != nil {
audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples)
}

if p.conf.Audio.DTMFType != 0 {
p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate)
Expand All @@ -585,7 +593,11 @@ func (p *MediaPort) setupOutput() error {

func (p *MediaPort) setupInput() {
// Decoding pipeline (SIP RTP -> LK PCM)
audioHandler := p.conf.Audio.Codec.DecodeRTP(p.audioIn, p.conf.Audio.Type)
var audioWriter msdk.PCM16Writer = p.audioIn
if p.stats != nil {
audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
}
audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
p.audioInHandler = audioHandler

mux := rtp.NewMux(nil)
Expand Down
1 change: 1 addition & 0 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ func (c *outboundCall) sipSignal(ctx context.Context) error {
if err = c.media.SetConfig(mc); err != nil {
return err
}
c.stats.startRateLogger(context.Background(), c.log, defaultRateLoggerInterval, RoomSampleRate)

c.c.cmu.Lock()
c.c.byRemote[c.cc.Tag()] = c
Expand Down
5 changes: 4 additions & 1 deletion pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type RoomStats struct {
InputPackets atomic.Uint64
InputBytes atomic.Uint64

PublishedFrames atomic.Uint64
PublishedSamples atomic.Uint64

MixerFrames atomic.Uint64
MixerSamples atomic.Uint64

Expand Down Expand Up @@ -424,7 +427,7 @@ func (r *Room) NewParticipantTrack(sampleRate int) (msdk.WriteCloser[msdk.PCM16S
if err != nil {
return nil, err
}
return pw, nil
return newMediaWriterCount(pw, &r.stats.PublishedFrames, &r.stats.PublishedSamples), nil
}

func (r *Room) SendData(data lksdk.DataPacket, opts ...lksdk.DataPublishOption) error {
Expand Down