diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5525cfd --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +config.yaml +dist/ +*.pprof +.vscode/ diff --git a/README.md b/README.md index c881d11..c4c57c3 100644 --- a/README.md +++ b/README.md @@ -2,177 +2,44 @@ ## Why -`SFU-to-SFU` is an example of a cascaded decentralised SFU. The intention is to be a implementation of Matrix's [MSC3401: Native Group VoIP signalling](https://github.com/matrix-org/matrix-spec-proposals/blob/matthew/group-voip/proposals/3401-group-voip.md). -This example is self contained and doesn't require any external software. The project was informed by the following goals. - -* **Easy Scaling** - SFU count can be grown/shrunk as users arrive. We don't scale on the dimension of calls making things easier. -* **Shorter Last Mile** - Users can connect to SFUs closest to them. Links `SFU <-> SFU` are higher quality then public hops. -* **Flexibility in WebRTC server choice** - All communication takes place using standard protocols/formats. You can use whatever server software best fits your needs. -* **Client Simplicity** - Clients will need to be created on lots of platforms. We should aim to use native WebRTC features as much as possible. - -The SFUs themselves have no concept of conference calls/rooms etc... All of this is communicated in the Matrix room. The SFUs themselves just operate off of -pub/sub semantics. The pub/sub streams are keyed by `foci`, `call_id`, `device_id` and `purpose` these keys come from [MSC3401](https://github.com/matrix-org/matrix-spec-proposals/blob/matthew/group-voip/proposals/3401-group-voip.md). - -Lets say you have a Matrix room where user `Alice` wishes to publish a screenshare to `Bob` and `Charlie`. - -``` -* `Alice` establishes a session with a SFU -* `Alice` publishes a screenshare feed with `call_id`, `device_id` and `purpose` -* `Alice` publishes to the matrix room with the values `foci`, `call_id`, `device_id` and `purpose` - -# Connecting directly to publishers FOCI -* `Bob` connects directly to `foci` and establishes a session. -* `Bob` requests a stream with values `foci`, `call_id`, `device_id` and `purpose`. - -# Connect to FOCI through different SFU -* `Charlie` connects to a SFU they run on a remote host. -* `Charlie` requests a stream with values `foci`, `call_id`, `device_id` and `purpose`. -* `Charlie`'s SFU connects to `foci` and requests the stream. -* `Alice`'s stream arrives to Charlie via `Alice -> FOCI -> Charlie's SFU -> Charlie` -``` +`SFU-to-SFU` is an example of a cascaded decentralised SFU. The intention is to +be a implementation of Matrix's [MSC3401: Native Group VoIP +signalling](https://github.com/matrix-org/matrix-spec-proposals/blob/matthew/group-voip/proposals/3401-group-voip.md). +This example is self contained and doesn't require any external software. The +project was informed by the following goals. + +* **Easy Scaling** - SFU count can be grown/shrunk as users arrive. We don't + scale on the dimension of calls making things easier. +* **Shorter Last Mile** - Users can connect to SFUs closest to them. Links `SFU + <-> SFU` are higher quality then public hops. +* **Flexibility in WebRTC server choice** - All communication takes place using + standard protocols/formats. You can use whatever server software best fits + your needs. +* **Client Simplicity** - Clients will need to be created on lots of platforms. + We should aim to use native WebRTC features as much as possible. + +This implements the MSC only roughly - given the current experimental nature of +this projects, it deviates in certain areas from the MSC. ## How -### Establishing a session -Client sends a POST with a WebRTC Offer that is datachannel only. Server responds with Answer. - -Server will open a datachannel called `signaling`. Clients can send publish/subscribe now. - -`POST /createSession` - -`Request` -``` -o=- 6685856480478485828 2 IN IP4 127.0.0.1 -s=- -t=0 0 -a=group:BUNDLE 0 -a=extmap-allow-mixed -a=msid-semantic: WMS -m=application 9 UDP/DTLS/SCTP webrtc-datachannel -c=IN IP4 0.0.0.0 -a=ice-ufrag:gLSF -a=ice-pwd:xuxSHK0uJuSb607uYunnzlCQ -a=ice-options:trickle -a=fingerprint:sha-256 C2:1F:9B:A1:C2:DF:7E:13:E4:F9:64:F5:EC:4D:17:A1:89:21:0E:32:61:2A:B7:A5:A7:2A:7C:06:AC:FB:B2:A1 -a=setup:actpass -a=mid:0 -a=sctp-port:5000 -a=max-message-size:262144 -``` - -`Response` -``` -o=- 1712750552704711910 2 IN IP4 127.0.0.1 -s=- -t=0 0 -a=group:BUNDLE 0 -a=extmap-allow-mixed -a=msid-semantic: WMS -m=application 9 UDP/DTLS/SCTP webrtc-datachannel -c=IN IP4 0.0.0.0 -a=ice-ufrag:90cu -a=ice-pwd:PARVC6h9kLvvgCqxSocjrXYZ -a=ice-options:trickle -a=fingerprint:sha-256 7F:79:0F:50:FF:D1:3F:DF:CA:BD:06:89:2B:C8:05:2E:EC:7D:EF:66:AF:A8:6E:D8:70:C6:74:68:E6:5C:47:D7 -a=setup:active -a=mid:0 -a=sctp-port:5000 -a=max-message-size:262144 -``` - -### Publish a Stream -A user can start publish a stream by making a JSON request to publish with a new Offer. With the following keys. - -* `event` - Must be `publish` -* `id` - Unique ID for this message. Allows server to respond with with errors -* Stream Identification - `call_id`, `device_id`, `purpose` -* `sdp` - Offer frome the Peer. Any new additional tracks will belong to the stream. - -``` -{ - event: 'publish', - id: `ABC`, - call_id: 'AAA', - device_id: 'BBB', - purpose: 'DDD', - sdp: `...`, -} -``` - -** Errors ** -* Stream already exists -* Server over capacity - -The server will respond to the `subscribe` with the answer. - -``` -{ - event: 'publish', - id: `ABC`, - call_id: 'AAA', - device_id: 'BBB', - purpose: 'DDD', - sdp: `...`, -} -``` - - -### Subscribe to a Stream -A user can subscribe to a stream by making a JSON request to subscribe with a new Offer. With the following keys. - -* `event` - Must be `subscribe` -* `id` - Unique ID for this message. Allows server to respond with with errors -* Stream Identification - `call_id`, `device_id`, `purpose` - -``` -{ - event: 'subscribe', - id: `ABC`, - call_id: 'AAA', - device_id: 'BBB', - purpose: 'DDD', - sdp: `...`, -} -``` - -The client will respond to the `subscribe` with the answer. -``` -{ - event: 'subscribe', - id: `ABC`, - sdp: `...`, -} -``` +### Configuration -** Errors ** -* Stream doesn'texist -* Server over capacity +* `cp config.yaml.sample config.yaml` +* Fill in `config.yaml` -### Unpublish a Stream -``` -{ - event: 'unpublish', - id: `ABC`, - call_id: 'AAA', - device_id: 'BBB', - purpose: 'DDD', -} -``` +### Running -### Unsubscribe to a Stream +* `./scripts/run.sh` +* Access at -``` -{ - event: 'unsubscribe', - id: `ABC`, - call_id: 'AAA', - device_id: 'BBB', - purpose: 'DDD', -} -``` +### Profiling -## Running +* `./scripts/profile.sh` +* Access at -* `go run *.go` -* Access at http://localhost:8080 +### Building +* `./scripts/build.sh` +* `./dist/bin` +* Access at diff --git a/cascade.go b/cascade.go deleted file mode 100644 index 461ad6e..0000000 --- a/cascade.go +++ /dev/null @@ -1,128 +0,0 @@ -package main - -import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "log" - "net/http" - "strings" - - "github.com/pion/webrtc/v3" -) - -// Given a FOCI + CallID + DeviceID + Purpose establish a session and Subscribe. Take -// the media from the remote and copy it to a `webrtc.TrackLocal` so we can re-send -func remoteStreamLookup(msg dataChannelMessage) (webrtc.TrackLocal, webrtc.TrackLocal) { - audioTrack, videoTrack := make(chan webrtc.TrackLocal, 1), make(chan webrtc.TrackLocal, 1) - - peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{}) - if err != nil { - panic(err) - } - - peerConnection.OnTrack(func(trackRemote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { - trackLocal, err := webrtc.NewTrackLocalStaticRTP(trackRemote.Codec().RTPCodecCapability, trackRemote.ID(), trackRemote.StreamID()) - if err != nil { - panic(err) - } - - if strings.Contains(trackRemote.Codec().MimeType, "video") { - videoTrack <- trackLocal - } else { - audioTrack <- trackLocal - } - - copyRemoteToLocal(trackRemote, trackLocal) - }) - - dataChannel, err := peerConnection.CreateDataChannel("signaling", nil) - if err != nil { - panic(err) - } - - dataChannel.OnOpen(func() { - if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { - panic(err) - } - - if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { - panic(err) - } - - offer, err := peerConnection.CreateOffer(nil) - if err != nil { - panic(err) - } - - if err := peerConnection.SetLocalDescription(offer); err != nil { - panic(err) - } - - msg.SDP = offer.SDP - marshaled, err := json.Marshal(msg) - if err != nil { - panic(err) - } - - if err = dataChannel.SendText(string(marshaled)); err != nil { - panic(err) - } - }) - - dataChannel.OnMessage(func(m webrtc.DataChannelMessage) { - if !m.IsString { - log.Fatal("Inbound message is not string") - } - - cascadedMsg := &dataChannelMessage{} - if err := json.Unmarshal(m.Data, cascadedMsg); err != nil { - log.Fatal(err) - } - - switch cascadedMsg.Event { - case "error": - audioTrack <- nil - videoTrack <- nil - case "subscribe": - if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: cascadedMsg.SDP}); err != nil { - panic(err) - } - - default: - log.Fatalf("Unknown msg Event type %s", msg.Event) - } - }) - - offer, err := peerConnection.CreateOffer(nil) - if err != nil { - panic(err) - } - - if err := peerConnection.SetLocalDescription(offer); err != nil { - panic(err) - } - - resp, err := http.Post("http://"+msg.FOCI+"/createSession", "application/text", bytes.NewBuffer([]byte(offer.SDP))) - if err != nil { - panic(err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - panic(fmt.Sprintf("Got HTTP Status code %d", resp.StatusCode)) - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err) - } - - if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: string(body)}); err != nil { - panic(err) - } - - return <-audioTrack, <-videoTrack - -} diff --git a/config.yaml.sample b/config.yaml.sample new file mode 100644 index 0000000..0b1b4df --- /dev/null +++ b/config.yaml.sample @@ -0,0 +1,4 @@ +homeserverurl: "http://localhost:8008" +userid: "@sfu:shadowfax" +accesstoken: "..." +timeout: 30 diff --git a/foci.go b/foci.go deleted file mode 100644 index d670c63..0000000 --- a/foci.go +++ /dev/null @@ -1,254 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "io" - "log" - "net/http" - "strings" - "sync" - "time" - - "github.com/pion/rtcp" - "github.com/pion/webrtc/v3" -) - -type streamDetail struct { - callID, deviceID, purpose string - track *webrtc.TrackLocalStaticRTP -} - -type setStreamDetails func(newCallID, newDeviceID, newPurpose string) - -type foci struct { - name string - streamDetailsMu sync.RWMutex - streamDetails []streamDetail -} - -func (f *foci) localStreamLookup(msg dataChannelMessage) (audioTrack, videoTrack webrtc.TrackLocal) { - f.streamDetailsMu.Lock() - defer f.streamDetailsMu.Unlock() - - for _, s := range f.streamDetails { - if s.callID == msg.CallID && s.deviceID == msg.DeviceID && s.purpose == msg.Purpose { - if s.track.Kind() == webrtc.RTPCodecTypeAudio { - audioTrack = s.track - } else { - videoTrack = s.track - } - } - } - return -} - -func (f *foci) dataChannelHandler(peerConnection *webrtc.PeerConnection, d *webrtc.DataChannel, setPublishDetails setStreamDetails) { - sendError := func(errMsg string) { - marshaled, err := json.Marshal(&dataChannelMessage{ - Event: "error", - Message: errMsg, - }) - if err != nil { - panic(err) - } - - if err = d.SendText(string(marshaled)); err != nil { - panic(err) - } - } - - d.OnMessage(func(m webrtc.DataChannelMessage) { - if !m.IsString { - log.Fatal("Inbound message is not string") - } - - msg := &dataChannelMessage{} - if err := json.Unmarshal(m.Data, msg); err != nil { - log.Fatal(err) - } - - switch msg.Event { - case "publish": - if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: msg.SDP, - }); err != nil { - panic(err) - } - - answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - panic(err) - } - - if err := peerConnection.SetLocalDescription(answer); err != nil { - panic(err) - } - - setPublishDetails(msg.CallID, msg.DeviceID, msg.Purpose) - - msg.SDP = answer.SDP - marshaled, err := json.Marshal(msg) - if err != nil { - panic(err) - } - - if err = d.SendText(string(marshaled)); err != nil { - panic(err) - } - case "subscribe": - var audioTrack, videoTrack webrtc.TrackLocal - - if msg.FOCI == f.name { - audioTrack, videoTrack = f.localStreamLookup(*msg) - } else { - audioTrack, videoTrack = remoteStreamLookup(*msg) - } - - if audioTrack == nil && videoTrack == nil { - sendError("No Such Stream") - return - } - - if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: msg.SDP, - }); err != nil { - panic(err) - } - - if audioTrack != nil { - if _, err := peerConnection.AddTrack(audioTrack); err != nil { - panic(err) - } - } - - if videoTrack != nil { - if _, err := peerConnection.AddTrack(videoTrack); err != nil { - panic(err) - } - } - - answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - panic(err) - } - - if err := peerConnection.SetLocalDescription(answer); err != nil { - panic(err) - } - - msg.SDP = answer.SDP - marshaled, err := json.Marshal(msg) - if err != nil { - panic(err) - } - - if err = d.SendText(string(marshaled)); err != nil { - panic(err) - } - default: - log.Fatalf("Unknown msg Event type %s", msg.Event) - } - }) -} - -func (f *foci) handleCreateSession(w http.ResponseWriter, r *http.Request) error { - offer, err := io.ReadAll(r.Body) - if err != nil { - return err - } - - peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{}) - if err != nil { - return err - } - - var ( - publishDetailsMu sync.RWMutex - callID, deviceID, purpose string - ) - setPublishDetails := func(newCallID, newDeviceID, newPurpose string) { - publishDetailsMu.Lock() - defer publishDetailsMu.Unlock() - - callID = newCallID - deviceID = newDeviceID - purpose = newPurpose - } - - peerConnection.OnTrack(func(trackRemote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { - id := "audio" - if strings.Contains(trackRemote.Codec().MimeType, "video") { - id = "video" - - // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval - go func() { - ticker := time.NewTicker(time.Millisecond * 200) - for range ticker.C { - if errSend := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(trackRemote.SSRC())}}); errSend != nil { - fmt.Println(errSend) - } - } - }() - - } - - publishDetailsMu.Lock() - f.streamDetailsMu.Lock() - trackLocal, err := webrtc.NewTrackLocalStaticRTP(trackRemote.Codec().RTPCodecCapability, id, fmt.Sprintf("%s-%s-%s", callID, deviceID, purpose)) - if err != nil { - panic(err) - } - - f.streamDetails = append(f.streamDetails, streamDetail{ - callID: callID, - deviceID: deviceID, - purpose: purpose, - track: trackLocal, - }) - f.streamDetailsMu.Unlock() - publishDetailsMu.Unlock() - - copyRemoteToLocal(trackRemote, trackLocal) - }) - - peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { - f.dataChannelHandler(peerConnection, d, setPublishDetails) - }) - - peerConnection.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: string(offer), - }) - - answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - return err - } - - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) - if err = peerConnection.SetLocalDescription(answer); err != nil { - return err - } - <-gatherComplete - - _, err = fmt.Fprintf(w, peerConnection.LocalDescription().SDP) - return err -} - -func copyRemoteToLocal(trackRemote *webrtc.TrackRemote, trackLocal *webrtc.TrackLocalStaticRTP) { - buff := make([]byte, 1500) - for { - i, _, err := trackRemote.Read(buff) - if err != nil { - panic(err) - } - - if _, err = trackLocal.Write(buff[:i]); err != nil { - panic(err) - } - } - -} diff --git a/go.mod b/go.mod index 806ffa5..5d0716a 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,17 @@ -module github.com/Sean-Der/sfu-to-sfu +module github.com/matrix-org/sfu-to-sfu go 1.18 require github.com/pion/webrtc/v3 v3.1.31 +require ( + github.com/pion/rtcp v1.2.9 + gopkg.in/yaml.v3 v3.0.1 + maunium.net/go/mautrix v0.11.0 +) + +replace maunium.net/go/mautrix v0.11.0 => ../mautrix-go + require ( github.com/google/uuid v1.3.0 // indirect github.com/pion/datachannel v1.5.2 // indirect @@ -13,7 +21,6 @@ require ( github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.5 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.9 // indirect github.com/pion/rtp v1.7.13 // indirect github.com/pion/sctp v1.8.2 // indirect github.com/pion/sdp/v3 v3.0.4 // indirect @@ -22,8 +29,12 @@ require ( github.com/pion/transport v0.13.0 // indirect github.com/pion/turn/v2 v2.0.8 // indirect github.com/pion/udp v0.1.1 // indirect - golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect - golang.org/x/net v0.0.0-20220401154927-543a649e0bdd // indirect - golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + github.com/tidwall/gjson v1.14.1 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.4 // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect ) diff --git a/go.sum b/go.sum index 680dca0..e95d047 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,10 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -79,14 +81,24 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo= +github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc= +github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -99,8 +111,9 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220401154927-543a649e0bdd h1:zYlwaUHTmxuf6H7hwO2dgwqozQmH7zf4x+/qql4oVWc= golang.org/x/net v0.0.0-20220401154927-543a649e0bdd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e h1:TsQ7F31D3bUCLeqPT0u+yjp1guoArKaNKmCr22PYgTQ= +golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -116,8 +129,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -141,6 +155,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= @@ -148,5 +163,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go deleted file mode 100644 index 11db9d4..0000000 --- a/main.go +++ /dev/null @@ -1,94 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "io/ioutil" - "log" - "net" - "net/http" - "text/template" -) - -func main() { - fociCount := flag.Int("foci-count", 4, "How many FOCI should be started") - fociName := flag.String("foci-name", "localhost", "Name of this FOCI. Used to determine if a cascing request should be made") - httpAddress := flag.String("http-address", ":8080", "Address for frontend for FOCI cluster") - flag.Parse() - - fociPorts := []int{} - for i := 0; i < *fociCount; i++ { - fociPort, err := createFoci(*fociName) - if err != nil { - log.Fatal(err) - } - log.Printf("Starting FOCI on port %d ", fociPort) - - fociPorts = append(fociPorts, fociPort) - } - - log.Print("Serving HTTP on " + *httpAddress) - fileServer := &http.Server{ - Addr: *httpAddress, - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - indexHTML, err := ioutil.ReadFile("static/index.html") - if err != nil { - panic(err) - } - - if err := template.Must(template.New("").Parse(string(indexHTML))).Execute(w, fociPorts); err != nil { - log.Fatal(err) - } - - }), - } - log.Fatal(fileServer.ListenAndServe()) -} - -func createFoci(fociName string) (int, error) { - listener, err := net.Listen("tcp", ":0") - if err != nil { - return 0, err - } - - fociPort := listener.Addr().(*net.TCPAddr).Port - go func() { - fociWebRTCServer := &foci{ - name: fmt.Sprintf("%s:%d", fociName, fociPort), - } - fociHTTPServer := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - setCorsHeaders(w) - - switch { - case r.URL.String() == "/createSession" && r.Method == "POST": - if err := fociWebRTCServer.handleCreateSession(w, r); err != nil { - log.Fatal(err) - } - } - }), - } - - log.Fatal(fociHTTPServer.Serve(listener)) - }() - - return fociPort, nil -} - -type dataChannelMessage struct { - Event string `json:"event"` - Message string `json:"message,omitempty"` - ID string `json:"id"` - CallID string `json:"call_id"` - DeviceID string `json:"device_id"` - Purpose string `json:"purpose"` - SDP string `json:"sdp"` - FOCI string `json:"foci"` -} - -func setCorsHeaders(w http.ResponseWriter) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization,X-CSRF-Token") - w.Header().Set("Access-Control-Expose-Headers", "Authorization") -} diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 0000000..5011d8c --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +go build -o dist/bin src/*.go diff --git a/scripts/profile.sh b/scripts/profile.sh new file mode 100755 index 0000000..071103e --- /dev/null +++ b/scripts/profile.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +go run ./src/*.go --cpuProfile cpuProfile.pprof --memProfile memProfile.pprof --logTime diff --git a/scripts/run.sh b/scripts/run.sh new file mode 100755 index 0000000..bec8362 --- /dev/null +++ b/scripts/run.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +go run ./src/*.go --logTime diff --git a/src/call.go b/src/call.go new file mode 100644 index 0000000..231e22f --- /dev/null +++ b/src/call.go @@ -0,0 +1,498 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "encoding/json" + "log" + "time" + + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" + + "github.com/pion/webrtc/v3" +) + +type Call struct { + CallID string + UserID id.UserID + DeviceID id.DeviceID + LocalSessionID id.SessionID + RemoteSessionID id.SessionID + Client *mautrix.Client + PeerConnection *webrtc.PeerConnection + Conf *Conference + dataChannel *webrtc.DataChannel + lastKeepAliveTimestamp time.Time + sentEndOfCandidates bool +} + +func (c *Call) onDCSelect(start []event.SFUTrackDescription) { + if len(start) == 0 { + return + } + + for _, trackDesc := range start { + log.Printf("%s | selecting StreamID %s TrackID %s", c.UserID, trackDesc.StreamID, trackDesc.TrackID) + foundTracks := c.Conf.GetLocalTrackByInfo(LocalTrackInfo{ + StreamID: trackDesc.StreamID, + TrackID: trackDesc.TrackID, + }) + if len(foundTracks) == 0 { + log.Printf("%s | no track found StreamID %s TrackID %s", c.UserID, trackDesc.StreamID, trackDesc.TrackID) + continue + } + for _, track := range foundTracks { + if _, err := c.PeerConnection.AddTrack(track); err == nil { + log.Printf("%s | added %s StreamID %s TrackID %s", c.UserID, track.Kind(), track.StreamID(), track.ID()) + } else { + log.Printf("%s | failed to add %s StreamID %s TrackID %s", c.UserID, track.Kind(), track.StreamID(), track.ID()) + } + } + } +} + +func (c *Call) onDCPublish(sdp string) { + log.Printf("%s | received DC publish", c.UserID) + + err := c.PeerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }) + if err != nil { + log.Printf("%s | failed to set remote description %+v - ignoring: %s", c.UserID, sdp, err) + return + } + + offer, err := c.PeerConnection.CreateAnswer(nil) + if err != nil { + log.Printf("%s | failed to create answer - ignoring: %s", c.UserID, err) + return + } + err = c.PeerConnection.SetLocalDescription(offer) + if err != nil { + log.Printf("%s | failed to set local description %+v - ignoring: %s", c.UserID, offer.SDP, err) + return + } + + c.SendDataChannelMessage(event.SFUMessage{ + Op: event.SFUOperationAnswer, + SDP: offer.SDP, + }) +} + +func (c *Call) onDCUnpublish(stop []event.SFUTrackDescription, sdp string) { + for _, trackDesc := range stop { + log.Printf("%s | unpublishing StreamID %s TrackID %s", c.UserID, trackDesc.StreamID, trackDesc.TrackID) + if removedTracksCount := c.Conf.RemoveTracksFromPeerConnectionsByInfo(LocalTrackInfo{ + StreamID: trackDesc.StreamID, + TrackID: trackDesc.TrackID, + }); removedTracksCount == 0 { + log.Printf("%s | no tracks to remove for: %+v", c.UserID, stop) + } + + } + + err := c.PeerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }) + if err != nil { + log.Printf("%s | failed to set remote description %+v - ignoring: %s", c.UserID, sdp, err) + return + } + + offer, err := c.PeerConnection.CreateAnswer(nil) + if err != nil { + log.Printf("%s | failed to create answer - ignoring: %s", c.UserID, err) + return + } + err = c.PeerConnection.SetLocalDescription(offer) + if err != nil { + log.Printf("%s | failed to set local description %+v - ignoring: %s", c.UserID, offer.SDP, err) + return + } + + c.SendDataChannelMessage(event.SFUMessage{ + Op: event.SFUOperationAnswer, + SDP: offer.SDP, + }) +} + +func (c *Call) onDCAnswer(sdp string) { + log.Printf("%s | received DC answer", c.UserID) + + err := c.PeerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }) + if err != nil { + log.Printf("%s | failed to set remote description %+v - ignoring: %s", c.UserID, sdp, err) + return + } +} + +func (c *Call) onDCAlive() { + c.lastKeepAliveTimestamp = time.Now() + +} + +func (c *Call) onDCMetadata(metadata event.CallSDPStreamMetadata) { + log.Printf("%s | received DC metadata", c.UserID) + + c.Conf.SendUpdatedMetadataFromCall(c.CallID) +} + +func (c *Call) dataChannelHandler(d *webrtc.DataChannel) { + c.dataChannel = d + + d.OnOpen(func() { + c.SendDataChannelMessage(event.SFUMessage{Op: event.SFUOperationMetadata}) + }) + + d.OnError(func(err error) { + log.Fatalf("%s | DC error: %s", c.CallID, err) + }) + + d.OnMessage(func(m webrtc.DataChannelMessage) { + if !m.IsString { + log.Printf("%s | inbound message is not string - ignoring: %+v", c.UserID, m) + return + } + + msg := &event.SFUMessage{} + if err := json.Unmarshal(m.Data, msg); err != nil { + log.Printf("%s | failed to unmarshal %+v - ignoring: %s", c.CallID, msg, err) + return + } + + if msg.Metadata != nil { + c.Conf.UpdateSDPStreamMetadata(c.DeviceID, msg.Metadata) + } + + switch msg.Op { + case event.SFUOperationSelect: + c.onDCSelect(msg.Start) + case event.SFUOperationPublish: + c.onDCPublish(msg.SDP) + case event.SFUOperationUnpublish: + c.onDCUnpublish(msg.Stop, msg.SDP) + case event.SFUOperationAnswer: + c.onDCAnswer(msg.SDP) + case event.SFUOperationAlive: + c.onDCAlive() + case event.SFUOperationMetadata: + c.onDCMetadata(msg.Metadata) + + default: + log.Printf("Unknown operation - ignoring: %s", msg.Op) + // TODO: hook up msg.Stop to unsubscribe from tracks + // TODO: hook cascade back up. + // As we're not an AS, we'd rely on the client + // to send us a "connect" op to tell us how to + // connect to another focus in order to select + // its streams. + } + }) +} + +func (c *Call) negotiationNeededHandler() { + offer, err := c.PeerConnection.CreateOffer(nil) + if err != nil { + log.Printf("%s | failed to create offer - ignoring: %s", c.UserID, err) + return + } + err = c.PeerConnection.SetLocalDescription(offer) + if err != nil { + log.Printf("%s | failed to set local description %+v - ignoring: %s", c.UserID, offer.SDP, err) + return + } + + c.SendDataChannelMessage(event.SFUMessage{ + Op: event.SFUOperationOffer, + SDP: offer.SDP, + }) +} + +func (c *Call) iceCandidateHandler(candidate *webrtc.ICECandidate) { + if candidate == nil { + return + } + + jsonCandidate := candidate.ToJSON() + + candidateEvtContent := &event.Content{ + Parsed: event.CallCandidatesEventContent{ + BaseCallEventContent: event.BaseCallEventContent{ + CallID: c.CallID, + ConfID: c.Conf.ConfID, + DeviceID: c.Client.DeviceID, + SenderSessionID: c.LocalSessionID, + DestSessionID: c.RemoteSessionID, + PartyID: string(c.Client.DeviceID), + Version: event.CallVersion("1"), + }, + Candidates: []event.CallCandidate{{ + Candidate: jsonCandidate.Candidate, + SDPMLineIndex: int(*jsonCandidate.SDPMLineIndex), + SDPMID: *jsonCandidate.SDPMid, + }}, + }, + } + c.sendToDevice(event.CallCandidates, candidateEvtContent) +} + +func (c *Call) trackHandler(trackRemote *webrtc.TrackRemote, rec *webrtc.RTPReceiver) { + go WriteRTCP(trackRemote, c.PeerConnection) + + trackLocal, err := webrtc.NewTrackLocalStaticRTP(trackRemote.Codec().RTPCodecCapability, trackRemote.ID(), trackRemote.StreamID()) + if err != nil { + log.Printf("%s | failed to create new track local static RTP %+v - ignoring: %s", c.UserID, trackRemote.Codec().RTPCodecCapability, err) + return + } + + c.Conf.Tracks.Mutex.Lock() + c.Conf.Tracks.Tracks = append(c.Conf.Tracks.Tracks, LocalTrackWithInfo{ + Track: trackLocal, + Info: LocalTrackInfo{ + TrackID: trackLocal.ID(), + StreamID: trackLocal.StreamID(), + Call: c, + }, + }) + c.Conf.Tracks.Mutex.Unlock() + + log.Printf("%s | published %s StreamID %s TrackID %s", c.UserID, trackLocal.Kind(), trackLocal.StreamID(), trackLocal.ID()) + + go c.Conf.SendUpdatedMetadataFromCall(c.CallID) + go CopyRemoteToLocal(trackRemote, trackLocal) +} + +func (c *Call) iceConnectionStateHandler(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateCompleted || state == webrtc.ICEConnectionStateConnected { + c.lastKeepAliveTimestamp = time.Now() + go c.CheckKeepAliveTimestamp() + + if !c.sentEndOfCandidates { + candidateEvtContent := &event.Content{ + Parsed: event.CallCandidatesEventContent{ + BaseCallEventContent: event.BaseCallEventContent{ + CallID: c.CallID, + ConfID: c.Conf.ConfID, + DeviceID: c.Client.DeviceID, + SenderSessionID: c.LocalSessionID, + DestSessionID: c.RemoteSessionID, + PartyID: string(c.Client.DeviceID), + Version: event.CallVersion("1"), + }, + Candidates: []event.CallCandidate{{Candidate: ""}}, + }, + } + c.sendToDevice(event.CallCandidates, candidateEvtContent) + c.sentEndOfCandidates = true + } + } +} + +func (c *Call) OnInvite(content *event.CallInviteEventContent) { + c.Conf.UpdateSDPStreamMetadata(c.DeviceID, content.SDPStreamMetadata) + offer := content.Offer + + peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + log.Panicf("%s | failed to create new peer connection: %s", c.UserID, err) + } + c.PeerConnection = peerConnection + + peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + c.trackHandler(track, receiver) + }) + peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { + c.dataChannelHandler(d) + }) + peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) { + c.iceCandidateHandler(candidate) + }) + peerConnection.OnNegotiationNeeded(func() { + c.negotiationNeededHandler() + }) + peerConnection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + c.iceConnectionStateHandler(state) + }) + + err = peerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: offer.SDP, + }) + if err != nil { + log.Printf("%s | failed to set remote description %+v - ignoring: %s", c.UserID, offer.SDP, err) + return + } + + answer, err := peerConnection.CreateAnswer(nil) + if err != nil { + log.Printf("%s | failed to create answer - ignoring: %s", c.UserID, err) + return + } + + // TODO: trickle ICE for fast conn setup, rather than block here + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) + if err = peerConnection.SetLocalDescription(answer); err != nil { + log.Printf("%s | failed to set local description %+v - ignoring: %s", c.UserID, offer.SDP, err) + return + } + <-gatherComplete + + answerEvtContent := &event.Content{ + Parsed: event.CallAnswerEventContent{ + BaseCallEventContent: event.BaseCallEventContent{ + CallID: c.CallID, + ConfID: c.Conf.ConfID, + DeviceID: c.Client.DeviceID, + SenderSessionID: c.LocalSessionID, + DestSessionID: c.RemoteSessionID, + PartyID: string(c.Client.DeviceID), + Version: event.CallVersion("1"), + }, + Answer: event.CallData{ + Type: "answer", + SDP: peerConnection.LocalDescription().SDP, + }, + SDPStreamMetadata: c.Conf.GetRemoteMetadataForDevice(c.DeviceID), + }, + } + c.sendToDevice(event.CallAnswer, answerEvtContent) +} + +func (c *Call) OnSelectAnswer(content *event.CallSelectAnswerEventContent) { + selectedPartyID := content.SelectedPartyID + if selectedPartyID != string(c.Client.DeviceID) { + c.Terminate() + log.Printf("%s | call was answered on a different device: %s", c.UserID, selectedPartyID) + } +} + +func (c *Call) OnHangup(content *event.CallHangupEventContent) { + c.Terminate() +} + +func (c *Call) OnCandidates(content *event.CallCandidatesEventContent) { + for _, candidate := range content.Candidates { + sdpMLineIndex := uint16(candidate.SDPMLineIndex) + ice := webrtc.ICECandidateInit{ + Candidate: candidate.Candidate, + SDPMid: &candidate.SDPMID, + SDPMLineIndex: &sdpMLineIndex, + UsernameFragment: new(string), + } + if err := c.PeerConnection.AddICECandidate(ice); err != nil { + log.Printf("%s | failed to add ICE candidate %+v: %s", c.UserID, content, err) + } + } +} + +func (c *Call) Terminate() { + log.Printf("%s | terminating call", c.UserID) + + if err := c.PeerConnection.Close(); err != nil { + log.Printf("%s | error closing peer connection: %s", c.UserID, err) + } + + c.Conf.Calls.CallsMu.Lock() + delete(c.Conf.Calls.Calls, c.CallID) + c.Conf.Calls.CallsMu.Unlock() + + info := LocalTrackInfo{Call: c} + c.Conf.RemoveTracksFromPeerConnectionsByInfo(info) + c.Conf.RemoveTracksFromConfByInfo(info) + c.Conf.RemoveMetadataByDeviceID(c.DeviceID) + c.Conf.SendUpdatedMetadataFromCall(c.CallID) +} + +func (c *Call) Hangup(reason event.CallHangupReason) { + hangupEvtContent := &event.Content{ + Parsed: event.CallHangupEventContent{ + BaseCallEventContent: event.BaseCallEventContent{ + CallID: c.CallID, + ConfID: c.Conf.ConfID, + DeviceID: c.Client.DeviceID, + SenderSessionID: c.LocalSessionID, + DestSessionID: c.RemoteSessionID, + PartyID: string(c.Client.DeviceID), + Version: event.CallVersion("1"), + }, + Reason: reason, + }, + } + c.sendToDevice(event.CallHangup, hangupEvtContent) + c.Terminate() +} + +func (c *Call) sendToDevice(callType event.Type, content *event.Content) { + if callType.Type != event.CallCandidates.Type { + log.Printf("%s | sending to device %s", c.UserID, callType.Type) + } + toDevice := &mautrix.ReqSendToDevice{ + Messages: map[id.UserID]map[id.DeviceID]*event.Content{ + c.UserID: { + c.DeviceID: content, + }, + }, + } + + // TODO: E2EE + // TODO: to-device reliability + c.Client.SendToDevice(callType, toDevice) +} + +func (c *Call) SendDataChannelMessage(msg event.SFUMessage) { + if c.dataChannel == nil { + return + } + + msg.Metadata = c.Conf.GetRemoteMetadataForDevice(c.DeviceID) + if msg.Op == "metadata" && len(msg.Metadata) == 0 { + return + } + + marshaled, err := json.Marshal(msg) + if err != nil { + log.Printf("%s | failed to marshal %+v - ignoring: %s", c.UserID, msg, err) + return + } + + err = c.dataChannel.SendText(string(marshaled)) + if err != nil { + log.Printf("%s | failed to send %s over DC: %s", c.UserID, msg.Op, err) + } + + log.Printf("%s | sent DC %s", c.UserID, msg.Op) +} + +func (c *Call) CheckKeepAliveTimestamp() { + timeout := time.Second * time.Duration(config.Timeout) + for range time.Tick(timeout) { + if c.lastKeepAliveTimestamp.Add(timeout).Before(time.Now()) { + if c.PeerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed { + log.Printf("%s | did not get keep-alive message in the last %s:", c.UserID, timeout) + c.Hangup(event.CallHangupKeepAliveTimeout) + } + break + } + } +} diff --git a/src/conference.go b/src/conference.go new file mode 100644 index 0000000..26f7621 --- /dev/null +++ b/src/conference.go @@ -0,0 +1,247 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "errors" + "log" + "sync" + + "github.com/pion/webrtc/v3" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +type LocalTrackInfo struct { + StreamID string + TrackID string + Call *Call +} + +type LocalTrackWithInfo struct { + Track *webrtc.TrackLocalStaticRTP + Info LocalTrackInfo +} + +type Calls struct { + CallsMu sync.RWMutex + Calls map[string]*Call // By callID +} + +type Tracks struct { + Mutex sync.RWMutex + Tracks []LocalTrackWithInfo +} + +type Metadata struct { + Mutex sync.RWMutex + Metadata event.CallSDPStreamMetadata +} + +type Conference struct { + ConfID string + Calls Calls + Tracks Tracks + Metadata Metadata +} + +func (c *Conference) GetCall(callID string, create bool) (*Call, error) { + c.Calls.CallsMu.Lock() + defer c.Calls.CallsMu.Unlock() + ca := c.Calls.Calls[callID] + if ca == nil { + if create { + ca = &Call{ + CallID: callID, + Conf: c, + } + c.Calls.Calls[callID] = ca + } else { + return nil, errors.New("no such call") + } + } + return ca, nil +} + +func (c *Conference) getLocalTrackIndicesByInfo(selectInfo LocalTrackInfo) (tracks []int) { + c.Tracks.Mutex.Lock() + defer c.Tracks.Mutex.Unlock() + + foundIndices := []int{} + for index, track := range c.Tracks.Tracks { + info := track.Info + if selectInfo.Call != nil && selectInfo.Call != info.Call { + continue + } + if selectInfo.StreamID != "" && selectInfo.StreamID != info.StreamID { + continue + } + if selectInfo.TrackID != "" && selectInfo.TrackID != info.TrackID { + continue + } + foundIndices = append(foundIndices, index) + } + + return foundIndices +} + +func (c *Conference) GetLocalTrackByInfo(selectInfo LocalTrackInfo) (tracks []webrtc.TrackLocal) { + indices := c.getLocalTrackIndicesByInfo(selectInfo) + + c.Tracks.Mutex.Lock() + defer c.Tracks.Mutex.Unlock() + + foundTracks := []webrtc.TrackLocal{} + for _, index := range indices { + foundTracks = append(foundTracks, c.Tracks.Tracks[index].Track) + } + + return foundTracks +} + +func (c *Conference) RemoveTracksFromPeerConnectionsByInfo(removeInfo LocalTrackInfo) int { + indices := c.getLocalTrackIndicesByInfo(removeInfo) + + c.Tracks.Mutex.Lock() + defer c.Tracks.Mutex.Unlock() + + // FIXME: the big O of this must be awful... + for _, index := range indices { + info := c.Tracks.Tracks[index].Info + + for _, call := range c.Calls.Calls { + for _, sender := range call.PeerConnection.GetSenders() { + if info.TrackID == sender.Track().ID() { + log.Printf("%s | removing %s StreamID %s TrackID %s", call.UserID, sender.Track().Kind(), sender.Track().StreamID(), sender.Track().ID()) + if err := sender.Stop(); err != nil { + log.Printf("%s | failed to stop sender: %s", call.UserID, err) + } + if err := call.PeerConnection.RemoveTrack(sender); err != nil { + log.Printf("%s | failed to remove track: %s", call.UserID, err) + } + } + } + } + } + + return len(indices) +} + +func (c *Conference) RemoveTracksFromConfByInfo(removeInfo LocalTrackInfo) { + indicesToRemove := c.getLocalTrackIndicesByInfo(removeInfo) + + c.Tracks.Mutex.Lock() + defer c.Tracks.Mutex.Unlock() + + newTracks := []LocalTrackWithInfo{} + for index, track := range c.Tracks.Tracks { + keep := true + for _, indexToRemove := range indicesToRemove { + if indexToRemove == index { + keep = false + } + } + if keep { + newTracks = append(newTracks, track) + } + } + + c.Tracks.Tracks = newTracks +} + +func (c *Conference) RemoveOldCallsByDeviceAndSessionIDs(deviceID id.DeviceID, sessionID id.SessionID) error { + var err error + for _, call := range c.Calls.Calls { + if call.DeviceID == deviceID { + if call.RemoteSessionID == sessionID { + err = errors.New("found existing call with equal DeviceID and SessionID") + } else { + call.Terminate() + } + } + } + return err +} + +func (c *Conference) UpdateSDPStreamMetadata(deviceID id.DeviceID, metadata event.CallSDPStreamMetadata) { + c.Metadata.Mutex.Lock() + defer c.Metadata.Mutex.Unlock() + + // Update existing and add new + for streamID, info := range metadata { + c.Metadata.Metadata[streamID] = info + } + // Remove removed + for streamID, info := range c.Metadata.Metadata { + _, exists := metadata[streamID] + if info.DeviceID == deviceID && !exists { + delete(c.Metadata.Metadata, streamID) + } + } +} + +// Get metadata to send to deviceID. This will not include the device's own +// metadata and metadata which includes tracks which we have not received yet +func (c *Conference) GetRemoteMetadataForDevice(deviceID id.DeviceID) event.CallSDPStreamMetadata { + // First we copy the metadata + metadata := make(event.CallSDPStreamMetadata) + c.Metadata.Mutex.Lock() + for streamID, info := range c.Metadata.Metadata { + metadata[streamID] = info + } + c.Metadata.Mutex.Unlock() + // Loop over the copied metadata + for streamID, info := range metadata { + // Delete metadata received from the device that we're sending metadata to + if info.DeviceID == deviceID { + delete(metadata, streamID) + continue + } + // Loop over the tracks in the copied metadata + for trackID := range info.Tracks { + // Delete metadata, if we're the client hasn't published a track that is + // included in the metadata yet + if len(c.getLocalTrackIndicesByInfo(LocalTrackInfo{ + StreamID: streamID, + TrackID: trackID, + })) == 0 { + delete(metadata, streamID) + break + } + } + } + return metadata +} + +func (c *Conference) RemoveMetadataByDeviceID(deviceID id.DeviceID) { + c.Metadata.Mutex.Lock() + defer c.Metadata.Mutex.Unlock() + + for streamID, info := range c.Metadata.Metadata { + if info.DeviceID == deviceID { + delete(c.Metadata.Metadata, streamID) + } + } +} + +func (c *Conference) SendUpdatedMetadataFromCall(callID string) { + for _, call := range c.Calls.Calls { + if call.CallID != callID { + call.SendDataChannelMessage(event.SFUMessage{Op: event.SFUOperationMetadata}) + } + } +} diff --git a/src/focus.go b/src/focus.go new file mode 100644 index 0000000..6b5ec40 --- /dev/null +++ b/src/focus.go @@ -0,0 +1,158 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "errors" + "log" + "strings" + "sync" + + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" +) + +type Confs struct { + confsMu sync.RWMutex + confs map[string]*Conference +} + +type Focus struct { + name string + client *mautrix.Client + confs Confs +} + +func NewFocus(name string, client *mautrix.Client) *Focus { + f := new(Focus) + + f.name = name + f.client = client + f.confs.confs = make(map[string]*Conference) + + return f +} + +func (f *Focus) GetConf(confID string, create bool) (*Conference, error) { + f.confs.confsMu.Lock() + defer f.confs.confsMu.Unlock() + co := f.confs.confs[confID] + if co == nil { + if create { + co = &Conference{ + ConfID: confID, + } + f.confs.confs[confID] = co + co.Calls.Calls = make(map[string]*Call) + co.Tracks.Tracks = []LocalTrackWithInfo{} + co.Metadata.Metadata = make(event.CallSDPStreamMetadata) + } else { + return nil, errors.New("no such conf") + } + } + return co, nil +} + +func (f *Focus) getExistingCall(confID string, callID string) (*Call, error) { + var conf *Conference + var call *Call + var err error + + if conf, err = f.GetConf(confID, false); err != nil || conf == nil { + log.Printf("failed to get conf %s: %s", confID, err) + return nil, err + } + if call, err = conf.GetCall(callID, false); err != nil || call == nil { + log.Printf("failed to get call %s: %s", callID, err) + return nil, err + } + return call, nil +} + +func (f *Focus) onEvent(_ mautrix.EventSource, evt *event.Event) { + // We only care about to-device events + if evt.Type.Class != event.ToDeviceEventType { + return + } + + if !strings.HasPrefix(evt.Type.Type, "m.call.") && !strings.HasPrefix(evt.Type.Type, "org.matrix.call.") { + log.Printf("received non-call to-device event %s", evt.Type.Type) + return + + } else if evt.Type.Type != event.ToDeviceCallCandidates.Type && evt.Type.Type != event.ToDeviceCallSelectAnswer.Type { + log.Printf("%s | received to-device event %s", evt.Sender.String(), evt.Type.Type) + } + + if evt.Content.Raw["dest_session_id"] != localSessionID { + log.Printf("%s | SessionID %s does not match our SessionID - ignoring", evt.Content.Raw["dest_session_id"], localSessionID) + return + } + + var conf *Conference + var call *Call + var err error + + switch evt.Type.Type { + case event.ToDeviceCallInvite.Type: + invite := evt.Content.AsCallInvite() + if conf, err = f.GetConf(invite.ConfID, true); err != nil { + log.Printf("%s | failed to create conf %s: %+v", evt.Sender.String(), invite.ConfID, err) + return + } + if err := conf.RemoveOldCallsByDeviceAndSessionIDs(invite.DeviceID, invite.SenderSessionID); err != nil { + log.Printf("%s | error removing old calls - ignoring call: %+v", evt.Sender.String(), err) + return + } + if call, err = conf.GetCall(invite.CallID, true); err != nil || call == nil { + log.Printf("%s | failed to create call: %+v", evt.Sender.String(), err) + return + } + call.UserID = evt.Sender + call.DeviceID = invite.DeviceID + // XXX: What if an SFU gets restarted? + call.LocalSessionID = localSessionID + call.RemoteSessionID = invite.SenderSessionID + call.Client = f.client + call.OnInvite(invite) + case event.ToDeviceCallCandidates.Type: + candidates := evt.Content.AsCallCandidates() + if call, err = f.getExistingCall((*candidates).ConfID, (*candidates).CallID); err != nil { + return + } + call.OnCandidates(candidates) + case event.ToDeviceCallSelectAnswer.Type: + selectAnswer := evt.Content.AsCallSelectAnswer() + if call, err = f.getExistingCall(selectAnswer.ConfID, selectAnswer.CallID); err != nil { + return + } + call.OnSelectAnswer(selectAnswer) + case event.ToDeviceCallHangup.Type: + hangup := evt.Content.AsCallHangup() + if call, err = f.getExistingCall(hangup.ConfID, hangup.CallID); err != nil { + return + } + call.OnHangup(hangup) + // Events we don't care about + case event.ToDeviceCallNegotiate.Type: + log.Printf("%s | ignoring event %s as should be handled over DC", evt.Sender.String(), evt.Type.Type) + case event.ToDeviceCallReject.Type: + case event.ToDeviceCallAnswer.Type: + log.Printf("%s | ignoring event %s as we are always the ones answering", evt.Sender.String(), evt.Type.Type) + default: + log.Printf("%s | ignoring unrecognised to-device event of type %s", evt.Sender.String(), evt.Type.Type) + } +} diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..264177a --- /dev/null +++ b/src/main.go @@ -0,0 +1,143 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "os" + "os/signal" + "runtime" + "runtime/pprof" + "syscall" + + yaml "gopkg.in/yaml.v3" + + "maunium.net/go/mautrix/id" + + _ "net/http/pprof" +) + +type Config struct { + UserID id.UserID + HomeserverURL string + AccessToken string + Timeout int +} + +var config *Config + +var logTime = flag.Bool("logTime", false, "whether or not to print time and date in logs") +var configFilePath = flag.String("config", "config.yaml", "configuration file path") +var cpuProfile = flag.String("cpuProfile", "", "write CPU profile to `file`") +var memProfile = flag.String("memProfile", "", "write memory profile to `file`") + +func initCpuProfiling(cpuProfile *string) func() { + log.Print("initializing CPU profiling") + + f, err := os.Create(*cpuProfile) + if err != nil { + log.Fatalf("could not create CPU profile: %s", err) + } + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatalf("could not start CPU profile: %s", err) + } + + return func() { + pprof.StopCPUProfile() + if err := f.Close(); err != nil { + log.Fatalf("could not close CPU profile: %s", err) + } + } +} + +func initMemoryProfiling(memProfile *string) func() { + log.Print("initializing memory profiling") + + return func() { + f, err := os.Create(*memProfile) + if err != nil { + log.Fatalf("could not create memory profile: %s", err) + } + runtime.GC() + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatalf("could not write memory profile: %s", err) + } + if err = f.Close(); err != nil { + log.Fatalf("could not close memory profile: %s", err) + } + } +} + +func initLogging(logTime *bool) { + log.SetFlags(0) + if *logTime { + log.SetFlags(log.Ldate | log.Ltime) + } +} + +func loadConfig(configFilePath string) (*Config, error) { + log.Printf("loading %s", configFilePath) + file, err := ioutil.ReadFile(configFilePath) + if err != nil { + log.Fatalf("failed to read config: %s", err) + } + var config Config + if err := yaml.Unmarshal(file, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal YAML: %s", err) + } + return &config, nil +} + +func killListener(c chan os.Signal, beforeExit []func()) { + <-c + log.Printf("ending program") + for _, function := range beforeExit { + function() + } + defer os.Exit(0) +} + +func main() { + flag.Parse() + + initLogging(logTime) + + beforeExit := []func(){} + if *cpuProfile != "" { + beforeExit = append(beforeExit, initCpuProfiling(cpuProfile)) + } + if *memProfile != "" { + beforeExit = append(beforeExit, initMemoryProfiling(memProfile)) + } + + // try to handle os interrupt(signal terminated) + c := make(chan os.Signal, 2) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go killListener(c, beforeExit) + + var err error + if config, err = loadConfig(*configFilePath); err != nil { + log.Fatalf("failed to load config file: %s", err) + } + + if err := InitMatrix(); err != nil { + log.Fatalf("failed to init Matrix: %s", err) + } +} diff --git a/src/matrix.go b/src/matrix.go new file mode 100644 index 0000000..e99b7cc --- /dev/null +++ b/src/matrix.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "log" + + "maunium.net/go/mautrix" +) + +const localSessionID = "sfu" + +func InitMatrix() error { + client, err := mautrix.NewClient(config.HomeserverURL, config.UserID, config.AccessToken) + if err != nil { + log.Fatal("Failed to create client", err) + } + + whoami, err := client.Whoami() + if err != nil { + log.Fatal("Failed to identify SFU user", err) + } + if config.UserID != whoami.UserID { + log.Fatalf("Access token is for the wrong user: %s", config.UserID) + } + log.Printf("Identified SFU as device %s", whoami.DeviceID) + client.DeviceID = whoami.DeviceID + + focus := NewFocus(fmt.Sprintf("%s (%s)", config.UserID, client.DeviceID), client) + + syncer := client.Syncer.(*mautrix.DefaultSyncer) + syncer.ParseEventContent = true + + // TODO: E2EE + syncer.OnEvent(focus.onEvent) + + if err = client.Sync(); err != nil { + log.Panic("Sync failed", err) + } + + return nil +} diff --git a/src/utils.go b/src/utils.go new file mode 100644 index 0000000..b131f98 --- /dev/null +++ b/src/utils.go @@ -0,0 +1,70 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "errors" + "io" + "log" + "strings" + "time" + + "github.com/pion/rtcp" + "github.com/pion/webrtc/v3" +) + +func CopyRemoteToLocal(trackRemote *webrtc.TrackRemote, trackLocal *webrtc.TrackLocalStaticRTP) { + buff := make([]byte, 1500) + for { + i, _, err := trackRemote.Read(buff) + if err != nil { + if !errors.Is(err, io.EOF) { + log.Printf("failed read on StreamID %s TrackID %s: %s", trackLocal.StreamID(), trackRemote.ID(), err) + } + break + } + + if _, err = trackLocal.Write(buff[:i]); err != nil { + if !errors.Is(err, io.ErrClosedPipe) { + log.Printf("failed write on StreamID %s TrackID %s: %s", trackLocal.StreamID(), trackLocal.ID(), err) + } + break + } + } +} + +func WriteRTCP(trackRemote *webrtc.TrackRemote, peerConnection *webrtc.PeerConnection) { + if !strings.Contains(trackRemote.Codec().MimeType, "video") { + return + } + + // FIXME: This is a potential performance killer. This can be less wasteful + // by processing incoming RTCP events, then we would emit a NACK/PLI when a + // viewer requests it + // Send a PLI on an interval so that the publisher is pushing a keyframe + // every 200ms + ticker := time.NewTicker(time.Millisecond * 200) + for range ticker.C { + err := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(trackRemote.SSRC())}}) + if err != nil { + if !errors.Is(err, io.ErrClosedPipe) { + log.Printf("ending RTCP write on TrackID %s: %s", trackRemote.ID(), err) + } + break + } + } +} diff --git a/static/index.html b/static/index.html deleted file mode 100644 index b09dabc..0000000 --- a/static/index.html +++ /dev/null @@ -1,166 +0,0 @@ - - - SFU to SFU - - - - - - Connection State: - New -
- -
-

Publish

- - - -
- - Do screenshare: - -

Subscribe

- - - - - -
- -
- -
- - -