Skip to content

Commit 14fff6a

Browse files
committed
Add initial implementation of Hold/Resume call
1 parent eab98b2 commit 14fff6a

File tree

7 files changed

+542
-0
lines changed

7 files changed

+542
-0
lines changed

pkg/service/service.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,34 @@ func (s *Service) DeregisterTransferSIPParticipantTopic(sipCallId string) {
254254
}
255255
}
256256

257+
func (s *Service) RegisterHoldSIPParticipantTopic(sipCallId string) error {
258+
if s.rpcSIPServer != nil {
259+
return s.rpcSIPServer.RegisterHoldSIPParticipantTopic(sipCallId)
260+
}
261+
262+
return psrpc.NewErrorf(psrpc.Internal, "RPC server not started")
263+
}
264+
265+
func (s *Service) DeregisterHoldSIPParticipantTopic(sipCallId string) {
266+
if s.rpcSIPServer != nil {
267+
s.rpcSIPServer.DeregisterHoldSIPParticipantTopic(sipCallId)
268+
}
269+
}
270+
271+
func (s *Service) RegisterUnholdSIPParticipantTopic(sipCallId string) error {
272+
if s.rpcSIPServer != nil {
273+
return s.rpcSIPServer.RegisterUnholdSIPParticipantTopic(sipCallId)
274+
}
275+
276+
return psrpc.NewErrorf(psrpc.Internal, "RPC server not started")
277+
}
278+
279+
func (s *Service) DeregisterUnholdSIPParticipantTopic(sipCallId string) {
280+
if s.rpcSIPServer != nil {
281+
s.rpcSIPServer.DeregisterUnholdSIPParticipantTopic(sipCallId)
282+
}
283+
}
284+
257285
func (s *Service) OnSessionEnd(ctx context.Context, callIdentifier *sip.CallIdentifier, callInfo *livekit.SIPCallInfo, reason string) {
258286
s.log.Infow("SIP call ended", "callID", callInfo.CallId, "reason", reason)
259287
}

pkg/sip/client.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,3 +327,19 @@ func (c *Client) RegisterTransferSIPParticipant(sipCallID string, o *outboundCal
327327
func (c *Client) DeregisterTransferSIPParticipant(sipCallID string) {
328328
c.handler.DeregisterTransferSIPParticipantTopic(sipCallID)
329329
}
330+
331+
func (c *Client) RegisterHoldSIPParticipant(sipCallID string, o *outboundCall) error {
332+
return c.handler.RegisterHoldSIPParticipantTopic(sipCallID)
333+
}
334+
335+
func (c *Client) DeregisterHoldSIPParticipant(sipCallID string) {
336+
c.handler.DeregisterHoldSIPParticipantTopic(sipCallID)
337+
}
338+
339+
func (c *Client) RegisterUnholdSIPParticipant(sipCallID string, o *outboundCall) error {
340+
return c.handler.RegisterUnholdSIPParticipantTopic(sipCallID)
341+
}
342+
343+
func (c *Client) DeregisterUnholdSIPParticipant(sipCallID string) {
344+
c.handler.DeregisterUnholdSIPParticipantTopic(sipCallID)
345+
}

pkg/sip/inbound.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package sip
1616

1717
import (
18+
"bytes"
1819
"context"
1920
"crypto/sha256"
2021
"encoding/hex"
@@ -912,6 +913,8 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
912913
c.s.cmu.Unlock()
913914

914915
c.s.DeregisterTransferSIPParticipant(c.cc.ID())
916+
c.s.DeregisterHoldSIPParticipant(c.cc.ID())
917+
c.s.DeregisterUnholdSIPParticipant(c.cc.ID())
915918

916919
// Call the handler asynchronously to avoid blocking
917920
if c.s.handler != nil {
@@ -996,6 +999,16 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomCo
996999
return err
9971000
}
9981001

1002+
err = c.s.RegisterHoldSIPParticipant(LocalTag(c.cc.ID()), c)
1003+
if err != nil {
1004+
return err
1005+
}
1006+
1007+
err = c.s.RegisterUnholdSIPParticipant(LocalTag(c.cc.ID()), c)
1008+
if err != nil {
1009+
return err
1010+
}
1011+
9991012
err = c.lkRoom.Connect(c.s.conf, rconf)
10001013
if err != nil {
10011014
return err
@@ -1113,6 +1126,48 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade
11131126

11141127
}
11151128

1129+
func (c *inboundCall) holdCall(ctx context.Context) error {
1130+
c.log.Infow("holding inbound call")
1131+
1132+
// Disable media timeout during hold to prevent call termination
1133+
if c.media != nil {
1134+
c.media.EnableTimeout(false)
1135+
c.log.Infow("media timeout disabled for hold")
1136+
}
1137+
1138+
err := c.cc.holdCall(ctx)
1139+
if err != nil {
1140+
c.log.Infow("inbound call failed to hold", "error", err)
1141+
// Re-enable timeout if hold failed
1142+
if c.media != nil {
1143+
c.media.EnableTimeout(true)
1144+
}
1145+
return err
1146+
}
1147+
1148+
c.log.Infow("inbound call held")
1149+
return nil
1150+
}
1151+
1152+
func (c *inboundCall) unholdCall(ctx context.Context) error {
1153+
c.log.Infow("unholding inbound call")
1154+
1155+
err := c.cc.unholdCall(ctx)
1156+
if err != nil {
1157+
c.log.Infow("inbound call failed to unhold", "error", err)
1158+
return err
1159+
}
1160+
1161+
// Re-enable media timeout after unhold
1162+
if c.media != nil {
1163+
c.media.EnableTimeout(true)
1164+
c.log.Infow("media timeout re-enabled after unhold")
1165+
}
1166+
1167+
c.log.Infow("inbound call unheld")
1168+
return nil
1169+
}
1170+
11161171
func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound {
11171172
c := &sipInbound{
11181173
s: s,
@@ -1575,3 +1630,147 @@ func (c *sipInbound) CloseWithStatus(code sip.StatusCode, status string) {
15751630
c.drop()
15761631
}
15771632
}
1633+
1634+
func (c *sipInbound) holdCall(ctx context.Context) error {
1635+
c.mu.Lock()
1636+
1637+
if c.invite == nil || c.inviteOk == nil {
1638+
c.mu.Unlock()
1639+
return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't hold non established call")
1640+
}
1641+
1642+
// Create INVITE with SDP modified for hold (a=sendonly)
1643+
req := sip.NewRequest(sip.INVITE, c.invite.Recipient)
1644+
c.setCSeq(req)
1645+
1646+
// Copy headers from original INVITE
1647+
req.AppendHeader(c.invite.From())
1648+
req.AppendHeader(c.invite.To())
1649+
req.AppendHeader(c.invite.CallID())
1650+
req.AppendHeader(c.contact)
1651+
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
1652+
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))
1653+
1654+
// Copy Route headers from original INVITE or Record-Route from response
1655+
if len(c.invite.GetHeaders("Route")) > 0 {
1656+
sip.CopyHeaders("Route", c.invite, req)
1657+
} else {
1658+
hdrs := c.inviteOk.GetHeaders("Record-Route")
1659+
for i := len(hdrs) - 1; i >= 0; i-- {
1660+
rrh, ok := hdrs[i].(*sip.RecordRouteHeader)
1661+
if !ok {
1662+
continue
1663+
}
1664+
1665+
h := rrh.Clone()
1666+
req.AppendHeader(h)
1667+
}
1668+
}
1669+
1670+
// Modify SDP to set direction to sendonly (hold)
1671+
sdpOffer := c.invite.Body()
1672+
if len(sdpOffer) > 0 {
1673+
// Replace a=sendrecv with a=sendonly for hold
1674+
sdpOffer = bytes.ReplaceAll(sdpOffer, []byte("a=sendrecv"), []byte("a=sendonly"))
1675+
req.SetBody(sdpOffer)
1676+
}
1677+
1678+
c.swapSrcDst(req)
1679+
c.mu.Unlock()
1680+
1681+
// Send the INVITE request
1682+
tx, err := c.Transaction(req)
1683+
if err != nil {
1684+
return err
1685+
}
1686+
defer tx.Terminate()
1687+
1688+
resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil)
1689+
if err != nil {
1690+
return err
1691+
}
1692+
1693+
if resp.StatusCode != sip.StatusOK {
1694+
return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)}
1695+
}
1696+
1697+
// Send ACK for the hold INVITE
1698+
ack := sip.NewAckRequest(req, resp, nil)
1699+
if err := c.WriteRequest(ack); err != nil {
1700+
return err
1701+
}
1702+
1703+
return nil
1704+
}
1705+
1706+
func (c *sipInbound) unholdCall(ctx context.Context) error {
1707+
c.mu.Lock()
1708+
1709+
if c.invite == nil || c.inviteOk == nil {
1710+
c.mu.Unlock()
1711+
return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't unhold non established call")
1712+
}
1713+
1714+
// Create INVITE with SDP modified for unhold (a=sendrecv)
1715+
req := sip.NewRequest(sip.INVITE, c.invite.Recipient)
1716+
c.setCSeq(req)
1717+
1718+
// Copy headers from original INVITE
1719+
req.AppendHeader(c.invite.From())
1720+
req.AppendHeader(c.invite.To())
1721+
req.AppendHeader(c.invite.CallID())
1722+
req.AppendHeader(c.contact)
1723+
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
1724+
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))
1725+
1726+
// Copy Route headers from original INVITE or Record-Route from response
1727+
if len(c.invite.GetHeaders("Route")) > 0 {
1728+
sip.CopyHeaders("Route", c.invite, req)
1729+
} else {
1730+
hdrs := c.inviteOk.GetHeaders("Record-Route")
1731+
for i := len(hdrs) - 1; i >= 0; i-- {
1732+
rrh, ok := hdrs[i].(*sip.RecordRouteHeader)
1733+
if !ok {
1734+
continue
1735+
}
1736+
1737+
h := rrh.Clone()
1738+
req.AppendHeader(h)
1739+
}
1740+
}
1741+
1742+
// Modify SDP to set direction to sendrecv (unhold)
1743+
sdpOffer := c.invite.Body()
1744+
if len(sdpOffer) > 0 {
1745+
// Replace a=sendonly with a=sendrecv for unhold
1746+
sdpOffer = bytes.ReplaceAll(sdpOffer, []byte("a=sendonly"), []byte("a=sendrecv"))
1747+
req.SetBody(sdpOffer)
1748+
}
1749+
1750+
c.swapSrcDst(req)
1751+
c.mu.Unlock()
1752+
1753+
// Send the INVITE request
1754+
tx, err := c.Transaction(req)
1755+
if err != nil {
1756+
return err
1757+
}
1758+
defer tx.Terminate()
1759+
1760+
resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil)
1761+
if err != nil {
1762+
return err
1763+
}
1764+
1765+
if resp.StatusCode != sip.StatusOK {
1766+
return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)}
1767+
}
1768+
1769+
// Send ACK for the unhold INVITE
1770+
ack := sip.NewAckRequest(req, resp, nil)
1771+
if err := c.WriteRequest(ack); err != nil {
1772+
return err
1773+
}
1774+
1775+
return nil
1776+
}

0 commit comments

Comments
 (0)