Skip to content

Commit 35eeaa3

Browse files
authored
Improve logging for media (#475)
* Add tests for the media timeout logic.
* Print call stats during keep-alives. Track closed states.
1 parent dd142da commit 35eeaa3

File tree

6 files changed: 255 additions (+255) and 26 deletions (-26).

pkg/sip/inbound.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
815815
case <-ticker.C:
816816
c.log.Debugw("sending keep-alive")
817817
c.state.ForceFlush(ctx)
818+
c.printStats(c.log)
818819
case <-ctx.Done():
819820
c.closeWithHangup()
820821
return nil
@@ -1027,18 +1028,21 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
10271028
}
10281029
}
10291030

1031+
// printStats logs a snapshot of the call's statistics (c.stats.Load()) at
// info level under the message "call statistics".
//
// The logger is passed in rather than taken from c.log so that callers can
// attach extra fields first: the keep-alive path passes c.log as-is, while
// close passes a logger already annotated with "status" and "reason".
func (c *inboundCall) printStats(log logger.Logger) {
1032+
log.Infow("call statistics", "stats", c.stats.Load())
1033+
}
1034+
10301035
// close should only be called from handleInvite.
10311036
func (c *inboundCall) close(error bool, status CallStatus, reason string) {
10321037
if !c.done.CompareAndSwap(false, true) {
10331038
return
10341039
}
1035-
c.setStatus(status)
1036-
c.mon.CallTerminate(reason)
1040+
c.stats.Closed.Store(true)
10371041
sipCode, sipStatus := status.SIPStatus()
10381042
log := c.log.WithValues("status", sipCode, "reason", reason)
1039-
defer func() {
1040-
log.Infow("call statistics", "stats", c.stats.Load())
1041-
}()
1043+
defer c.printStats(log)
1044+
c.setStatus(status)
1045+
c.mon.CallTerminate(reason)
10421046
if error {
10431047
log.Warnw("Closing inbound call with error", nil)
10441048
} else {

pkg/sip/media.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ import (
2020
"strconv"
2121
"sync/atomic"
2222

23-
msdk "github.com/livekit/media-sdk"
2423
"github.com/pion/interceptor"
2524
prtp "github.com/pion/rtp"
2625

26+
msdk "github.com/livekit/media-sdk"
27+
2728
"github.com/livekit/media-sdk/rtp"
2829

2930
"github.com/livekit/sip/pkg/stats"
@@ -32,14 +33,16 @@ import (
3233
var _ json.Marshaler = (*Stats)(nil)
3334

3435
// Stats groups the live statistics of a call: port-level stats (Port),
// room-level stats (Room) and, as of this change, a Closed flag.
//
// Closed is an atomic.Bool so it can be set and read without extra locking;
// the call close paths store true into it and Load copies its value into
// StatsSnapshot.Closed.
type Stats struct {
35-
Port PortStats
36-
Room RoomStats
36+
Port PortStats
37+
Room RoomStats
38+
Closed atomic.Bool
3739
}
3840

3941
// StatsSnapshot is the plain-value copy of Stats returned by Stats.Load.
// Its fields carry JSON tags ("port", "room", "mixer", "closed"); the Closed
// field added in this change mirrors Stats.Closed at the time of the Load call.
type StatsSnapshot struct {
40-
Port PortStatsSnapshot `json:"port"`
41-
Room RoomStatsSnapshot `json:"room"`
42-
Mixer MixerStatsSnapshot `json:"mixer"`
42+
Port PortStatsSnapshot `json:"port"`
43+
Room RoomStatsSnapshot `json:"room"`
44+
Mixer MixerStatsSnapshot `json:"mixer"`
45+
Closed bool `json:"closed"`
4346
}
4447

4548
type PortStatsSnapshot struct {
@@ -56,6 +59,8 @@ type PortStatsSnapshot struct {
5659

5760
DTMFPackets uint64 `json:"dtmf_packets"`
5861
DTMFBytes uint64 `json:"dtmf_bytes"`
62+
63+
Closed bool `json:"closed"`
5964
}
6065

6166
type RoomStatsSnapshot struct {
@@ -67,6 +72,8 @@ type RoomStatsSnapshot struct {
6772

6873
OutputSamples uint64 `json:"output_samples"`
6974
OutputFrames uint64 `json:"output_frames"`
75+
76+
Closed bool `json:"closed"`
7077
}
7178

7279
type MixerStatsSnapshot struct {
@@ -105,6 +112,7 @@ func (s *Stats) Load() StatsSnapshot {
105112
AudioBytes: p.AudioBytes.Load(),
106113
DTMFPackets: p.DTMFPackets.Load(),
107114
DTMFBytes: p.DTMFBytes.Load(),
115+
Closed: p.Closed.Load(),
108116
},
109117
Room: RoomStatsSnapshot{
110118
InputPackets: r.InputPackets.Load(),
@@ -113,6 +121,7 @@ func (s *Stats) Load() StatsSnapshot {
113121
MixerFrames: r.MixerFrames.Load(),
114122
OutputSamples: r.OutputSamples.Load(),
115123
OutputFrames: r.OutputFrames.Load(),
124+
Closed: r.Closed.Load(),
116125
},
117126
Mixer: MixerStatsSnapshot{
118127
Tracks: m.Tracks.Load(),
@@ -129,6 +138,7 @@ func (s *Stats) Load() StatsSnapshot {
129138
OutputSamples: m.OutputSamples.Load(),
130139
OutputFrames: m.OutputFrames.Load(),
131140
},
141+
Closed: s.Closed.Load(),
132142
}
133143
}
134144

pkg/sip/media_port.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ type PortStats struct {
5858

5959
DTMFPackets atomic.Uint64
6060
DTMFBytes atomic.Uint64
61+
62+
Closed atomic.Bool
6163
}
6264

6365
type UDPConn interface {
@@ -271,6 +273,7 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
271273
ticker.Reset(tick)
272274
startPackets = p.packetCount.Load()
273275
lastTime = time.Now()
276+
p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick)
274277
case <-ticker.C:
275278
curPackets := p.packetCount.Load()
276279
if curPackets != lastPackets {
@@ -306,7 +309,7 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
306309
}
307310

308311
// Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet.
309-
if since < timeout {
312+
if since+timeout/10 < timeout {
310313
continue
311314
}
312315
p.log.Infow("triggering media timeout",
@@ -325,6 +328,8 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
325328

326329
func (p *MediaPort) Close() {
327330
p.closed.Once(func() {
331+
defer p.stats.Closed.Store(true)
332+
328333
p.mu.Lock()
329334
defer p.mu.Unlock()
330335
if w := p.audioOut.Swap(nil); w != nil {

pkg/sip/media_port_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,3 +326,202 @@ func checkPCM(t testing.TB, exp, got msdk.PCM16Sample) {
326326
expHit := int(float64(len(expSamples)) * percHit)
327327
require.True(t, hits >= expHit, "min=%v, max=%v\ngot:\n%v", slices.Min(got), slices.Max(got), got)
328328
}
329+
330+
// newMediaPair is a test helper that creates two MediaPorts wired together
// through newUDPPipe and completes an SDP offer/answer exchange between them,
// with m1 acting as the offerer and m2 as the answerer.
//
// opt1 and opt2 configure m1 and m2 respectively; a nil value is replaced with
// an empty MediaOptions. Note that the IP and Ports fields of the passed
// options are overwritten in place. Any setup error fails the test via
// require, and both ports are closed through t.Cleanup.
func newMediaPair(t testing.TB, opt1, opt2 *MediaOptions) (m1, m2 *MediaPort) {
331+
if opt1 == nil {
332+
opt1 = &MediaOptions{}
333+
}
334+
if opt2 == nil {
335+
opt2 = &MediaOptions{}
336+
}
337+
c1, c2 := newUDPPipe()
338+
339+
// Give each side a distinct fake address and port range.
opt1.IP = newIP("1.1.1.1")
340+
opt1.Ports = rtcconfig.PortRange{Start: 10000}
341+
342+
opt2.IP = newIP("2.2.2.2")
343+
opt2.Ports = rtcconfig.PortRange{Start: 20000}
344+
345+
// Sample rate handed to both media ports.
const rate = 16000
346+
347+
log := logger.GetLogger()
348+
349+
var err error
350+
351+
m1, err = NewMediaPortWith(log.WithName("one"), nil, c1, opt1, rate)
352+
require.NoError(t, err)
353+
t.Cleanup(m1.Close)
354+
355+
m2, err = NewMediaPortWith(log.WithName("two"), nil, c2, opt2, rate)
356+
require.NoError(t, err)
357+
t.Cleanup(m2.Close)
358+
359+
// SDP negotiation: m1 creates the offer, m2 answers it, m1 consumes the answer.
offer, err := m1.NewOffer(sdp.EncryptionNone)
360+
require.NoError(t, err)
361+
offerData, err := offer.SDP.Marshal()
362+
require.NoError(t, err)
363+
364+
answer, mc2, err := m2.SetOffer(offerData, sdp.EncryptionNone)
365+
require.NoError(t, err)
366+
answerData, err := answer.SDP.Marshal()
367+
require.NoError(t, err)
368+
369+
mc1, err := m1.SetAnswer(offer, answerData, sdp.EncryptionNone)
370+
require.NoError(t, err)
371+
372+
// Apply the negotiated media configs to each side.
err = m1.SetConfig(mc1)
373+
require.NoError(t, err)
374+
375+
err = m2.SetConfig(mc2)
376+
require.NoError(t, err)
377+
378+
// Sanity check on m2's audio writer chain description; presumably this
// confirms that G722 was the negotiated codec — verify if the codec list changes.
w2 := m2.GetAudioWriter()
379+
require.Equal(t, "Switch(16000) -> G722(encode) -> RTP(16000)", w2.String())
380+
381+
return m1, m2
382+
}
383+
384+
// TestMediaTimeout exercises the media timeout behaviour of MediaPort using a
// connected pair from newMediaPair. Each subtest enables the timeout on m1 and
// observes m1.Timeout() while m2 does or does not send audio samples.
//
// The subtests depend on wall-clock timing (time.After), so they may be
// sensitive to heavily loaded machines.
func TestMediaTimeout(t *testing.T) {
385+
const (
386+
// timeout is the regular media timeout passed as MediaTimeout.
timeout = time.Second / 4
387+
// initial is the longer timeout passed as MediaTimeoutInitial.
initial = timeout * 2
388+
// dt is the slack allowed on top of an expected deadline.
dt = timeout / 4
389+
)
390+
391+
// "initial": no media is ever sent. The timeout must not fire during the
// first half of the initial interval, and must fire by initial+dt.
t.Run("initial", func(t *testing.T) {
392+
m1, _ := newMediaPair(t, &MediaOptions{
393+
MediaTimeoutInitial: initial,
394+
MediaTimeout: timeout,
395+
}, nil)
396+
397+
m1.EnableTimeout(true)
398+
399+
// targ is the moment the initial timeout is expected to elapse.
targ := time.Now().Add(initial)
400+
select {
401+
case <-m1.Timeout():
402+
t.Fatal("initial timeout ignored")
403+
case <-time.After(initial / 2):
404+
}
405+
406+
select {
407+
case <-time.After(time.Until(targ) + dt):
408+
t.Fatal("timeout didn't trigger")
409+
case <-m1.Timeout():
410+
}
411+
})
412+
413+
// "regular": a single sample is sent and must be reported via Received()
// within dt; with no further media the timeout must fire within
// 2*timeout+dt.
t.Run("regular", func(t *testing.T) {
414+
m1, m2 := newMediaPair(t, &MediaOptions{
415+
MediaTimeoutInitial: initial,
416+
MediaTimeout: timeout,
417+
}, nil)
418+
m1.EnableTimeout(true)
419+
420+
w2 := m2.GetAudioWriter()
421+
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
422+
require.NoError(t, err)
423+
424+
select {
425+
case <-time.After(dt):
426+
t.Fatal("no media received")
427+
case <-m1.Received():
428+
}
429+
430+
select {
431+
case <-time.After(2*timeout + dt):
432+
t.Fatal("timeout didn't trigger")
433+
case <-m1.Timeout():
434+
}
435+
})
436+
437+
// "no timeout": a sample is sent every timeout/2 for ten iterations; the
// timeout must never fire while media keeps flowing.
t.Run("no timeout", func(t *testing.T) {
438+
m1, m2 := newMediaPair(t, &MediaOptions{
439+
MediaTimeoutInitial: initial,
440+
MediaTimeout: timeout,
441+
}, nil)
442+
m1.EnableTimeout(true)
443+
444+
w2 := m2.GetAudioWriter()
445+
446+
for i := 0; i < 10; i++ {
447+
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
448+
require.NoError(t, err)
449+
450+
select {
451+
case <-time.After(timeout / 2):
452+
case <-m1.Timeout():
453+
t.Fatal("timeout")
454+
}
455+
}
456+
})
457+
458+
// "reset timeout": after some media has flowed, SetTimeout is called again
// and media stops. The same expectations as in "initial" must then hold:
// no trigger within initial/2, trigger by initial+dt.
t.Run("reset timeout", func(t *testing.T) {
459+
m1, m2 := newMediaPair(t, &MediaOptions{
460+
MediaTimeoutInitial: initial,
461+
MediaTimeout: timeout,
462+
}, nil)
463+
m1.EnableTimeout(true)
464+
465+
w2 := m2.GetAudioWriter()
466+
467+
for i := 0; i < 5; i++ {
468+
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
469+
require.NoError(t, err)
470+
471+
select {
472+
case <-time.After(timeout / 2):
473+
case <-m1.Timeout():
474+
t.Fatal("timeout")
475+
}
476+
}
477+
478+
m1.SetTimeout(initial, timeout)
479+
480+
targ := time.Now().Add(initial)
481+
select {
482+
case <-m1.Timeout():
483+
t.Fatal("initial timeout ignored")
484+
case <-time.After(initial / 2):
485+
}
486+
487+
select {
488+
case <-time.After(time.Until(targ) + dt):
489+
t.Fatal("timeout didn't trigger")
490+
case <-m1.Timeout():
491+
}
492+
})
493+
494+
// "reset": SetTimeout is called in the middle of a continuous media flow;
// the timeout must not fire either before or after the reset.
t.Run("reset", func(t *testing.T) {
495+
m1, m2 := newMediaPair(t, &MediaOptions{
496+
MediaTimeoutInitial: initial,
497+
MediaTimeout: timeout,
498+
}, nil)
499+
m1.EnableTimeout(true)
500+
501+
w2 := m2.GetAudioWriter()
502+
503+
for i := 0; i < 5; i++ {
504+
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
505+
require.NoError(t, err)
506+
507+
select {
508+
case <-time.After(timeout / 2):
509+
case <-m1.Timeout():
510+
t.Fatal("timeout")
511+
}
512+
}
513+
514+
m1.SetTimeout(initial, timeout)
515+
516+
for i := 0; i < 5; i++ {
517+
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
518+
require.NoError(t, err)
519+
520+
select {
521+
case <-time.After(timeout / 2):
522+
case <-m1.Timeout():
523+
t.Fatal("timeout")
524+
}
525+
}
526+
})
527+
}

pkg/sip/outbound.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ func (c *outboundCall) WaitClose(ctx context.Context) error {
217217
case <-ticker.C:
218218
c.log.Debugw("sending keep-alive")
219219
c.state.ForceFlush(ctx)
220+
c.printStats()
220221
case <-c.Disconnected():
221222
c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED)
222223
return nil
@@ -269,8 +270,15 @@ func (c *outboundCall) closeWithTimeout() {
269270
c.close(psrpc.NewErrorf(psrpc.DeadlineExceeded, "media-timeout"), callDropped, "media-timeout", livekit.DisconnectReason_UNKNOWN_REASON)
270271
}
271272

273+
// printStats logs a snapshot of the call's statistics (c.stats.Load()) at
// info level on the call's own logger, under the message "call statistics".
// It is called on every keep-alive tick in WaitClose and, deferred, from close.
func (c *outboundCall) printStats() {
274+
c.log.Infow("call statistics", "stats", c.stats.Load())
275+
}
276+
272277
func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) {
273278
c.stopped.Once(func() {
279+
c.stats.Closed.Store(true)
280+
defer c.printStats()
281+
274282
c.setStatus(status)
275283
if err != nil {
276284
c.log.Warnw("Closing outbound call with error", nil, "reason", description)
@@ -292,8 +300,6 @@ func (c *outboundCall) close(err error, status CallStatus, description string, r
292300

293301
c.stopSIP(description)
294302

295-
c.log.Infow("call statistics", "stats", c.stats.Load())
296-
297303
c.c.cmu.Lock()
298304
delete(c.c.activeCalls, c.cc.ID())
299305
if tag := c.cc.Tag(); tag != "" {

0 commit comments

Comments
 (0)