Skip to content

Commit 688cced

Browse files
committed
Adding single goroutine to batch-delete invites at intervals
1 parent 1f96454 commit 688cced

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

pkg/sip/inbound.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,24 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo {
137137
return c
138138
}
139139

140+
func (s *Server) cleanupInvites() {
141+
ticker := time.NewTicker(5 * time.Minute) // Periodic cleanup every 5 minutes
142+
defer ticker.Stop()
143+
for {
144+
select {
145+
case <-s.closing.Watch():
146+
return
147+
case <-ticker.C:
148+
s.imu.Lock()
149+
for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); {
150+
key := it.Item().Value
151+
delete(s.inProgressInvites, *key)
152+
}
153+
s.imu.Unlock()
154+
}
155+
}
156+
}
157+
140158
func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
141159
key := dialogKey{
142160
sipCallID: sipCallID,
@@ -149,15 +167,13 @@ func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
149167
if ok {
150168
return is
151169
}
152-
is = &inProgressInvite{sipCallID: sipCallID}
170+
is = &inProgressInvite{
171+
sipCallID: sipCallID,
172+
expireAt: time.Now().Add(inviteCredentialValidity),
173+
}
153174
s.inProgressInvites[key] = is
175+
s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key})
154176

155-
go func() {
156-
time.Sleep(inviteCredentialValidity)
157-
s.imu.Lock()
158-
defer s.imu.Unlock()
159-
delete(s.inProgressInvites, key)
160-
}()
161177
return is
162178
}
163179

@@ -1403,7 +1419,6 @@ func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request,
14031419
c := &sipInbound{
14041420
log: log,
14051421
s: s,
1406-
id: "unassigned",
14071422
invite: invite,
14081423
inviteTx: inviteTx,
14091424
legTr: legTransportFromReq(invite),

pkg/sip/server.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net"
2525
"net/netip"
2626
"sync"
27+
"sync/atomic"
2728
"time"
2829

2930
"github.com/frostbyte73/core"
@@ -35,6 +36,7 @@ import (
3536
"github.com/livekit/protocol/livekit"
3637
"github.com/livekit/protocol/logger"
3738
"github.com/livekit/protocol/rpc"
39+
"github.com/livekit/protocol/utils"
3840
"github.com/livekit/protocol/utils/traceid"
3941
"github.com/livekit/sipgo"
4042
"github.com/livekit/sipgo/sip"
@@ -141,8 +143,10 @@ type Server struct {
141143
sipListeners []io.Closer
142144
sipUnhandled RequestHandler
143145

144-
imu sync.Mutex
145-
inProgressInvites map[dialogKey]*inProgressInvite
146+
imu sync.Mutex
147+
inProgressInvites map[dialogKey]*inProgressInvite
148+
inviteTimeoutQueue utils.TimeoutQueue[*dialogKey]
149+
isCleanupTaskRunning atomic.Bool
146150

147151
closing core.Fuse
148152
cmu sync.RWMutex
@@ -166,6 +170,7 @@ type inProgressInvite struct {
166170
sipCallID string
167171
challenge digest.Challenge
168172
lkCallID string // SCL_* LiveKit call ID assigned to this dialog
173+
expireAt time.Time
169174
}
170175

171176
func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats.Monitor, getIOClient GetIOInfoClient) *Server {
@@ -322,6 +327,9 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C
322327
}
323328
}
324329

330+
// Start the cleanup task
331+
go s.cleanupInvites()
332+
325333
return nil
326334
}
327335

0 commit comments

Comments
 (0)