44
55/* eslint-disable require-jsdoc */
66/* global Promise, Map, WebTransport, WebTransportBidirectionalStream,
7- Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */
7+ Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor,
8+ MediaStreamTrackGenerator, proto */
89
910'use strict' ;
1011
@@ -44,6 +45,10 @@ export class QuicConnection extends EventDispatcher {
4445 this . _transportId = this . _token . transportId ;
4546 this . _initReceiveStreamReader ( ) ;
4647 this . _worker = new Worker ( workerDir + '/media-worker.js' , { type : 'module' } ) ;
48+ // Key is subscription ID, value is a MediaStreamTrackGenerator writer.
49+ this . _mstVideoGeneratorWriters = new Map ( ) ;
50+ this . _initRtpModule ( ) ;
51+ this . _initDatagramReader ( ) ;
4752 }
4853
4954 /**
@@ -77,15 +82,18 @@ export class QuicConnection extends EventDispatcher {
7782 await this . _authenticate ( this . _tokenString ) ;
7883 }
7984
85+ _initRtpModule ( ) {
86+ this . _worker . postMessage ( [ 'init-rtp' ] ) ;
87+ }
88+
8089 async _initReceiveStreamReader ( ) {
8190 const receiveStreamReader =
8291 this . _quicTransport . incomingBidirectionalStreams . getReader ( ) ;
83- Logger . info ( 'Reader: ' + receiveStreamReader ) ;
8492 let receivingDone = false ;
8593 while ( ! receivingDone ) {
8694 const { value : receiveStream , done : readingReceiveStreamsDone } =
8795 await receiveStreamReader . read ( ) ;
88- Logger . info ( 'New stream received' ) ;
96+ Logger . debug ( 'New stream received. ' ) ;
8997 const subscriptionIdBytes = new Uint8Array ( uuidByteLength ) ;
9098 let subscriptionIdBytesOffset = 0 ;
9199 const trackIdBytes = new Uint8Array ( uuidByteLength ) ;
@@ -173,6 +181,19 @@ export class QuicConnection extends EventDispatcher {
173181 }
174182 }
175183
184+ async _initDatagramReader ( ) {
185+ const datagramReader = this . _quicTransport . datagrams . readable . getReader ( ) ;
186+ let receivingDone = false ;
187+ while ( ! receivingDone ) {
188+ const { value : datagram , done : readingDatagramsDone } =
189+ await datagramReader . read ( ) ;
190+ this . _worker . postMessage ( [ 'rtp-packet' , datagram ] ) ;
191+ if ( readingDatagramsDone ) {
192+ receivingDone = true ;
193+ }
194+ }
195+ }
196+
176197 _createSubscription ( id , receiveStream ) {
177198 // TODO: Incomplete subscription.
178199 const subscription = new Subscription ( id , ( ) => {
@@ -207,6 +228,95 @@ export class QuicConnection extends EventDispatcher {
207228 return quicStream ;
208229 }
209230
231+ async bindFeedbackReader ( stream , publicationId ) {
232+ // The receiver side of a publication stream starts with a UUID of
233+ // publication ID, then each feedback message has a 4 bytes header indicates
234+ // its length, and followed by protobuf encoded body.
235+ const feedbackChunkReader = stream . readable . getReader ( ) ;
236+ let feedbackChunksDone = false ;
237+ let publicationIdOffset = 0 ;
238+ const headerSize = 4 ;
239+ const header = new Uint8Array ( headerSize ) ;
240+ let headerOffset = 0 ;
241+ let bodySize = 0 ;
242+ let bodyOffset = 0 ;
243+ let bodyBytes ;
244+ while ( ! feedbackChunksDone ) {
245+ let valueOffset = 0 ;
246+ const { value, done} = await feedbackChunkReader . read ( ) ;
247+ Logger . debug ( value ) ;
248+ while ( valueOffset < value . byteLength ) {
249+ if ( publicationIdOffset < uuidByteLength ) {
250+ // TODO: Check publication ID matches. For now, we just skip this ID.
251+ const readLength =
252+ Math . min ( uuidByteLength - publicationIdOffset , value . byteLength ) ;
253+ valueOffset += readLength ;
254+ publicationIdOffset += readLength ;
255+ }
256+ if ( headerOffset < headerSize ) {
257+ // Read header.
258+ const copyLength = Math . min (
259+ headerSize - headerOffset , value . byteLength - valueOffset ) ;
260+ if ( copyLength === 0 ) {
261+ continue ;
262+ }
263+ header . set (
264+ value . subarray ( valueOffset , valueOffset + copyLength ) ,
265+ headerOffset ) ;
266+ headerOffset += copyLength ;
267+ valueOffset += copyLength ;
268+ if ( headerOffset < headerSize ) {
269+ continue ;
270+ }
271+ bodySize = 0 ;
272+ bodyOffset = 0 ;
273+ for ( let i = 0 ; i < headerSize ; i ++ ) {
274+ bodySize += ( header [ i ] << ( ( headerSize - 1 - i ) * 8 ) ) ;
275+ }
276+ bodyBytes = new Uint8Array ( bodySize ) ;
277+ Logger . debug ( 'Body size ' + bodySize ) ;
278+ }
279+ if ( bodyOffset < bodySize ) {
280+ const copyLength =
281+ Math . min ( bodySize - bodyOffset , value . byteLength - valueOffset ) ;
282+ if ( copyLength === 0 ) {
283+ continue ;
284+ }
285+ Logger . debug ( 'Bytes for body: ' + copyLength ) ;
286+ bodyBytes . set (
287+ value . subarray ( valueOffset , valueOffset + copyLength ) ,
288+ bodyOffset ) ;
289+ bodyOffset += copyLength ;
290+ valueOffset += copyLength ;
291+ if ( valueOffset < bodySize ) {
292+ continue ;
293+ }
294+ // Decode body.
295+ const feedback =
296+ proto . owt . protobuf . Feedback . deserializeBinary ( bodyBytes ) ;
297+ this . handleFeedback ( feedback , publicationId ) ;
298+ }
299+ }
300+ if ( done ) {
301+ feedbackChunksDone = true ;
302+ break ;
303+ }
304+ }
305+ }
306+
307+ async handleFeedback ( feedback , publicationId ) {
308+ Logger . debug (
309+ 'Key frame request type: ' +
310+ proto . owt . protobuf . Feedback . Type . KEY_FRAME_REQUEST ) ;
311+ if ( feedback . getType ( ) ===
312+ proto . owt . protobuf . Feedback . Type . KEY_FRAME_REQUEST ) {
313+ this . _worker . postMessage (
314+ [ 'rtcp-feedback' , [ 'key-frame-request' , publicationId ] ] ) ;
315+ } else {
316+ Logger . warning ( 'Unrecognized feedback type ' + feedback . getType ( ) ) ;
317+ }
318+ }
319+
210320 async publish ( stream , options ) {
211321 // TODO: Avoid a stream to be published twice. The first 16 bit data send to
212322 // server must be it's publication ID.
@@ -225,6 +335,7 @@ export class QuicConnection extends EventDispatcher {
225335 for ( const track of stream . stream . getTracks ( ) ) {
226336 const quicStream =
227337 await this . _quicTransport . createBidirectionalStream ( ) ;
338+ this . bindFeedbackReader ( quicStream , publicationId ) ;
228339 this . _quicMediaStreamTracks . set ( track . id , quicStream ) ;
229340 quicStreams . push ( quicStream ) ;
230341 }
@@ -262,6 +373,7 @@ export class QuicConnection extends EventDispatcher {
262373 [
263374 'media-sender' ,
264375 [
376+ publicationId ,
265377 track . id ,
266378 track . kind ,
267379 processor . readable ,
@@ -317,12 +429,12 @@ export class QuicConnection extends EventDispatcher {
317429 if ( typeof options !== 'object' ) {
318430 return Promise . reject ( new TypeError ( 'Options should be an object.' ) ) ;
319431 }
320- // if (options.audio === undefined) {
321- // options.audio = !!stream.settings.audio;
322- // }
323- // if (options.video === undefined) {
324- // options.video = !!stream.settings.video;
325- // }
432+ if ( options . audio === undefined ) {
433+ options . audio = ! ! stream . settings . audio ;
434+ }
435+ if ( options . video === undefined ) {
436+ options . video = ! ! stream . settings . video ;
437+ }
326438 let mediaOptions ;
327439 let dataOptions ;
328440 if ( options . audio || options . video ) {
@@ -375,19 +487,38 @@ export class QuicConnection extends EventDispatcher {
375487 } )
376488 . then ( ( data ) => {
377489 this . _subscribeOptions . set ( data . id , options ) ;
378- Logger . debug ( 'Subscribe info is set.' ) ;
379- if ( this . _quicDataStreams . has ( data . id ) ) {
380- // QUIC stream created before signaling returns.
381- // TODO: Update subscription to accept list of QUIC streams.
382- const subscription = this . _createSubscription (
383- data . id , this . _quicDataStreams . get ( data . id ) [ 0 ] ) ;
384- resolve ( subscription ) ;
490+ if ( dataOptions ) {
491+ // A WebTransport stream is associated with a subscription for
492+ // data.
493+ if ( this . _quicDataStreams . has ( data . id ) ) {
494+ // QUIC stream created before signaling returns.
495+ // TODO: Update subscription to accept list of QUIC streams.
496+ const subscription = this . _createSubscription (
497+ data . id , this . _quicDataStreams . get ( data . id ) [ 0 ] ) ;
498+ resolve ( subscription ) ;
499+ } else {
500+ this . _quicDataStreams . set ( data . id , null ) ;
501+ // QUIC stream is not created yet, resolve promise after getting
502+ // QUIC stream.
503+ this . _subscribePromises . set (
504+ data . id , { resolve : resolve , reject : reject } ) ;
505+ }
385506 } else {
386- this . _quicDataStreams . set ( data . id , null ) ;
387- // QUIC stream is not created yet, resolve promise after getting
388- // QUIC stream.
389- this . _subscribePromises . set (
390- data . id , { resolve : resolve , reject : reject } ) ;
507+ // A MediaStream is associated with a subscription for media.
508+ // Media packets are received over WebTransport datagram.
509+ const generators = [ ] ;
510+ for ( const track of mediaOptions ) {
511+ const generator =
512+ new MediaStreamTrackGenerator ( { kind : track . type } ) ;
513+ generators . push ( generator ) ;
514+ // TODO: Update key with the correct SSRC.
515+ this . _mstVideoGeneratorWriters . set (
516+ '0' , generator . writable . getWriter ( ) ) ;
517+ }
518+ const mediaStream = new MediaStream ( generators ) ;
519+ const subscription =
520+ this . _createSubscription ( data . id , mediaStream ) ;
521+ resolve ( subscription ) ;
391522 }
392523 if ( this . _subscriptionInfoReady . has ( data . id ) ) {
393524 this . _subscriptionInfoReady . get ( data . id ) ( ) ;
@@ -454,4 +585,18 @@ export class QuicConnection extends EventDispatcher {
454585 datagramReader ( ) {
455586 return this . _quicTransport . datagrams . readable . getReader ( ) ;
456587 }
588+
589+ initHandlersForWorker ( ) {
590+ this . _worker . onmessage = ( ( e ) => {
591+ const [ command , args ] = e . data ;
592+ switch ( command ) {
593+ case 'video-frame' :
594+ // TODO: Use actual subscription ID.
595+ this . _mstVideoGeneratorWriters . get ( '0' ) . getWriter . write ( args ) ;
596+ break ;
597+ default :
598+ Logger . warn ( 'Unrecognized command ' + command ) ;
599+ }
600+ } ) ;
601+ }
457602}
0 commit comments