Skip to content

Commit 776223f

Browse files
dennwcalexlivekit
authored andcommitted
Fix data race on the log pointer.
1 parent 20de22f commit 776223f

File tree

1 file changed

+58
-43
lines changed

1 file changed

+58
-43
lines changed

pkg/sip/inbound.go

Lines changed: 58 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
350350
s.cmu.RUnlock()
351351
if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() {
352352
log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength())
353-
existing.log.Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq())
353+
existing.log().Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq())
354354
cc.AcceptAsKeepAlive()
355355
return nil
356356
}
@@ -479,7 +479,7 @@ func (s *Server) onAck(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
479479
if c == nil {
480480
return
481481
}
482-
c.log.Infow("ACK from remote")
482+
c.log().Infow("ACK from remote")
483483
c.cc.AcceptAck(req, tx)
484484
}
485485

@@ -503,10 +503,10 @@ func (s *Server) onBye(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
503503
rawReason = h.Value()
504504
reason, err = ParseReasonHeader(rawReason)
505505
if err != nil {
506-
c.log.Warnw("cannot parse reason header", err, "reason-raw", rawReason)
506+
c.log().Warnw("cannot parse reason header", err, "reason-raw", rawReason)
507507
}
508508
}
509-
c.log.Infow("BYE from remote",
509+
c.log().Infow("BYE from remote",
510510
"reason-type", reason.Type,
511511
"reason-cause", reason.Cause,
512512
"reason-text", reason.Text,
@@ -557,7 +557,7 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
557557
c := s.byRemoteTag[tag]
558558
s.cmu.RUnlock()
559559
if c != nil {
560-
c.log.Infow("NOTIFY")
560+
c.log().Infow("NOTIFY")
561561
err := c.cc.handleNotify(req, tx)
562562

563563
code, msg := sipCodeAndMessageFromError(err)
@@ -579,7 +579,7 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
579579
type inboundCall struct {
580580
s *Server
581581
tid traceid.ID
582-
log logger.Logger
582+
logPtr atomic.Pointer[logger.Logger]
583583
cc *sipInbound
584584
mon *stats.CallMonitor
585585
state *CallState
@@ -617,7 +617,6 @@ func (s *Server) newInboundCall(
617617
extra = HeadersToAttrs(extra, nil, 0, cc, nil)
618618
c := &inboundCall{
619619
s: s,
620-
log: log,
621620
tid: tid,
622621
callStart: callStart,
623622
mon: mon,
@@ -629,9 +628,9 @@ func (s *Server) newInboundCall(
629628
jitterBuf: SelectValueBool(s.conf.EnableJitterBuffer, s.conf.EnableJitterBufferProb),
630629
projectID: "", // Will be set in handleInvite when available
631630
}
631+
c.setLog(log.WithValues("jitterBuf", c.jitterBuf))
632632
// we need it created earlier so that the audio mixer is available for pin prompts
633-
c.lkRoom = NewRoom(log, &c.stats.Room)
634-
c.log = c.log.WithValues("jitterBuf", c.jitterBuf)
633+
c.lkRoom = NewRoom(c.log(), &c.stats.Room)
635634
c.ctx, c.cancel = context.WithCancel(context.Background())
636635
s.cmu.Lock()
637636
s.byRemoteTag[cc.Tag()] = c
@@ -641,6 +640,22 @@ func (s *Server) newInboundCall(
641640
return c
642641
}
643642

643+
func (c *inboundCall) setLog(log logger.Logger) {
644+
c.logPtr.Store(&log)
645+
}
646+
647+
func (c *inboundCall) log() logger.Logger {
648+
ptr := c.logPtr.Load()
649+
if ptr == nil {
650+
return nil
651+
}
652+
return *ptr
653+
}
654+
655+
func (c *inboundCall) appendLogValues(kvs ...any) {
656+
c.setLog(c.log().WithValues(kvs...))
657+
}
658+
644659
func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip.Request, trunkID string, conf *config.Config) error {
645660
c.mon.InviteAccept()
646661
c.mon.CallStart()
@@ -662,14 +677,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
662677
NoPin: false,
663678
})
664679
if disp.ProjectID != "" {
665-
c.log = c.log.WithValues("projectID", disp.ProjectID)
680+
c.appendLogValues("projectID", disp.ProjectID)
666681
c.projectID = disp.ProjectID
667682
}
668683
if disp.TrunkID != "" {
669-
c.log = c.log.WithValues("sipTrunk", disp.TrunkID)
684+
c.appendLogValues("sipTrunk", disp.TrunkID)
670685
}
671686
if disp.DispatchRuleID != "" {
672-
c.log = c.log.WithValues("sipRule", disp.DispatchRuleID)
687+
c.appendLogValues("sipRule", disp.DispatchRuleID)
673688
}
674689

675690
c.state.Update(ctx, func(info *livekit.SIPCallInfo) {
@@ -684,17 +699,17 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
684699
switch disp.Result {
685700
default:
686701
err := fmt.Errorf("unexpected dispatch result: %v", disp.Result)
687-
c.log.Errorw("Rejecting inbound call", err)
702+
c.log().Errorw("Rejecting inbound call", err)
688703
c.cc.RespondAndDrop(sip.StatusNotImplemented, "")
689704
c.close(true, callDropped, "unexpected-result")
690705
return psrpc.NewError(psrpc.Unimplemented, err)
691706
case DispatchNoRuleDrop:
692-
c.log.Debugw("Rejecting inbound flood")
707+
c.log().Debugw("Rejecting inbound flood")
693708
c.cc.Drop()
694709
c.close(false, callFlood, "flood")
695710
return psrpc.NewErrorf(psrpc.PermissionDenied, "call was not authorized by trunk configuration")
696711
case DispatchNoRuleReject:
697-
c.log.Infow("Rejecting inbound call, doesn't match any Dispatch Rules")
712+
c.log().Infow("Rejecting inbound call, doesn't match any Dispatch Rules")
698713
c.cc.RespondAndDrop(sip.StatusNotFound, "Does not match Trunks or Dispatch Rules")
699714
c.close(false, callDropped, "no-dispatch")
700715
return psrpc.NewErrorf(psrpc.NotFound, "no trunk configuration for call")
@@ -717,9 +732,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
717732
isError = false
718733
}
719734
if isError {
720-
c.log.Errorw("Cannot start media", err)
735+
c.log().Errorw("Cannot start media", err)
721736
} else {
722-
c.log.Warnw("Cannot start media", err)
737+
c.log().Warnw("Cannot start media", err)
723738
}
724739
c.cc.RespondAndDrop(sip.StatusInternalServerError, "")
725740
c.close(true, status, reason)
@@ -742,14 +757,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
742757
if r := c.lkRoom.Room(); r != nil {
743758
headers = AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers)
744759
}
745-
c.log.Infow("Accepting the call", "headers", headers)
760+
c.log().Infow("Accepting the call", "headers", headers)
746761
err := c.cc.Accept(ctx, answerData, headers)
747762
if errors.Is(err, errNoACK) {
748-
c.log.Errorw("Call accepted, but no ACK received", err)
763+
c.log().Errorw("Call accepted, but no ACK received", err)
749764
c.closeWithNoACK()
750765
return false, err
751766
} else if err != nil {
752-
c.log.Errorw("Cannot accept the call", err)
767+
c.log().Errorw("Cannot accept the call", err)
753768
c.close(true, callAcceptFailed, "accept-failed")
754769
return false, err
755770
}
@@ -813,13 +828,13 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
813828
}
814829
// Publish our own track.
815830
if err := c.publishTrack(); err != nil {
816-
c.log.Errorw("Cannot publish track", err)
831+
c.log().Errorw("Cannot publish track", err)
817832
c.close(true, callDropped, "publish-failed")
818833
return errors.Wrap(err, "publishing track to room failed")
819834
}
820835
c.lkRoom.Subscribe()
821836
if !pinPrompt {
822-
c.log.Infow("Waiting for track subscription(s)")
837+
c.log().Infow("Waiting for track subscription(s)")
823838
// For dispatches without pin, we first wait for LK participant to become available,
824839
// and also for at least one track subscription. In the meantime we keep ringing.
825840
if ok, err := c.waitSubscribe(ctx, disp.RingingTimeout); !ok {
@@ -849,9 +864,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
849864
for {
850865
select {
851866
case <-ticker.C:
852-
c.log.Debugw("sending keep-alive")
867+
c.log().Debugw("sending keep-alive")
853868
c.state.ForceFlush(ctx)
854-
c.printStats(c.log)
869+
c.printStats(c.log())
855870
case <-ctx.Done():
856871
c.closeWithHangup()
857872
return nil
@@ -863,7 +878,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
863878
return nil
864879
case <-c.media.Timeout():
865880
if noAck {
866-
c.log.Errorw("Media timeout after missing ACK", errNoACK)
881+
c.log().Errorw("Media timeout after missing ACK", errNoACK)
867882
c.closeWithNoACK()
868883
return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK)
869884
}
@@ -874,7 +889,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
874889
ackReceived = nil
875890
case <-ackTimeout:
876891
// Only warn, the other side still thinks the call is active, media may be flowing.
877-
c.log.Warnw("Call accepted, but no ACK received", errNoACK)
892+
c.log().Warnw("Call accepted, but no ACK received", errNoACK)
878893
// We don't need to wait for a full media timeout initially, we already know something is not quite right.
879894
c.media.SetTimeout(min(inviteOkAckLateTimeout, c.s.conf.MediaTimeoutInitial), c.s.conf.MediaTimeout)
880895
noAck = true
@@ -884,14 +899,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
884899

885900
func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit.SIPMediaEncryption, conf *config.Config, features []livekit.SIPFeature) (answerData []byte, _ error) {
886901
c.mon.SDPSize(len(offerData), true)
887-
c.log.Debugw("SDP offer", "sdp", string(offerData))
902+
c.log().Debugw("SDP offer", "sdp", string(offerData))
888903
e, err := sdpEncryption(enc)
889904
if err != nil {
890-
c.log.Errorw("Cannot parse encryption", err)
905+
c.log().Errorw("Cannot parse encryption", err)
891906
return nil, err
892907
}
893908

894-
mp, err := NewMediaPort(tid, c.log, c.mon, &MediaOptions{
909+
mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{
895910
IP: c.s.sconf.MediaIP,
896911
Ports: conf.RTPPort,
897912
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
@@ -916,7 +931,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit
916931
return nil, err
917932
}
918933
c.mon.SDPSize(len(answerData), false)
919-
c.log.Debugw("SDP answer", "sdp", string(answerData))
934+
c.log().Debugw("SDP answer", "sdp", string(answerData))
920935

921936
mconf.Processor = c.s.handler.GetMediaProcessor(features)
922937
if err = c.media.SetConfig(mconf); err != nil {
@@ -999,7 +1014,7 @@ func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration)
9991014
func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallDispatch, _ bool, _ error) {
10001015
ctx, span := tracer.Start(ctx, "inboundCall.pinPrompt")
10011016
defer span.End()
1002-
c.log.Infow("Requesting Pin for SIP call")
1017+
c.log().Infow("Requesting Pin for SIP call")
10031018
const pinLimit = 16
10041019
c.playAudio(ctx, c.s.res.enterPin)
10051020
pin := ""
@@ -1027,25 +1042,25 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
10271042
// End of the pin
10281043
noPin = pin == ""
10291044

1030-
c.log.Infow("Checking Pin for SIP call", "pin", pin, "noPin", noPin)
1045+
c.log().Infow("Checking Pin for SIP call", "pin", pin, "noPin", noPin)
10311046
disp = c.s.handler.DispatchCall(ctx, &CallInfo{
10321047
TrunkID: trunkID,
10331048
Call: c.call,
10341049
Pin: pin,
10351050
NoPin: noPin,
10361051
})
10371052
if disp.ProjectID != "" {
1038-
c.log = c.log.WithValues("projectID", disp.ProjectID)
1053+
c.appendLogValues("projectID", disp.ProjectID)
10391054
c.projectID = disp.ProjectID
10401055
}
10411056
if disp.TrunkID != "" {
1042-
c.log = c.log.WithValues("sipTrunk", disp.TrunkID)
1057+
c.appendLogValues("sipTrunk", disp.TrunkID)
10431058
}
10441059
if disp.DispatchRuleID != "" {
1045-
c.log = c.log.WithValues("sipRule", disp.DispatchRuleID)
1060+
c.appendLogValues("sipRule", disp.DispatchRuleID)
10461061
}
10471062
if disp.Result != DispatchAccept || disp.Room.RoomName == "" {
1048-
c.log.Infow("Rejecting call", "pin", pin, "noPin", noPin)
1063+
c.log().Infow("Rejecting call", "pin", pin, "noPin", noPin)
10491064
c.playAudio(ctx, c.s.res.wrongPin)
10501065
c.close(false, callDropped, "wrong-pin")
10511066
return disp, false, psrpc.NewErrorf(psrpc.PermissionDenied, "wrong pin")
@@ -1075,7 +1090,7 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
10751090
}
10761091
c.stats.Closed.Store(true)
10771092
sipCode, sipStatus := status.SIPStatus()
1078-
log := c.log.WithValues("status", sipCode, "reason", reason)
1093+
log := c.log().WithValues("status", sipCode, "reason", reason)
10791094
defer c.printStats(log)
10801095
c.setStatus(status)
10811096
c.mon.CallTerminate(reason)
@@ -1236,14 +1251,14 @@ func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig, status Cal
12361251
c.joinDur()
12371252
}
12381253
c.callDur = c.mon.CallDur()
1239-
c.log = c.log.WithValues(
1254+
c.appendLogValues(
12401255
"room", rconf.RoomName,
12411256
"participant", rconf.Participant.Identity,
12421257
"participantName", rconf.Participant.Name,
12431258
)
1244-
c.log.Infow("Joining room")
1259+
c.log().Infow("Joining room")
12451260
if err := c.createLiveKitParticipant(ctx, rconf, status); err != nil {
1246-
c.log.Errorw("Cannot create LiveKit participant", err)
1261+
c.log().Errorw("Cannot create LiveKit participant", err)
12471262
c.close(true, callDropped, "participant-failed")
12481263
return errors.Wrap(err, "cannot create LiveKit participant")
12491264
}
@@ -1311,18 +1326,18 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade
13111326

13121327
err := tones.Play(rctx, aw, ringVolume, tones.ETSIRinging)
13131328
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
1314-
c.log.Infow("cannot play dial tone", "error", err)
1329+
c.log().Infow("cannot play dial tone", "error", err)
13151330
}
13161331
}()
13171332
}
13181333

13191334
err = c.cc.TransferCall(ctx, transferTo, headers)
13201335
if err != nil {
1321-
c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
1336+
c.log().Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
13221337
return err
13231338
}
13241339

1325-
c.log.Infow("inbound call transferred", "transferTo", transferTo)
1340+
c.log().Infow("inbound call transferred", "transferTo", transferTo)
13261341

13271342
// Give time for the peer to hang up first, but hang up ourselves if this doesn't happen within 1 second
13281343
time.AfterFunc(referByeTimeout, func() { c.Close() })

0 commit comments

Comments
 (0)