Skip to content

Commit 3f94076

Browse files
committed
Fix data race on the log pointer.
1 parent b266d7c commit 3f94076

File tree

1 file changed

+58
-43
lines changed

1 file changed

+58
-43
lines changed

pkg/sip/inbound.go

Lines changed: 58 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
348348
s.cmu.RUnlock()
349349
if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() {
350350
log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength())
351-
existing.log.Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq())
351+
existing.log().Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq())
352352
cc.AcceptAsKeepAlive()
353353
return nil
354354
}
@@ -477,7 +477,7 @@ func (s *Server) onAck(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
477477
if c == nil {
478478
return
479479
}
480-
c.log.Infow("ACK from remote")
480+
c.log().Infow("ACK from remote")
481481
c.cc.AcceptAck(req, tx)
482482
}
483483

@@ -501,10 +501,10 @@ func (s *Server) onBye(log *slog.Logger, req *sip.Request, tx sip.ServerTransact
501501
rawReason = h.Value()
502502
reason, err = ParseReasonHeader(rawReason)
503503
if err != nil {
504-
c.log.Warnw("cannot parse reason header", err, "reason-raw", rawReason)
504+
c.log().Warnw("cannot parse reason header", err, "reason-raw", rawReason)
505505
}
506506
}
507-
c.log.Infow("BYE from remote",
507+
c.log().Infow("BYE from remote",
508508
"reason-type", reason.Type,
509509
"reason-cause", reason.Cause,
510510
"reason-text", reason.Text,
@@ -555,7 +555,7 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
555555
c := s.byRemoteTag[tag]
556556
s.cmu.RUnlock()
557557
if c != nil {
558-
c.log.Infow("NOTIFY")
558+
c.log().Infow("NOTIFY")
559559
err := c.cc.handleNotify(req, tx)
560560

561561
code, msg := sipCodeAndMessageFromError(err)
@@ -576,7 +576,7 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans
576576

577577
type inboundCall struct {
578578
s *Server
579-
log logger.Logger
579+
logPtr atomic.Pointer[logger.Logger]
580580
cc *sipInbound
581581
mon *stats.CallMonitor
582582
state *CallState
@@ -611,7 +611,6 @@ func (s *Server) newInboundCall(
611611
extra = HeadersToAttrs(extra, nil, 0, cc, nil)
612612
c := &inboundCall{
613613
s: s,
614-
log: log,
615614
mon: mon,
616615
cc: cc,
617616
call: call,
@@ -621,9 +620,9 @@ func (s *Server) newInboundCall(
621620
jitterBuf: SelectValueBool(s.conf.EnableJitterBuffer, s.conf.EnableJitterBufferProb),
622621
projectID: "", // Will be set in handleInvite when available
623622
}
623+
c.setLog(log.WithValues("jitterBuf", c.jitterBuf))
624624
// we need it created earlier so that the audio mixer is available for pin prompts
625-
c.lkRoom = NewRoom(log, &c.stats.Room)
626-
c.log = c.log.WithValues("jitterBuf", c.jitterBuf)
625+
c.lkRoom = NewRoom(c.log(), &c.stats.Room)
627626
c.ctx, c.cancel = context.WithCancel(context.Background())
628627
s.cmu.Lock()
629628
s.byRemoteTag[cc.Tag()] = c
@@ -633,6 +632,22 @@ func (s *Server) newInboundCall(
633632
return c
634633
}
635634

635+
func (c *inboundCall) setLog(log logger.Logger) {
636+
c.logPtr.Store(&log)
637+
}
638+
639+
func (c *inboundCall) log() logger.Logger {
640+
ptr := c.logPtr.Load()
641+
if ptr == nil {
642+
return nil
643+
}
644+
return *ptr
645+
}
646+
647+
func (c *inboundCall) appendLogValues(kvs ...any) {
648+
c.setLog(c.log().WithValues(kvs...))
649+
}
650+
636651
func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkID string, conf *config.Config) error {
637652
c.mon.InviteAccept()
638653
c.mon.CallStart()
@@ -654,14 +669,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
654669
NoPin: false,
655670
})
656671
if disp.ProjectID != "" {
657-
c.log = c.log.WithValues("projectID", disp.ProjectID)
672+
c.appendLogValues("projectID", disp.ProjectID)
658673
c.projectID = disp.ProjectID
659674
}
660675
if disp.TrunkID != "" {
661-
c.log = c.log.WithValues("sipTrunk", disp.TrunkID)
676+
c.appendLogValues("sipTrunk", disp.TrunkID)
662677
}
663678
if disp.DispatchRuleID != "" {
664-
c.log = c.log.WithValues("sipRule", disp.DispatchRuleID)
679+
c.appendLogValues("sipRule", disp.DispatchRuleID)
665680
}
666681

667682
c.state.Update(ctx, func(info *livekit.SIPCallInfo) {
@@ -676,17 +691,17 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
676691
switch disp.Result {
677692
default:
678693
err := fmt.Errorf("unexpected dispatch result: %v", disp.Result)
679-
c.log.Errorw("Rejecting inbound call", err)
694+
c.log().Errorw("Rejecting inbound call", err)
680695
c.cc.RespondAndDrop(sip.StatusNotImplemented, "")
681696
c.close(true, callDropped, "unexpected-result")
682697
return psrpc.NewError(psrpc.Unimplemented, err)
683698
case DispatchNoRuleDrop:
684-
c.log.Debugw("Rejecting inbound flood")
699+
c.log().Debugw("Rejecting inbound flood")
685700
c.cc.Drop()
686701
c.close(false, callFlood, "flood")
687702
return psrpc.NewErrorf(psrpc.PermissionDenied, "call was not authorized by trunk configuration")
688703
case DispatchNoRuleReject:
689-
c.log.Infow("Rejecting inbound call, doesn't match any Dispatch Rules")
704+
c.log().Infow("Rejecting inbound call, doesn't match any Dispatch Rules")
690705
c.cc.RespondAndDrop(sip.StatusNotFound, "Does not match Trunks or Dispatch Rules")
691706
c.close(false, callDropped, "no-dispatch")
692707
return psrpc.NewErrorf(psrpc.NotFound, "no trunk configuration for call")
@@ -709,9 +724,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
709724
isError = false
710725
}
711726
if isError {
712-
c.log.Errorw("Cannot start media", err)
727+
c.log().Errorw("Cannot start media", err)
713728
} else {
714-
c.log.Warnw("Cannot start media", err)
729+
c.log().Warnw("Cannot start media", err)
715730
}
716731
c.cc.RespondAndDrop(sip.StatusInternalServerError, "")
717732
c.close(true, status, reason)
@@ -734,14 +749,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
734749
if r := c.lkRoom.Room(); r != nil {
735750
headers = AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers)
736751
}
737-
c.log.Infow("Accepting the call", "headers", headers)
752+
c.log().Infow("Accepting the call", "headers", headers)
738753
err := c.cc.Accept(ctx, answerData, headers)
739754
if errors.Is(err, errNoACK) {
740-
c.log.Errorw("Call accepted, but no ACK received", err)
755+
c.log().Errorw("Call accepted, but no ACK received", err)
741756
c.closeWithNoACK()
742757
return false, err
743758
} else if err != nil {
744-
c.log.Errorw("Cannot accept the call", err)
759+
c.log().Errorw("Cannot accept the call", err)
745760
c.close(true, callAcceptFailed, "accept-failed")
746761
return false, err
747762
}
@@ -805,13 +820,13 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
805820
}
806821
// Publish our own track.
807822
if err := c.publishTrack(); err != nil {
808-
c.log.Errorw("Cannot publish track", err)
823+
c.log().Errorw("Cannot publish track", err)
809824
c.close(true, callDropped, "publish-failed")
810825
return errors.Wrap(err, "publishing track to room failed")
811826
}
812827
c.lkRoom.Subscribe()
813828
if !pinPrompt {
814-
c.log.Infow("Waiting for track subscription(s)")
829+
c.log().Infow("Waiting for track subscription(s)")
815830
// For dispatches without pin, we first wait for LK participant to become available,
816831
// and also for at least one track subscription. In the meantime we keep ringing.
817832
if ok, err := c.waitSubscribe(ctx, disp.RingingTimeout); !ok {
@@ -841,9 +856,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
841856
for {
842857
select {
843858
case <-ticker.C:
844-
c.log.Debugw("sending keep-alive")
859+
c.log().Debugw("sending keep-alive")
845860
c.state.ForceFlush(ctx)
846-
c.printStats(c.log)
861+
c.printStats(c.log())
847862
case <-ctx.Done():
848863
c.closeWithHangup()
849864
return nil
@@ -855,7 +870,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
855870
return nil
856871
case <-c.media.Timeout():
857872
if noAck {
858-
c.log.Errorw("Media timeout after missing ACK", errNoACK)
873+
c.log().Errorw("Media timeout after missing ACK", errNoACK)
859874
c.closeWithNoACK()
860875
return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK)
861876
}
@@ -866,7 +881,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
866881
ackReceived = nil
867882
case <-ackTimeout:
868883
// Only warn, the other side still thinks the call is active, media may be flowing.
869-
c.log.Warnw("Call accepted, but no ACK received", errNoACK)
884+
c.log().Warnw("Call accepted, but no ACK received", errNoACK)
870885
// We don't need to wait for a full media timeout initially, we already know something is not quite right.
871886
c.media.SetTimeout(min(inviteOkAckLateTimeout, c.s.conf.MediaTimeoutInitial), c.s.conf.MediaTimeout)
872887
noAck = true
@@ -876,14 +891,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
876891

877892
func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncryption, conf *config.Config, features []livekit.SIPFeature) (answerData []byte, _ error) {
878893
c.mon.SDPSize(len(offerData), true)
879-
c.log.Debugw("SDP offer", "sdp", string(offerData))
894+
c.log().Debugw("SDP offer", "sdp", string(offerData))
880895
e, err := sdpEncryption(enc)
881896
if err != nil {
882-
c.log.Errorw("Cannot parse encryption", err)
897+
c.log().Errorw("Cannot parse encryption", err)
883898
return nil, err
884899
}
885900

886-
mp, err := NewMediaPort(c.log, c.mon, &MediaOptions{
901+
mp, err := NewMediaPort(c.log(), c.mon, &MediaOptions{
887902
IP: c.s.sconf.MediaIP,
888903
Ports: conf.RTPPort,
889904
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
@@ -908,7 +923,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
908923
return nil, err
909924
}
910925
c.mon.SDPSize(len(answerData), false)
911-
c.log.Debugw("SDP answer", "sdp", string(answerData))
926+
c.log().Debugw("SDP answer", "sdp", string(answerData))
912927

913928
mconf.Processor = c.s.handler.GetMediaProcessor(features)
914929
if err = c.media.SetConfig(mconf); err != nil {
@@ -991,7 +1006,7 @@ func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration)
9911006
func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallDispatch, _ bool, _ error) {
9921007
ctx, span := tracer.Start(ctx, "inboundCall.pinPrompt")
9931008
defer span.End()
994-
c.log.Infow("Requesting Pin for SIP call")
1009+
c.log().Infow("Requesting Pin for SIP call")
9951010
const pinLimit = 16
9961011
c.playAudio(ctx, c.s.res.enterPin)
9971012
pin := ""
@@ -1019,25 +1034,25 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
10191034
// End of the pin
10201035
noPin = pin == ""
10211036

1022-
c.log.Infow("Checking Pin for SIP call", "pin", pin, "noPin", noPin)
1037+
c.log().Infow("Checking Pin for SIP call", "pin", pin, "noPin", noPin)
10231038
disp = c.s.handler.DispatchCall(ctx, &CallInfo{
10241039
TrunkID: trunkID,
10251040
Call: c.call,
10261041
Pin: pin,
10271042
NoPin: noPin,
10281043
})
10291044
if disp.ProjectID != "" {
1030-
c.log = c.log.WithValues("projectID", disp.ProjectID)
1045+
c.appendLogValues("projectID", disp.ProjectID)
10311046
c.projectID = disp.ProjectID
10321047
}
10331048
if disp.TrunkID != "" {
1034-
c.log = c.log.WithValues("sipTrunk", disp.TrunkID)
1049+
c.appendLogValues("sipTrunk", disp.TrunkID)
10351050
}
10361051
if disp.DispatchRuleID != "" {
1037-
c.log = c.log.WithValues("sipRule", disp.DispatchRuleID)
1052+
c.appendLogValues("sipRule", disp.DispatchRuleID)
10381053
}
10391054
if disp.Result != DispatchAccept || disp.Room.RoomName == "" {
1040-
c.log.Infow("Rejecting call", "pin", pin, "noPin", noPin)
1055+
c.log().Infow("Rejecting call", "pin", pin, "noPin", noPin)
10411056
c.playAudio(ctx, c.s.res.wrongPin)
10421057
c.close(false, callDropped, "wrong-pin")
10431058
return disp, false, psrpc.NewErrorf(psrpc.PermissionDenied, "wrong pin")
@@ -1067,7 +1082,7 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
10671082
}
10681083
c.stats.Closed.Store(true)
10691084
sipCode, sipStatus := status.SIPStatus()
1070-
log := c.log.WithValues("status", sipCode, "reason", reason)
1085+
log := c.log().WithValues("status", sipCode, "reason", reason)
10711086
defer c.printStats(log)
10721087
c.setStatus(status)
10731088
c.mon.CallTerminate(reason)
@@ -1226,14 +1241,14 @@ func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig, status Cal
12261241
c.joinDur()
12271242
}
12281243
c.callDur = c.mon.CallDur()
1229-
c.log = c.log.WithValues(
1244+
c.appendLogValues(
12301245
"room", rconf.RoomName,
12311246
"participant", rconf.Participant.Identity,
12321247
"participantName", rconf.Participant.Name,
12331248
)
1234-
c.log.Infow("Joining room")
1249+
c.log().Infow("Joining room")
12351250
if err := c.createLiveKitParticipant(ctx, rconf, status); err != nil {
1236-
c.log.Errorw("Cannot create LiveKit participant", err)
1251+
c.log().Errorw("Cannot create LiveKit participant", err)
12371252
c.close(true, callDropped, "participant-failed")
12381253
return errors.Wrap(err, "cannot create LiveKit participant")
12391254
}
@@ -1301,18 +1316,18 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade
13011316

13021317
err := tones.Play(rctx, aw, ringVolume, tones.ETSIRinging)
13031318
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
1304-
c.log.Infow("cannot play dial tone", "error", err)
1319+
c.log().Infow("cannot play dial tone", "error", err)
13051320
}
13061321
}()
13071322
}
13081323

13091324
err = c.cc.TransferCall(ctx, transferTo, headers)
13101325
if err != nil {
1311-
c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
1326+
c.log().Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
13121327
return err
13131328
}
13141329

1315-
c.log.Infow("inbound call transferred", "transferTo", transferTo)
1330+
c.log().Infow("inbound call transferred", "transferTo", transferTo)
13161331

13171332
// Give time for the peer to hang up first, but hang up ourselves if this doesn't happen within 1 second
13181333
time.AfterFunc(referByeTimeout, func() { c.Close() })

0 commit comments

Comments
 (0)