Skip to content

Commit b0630f4

Browse files
authored
track_local: Get rid of some unnecessary Mutexes and Options (#646)
1 parent 26d9167 commit b0630f4

File tree

3 files changed

+48
-54
lines changed

3 files changed

+48
-54
lines changed

webrtc/src/rtp_transceiver/rtp_sender/mod.rs

+37-39
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ use crate::rtp_transceiver::{
2525
create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters,
2626
RTCRtpTransceiver, SSRC,
2727
};
28-
use crate::track::track_local::{
29-
InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext, TrackLocalWriter,
30-
};
28+
use crate::track::track_local::{InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext};
3129

3230
pub(crate) struct RTPSenderInternal {
3331
pub(crate) stop_called_rx: Arc<Notify>,
@@ -38,8 +36,8 @@ pub(crate) struct TrackEncoding {
3836
pub(crate) track: Arc<dyn TrackLocal + Send + Sync>,
3937
pub(crate) srtp_stream: Arc<SrtpWriterFuture>,
4038
pub(crate) rtcp_interceptor: Arc<dyn RTCPReader + Send + Sync>,
41-
pub(crate) stream_info: Mutex<StreamInfo>,
42-
pub(crate) context: Mutex<TrackLocalContext>,
39+
pub(crate) stream_info: StreamInfo,
40+
pub(crate) context: TrackLocalContext,
4341

4442
pub(crate) ssrc: SSRC,
4543

@@ -275,12 +273,21 @@ impl RTCRtpSender {
275273
None
276274
};
277275

276+
let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
277+
let context = TrackLocalContext {
278+
id: self.id.clone(),
279+
params: super::RTCRtpParameters::default(),
280+
ssrc: 0,
281+
write_stream,
282+
paused: self.paused.clone(),
283+
mid: None,
284+
};
278285
let encoding = TrackEncoding {
279286
track,
280287
srtp_stream,
281288
rtcp_interceptor,
282-
stream_info: Mutex::new(StreamInfo::default()),
283-
context: Mutex::new(TrackLocalContext::default()),
289+
stream_info: StreamInfo::default(),
290+
context,
284291
ssrc,
285292
rtx,
286293
};
@@ -390,9 +397,8 @@ impl RTCRtpSender {
390397
.first_mut()
391398
.ok_or(Error::ErrRTPSenderNewTrackHasIncorrectEnvelope)?;
392399

393-
let mut context = encoding.context.lock().await;
394400
if self.has_sent() {
395-
encoding.track.unbind(&context).await?;
401+
encoding.track.unbind(&encoding.context).await?;
396402
}
397403

398404
self.seq_trans.reset_offset();
@@ -406,35 +412,34 @@ impl RTCRtpSender {
406412
.and_then(|t| t.mid());
407413

408414
let new_context = TrackLocalContext {
409-
id: context.id.clone(),
415+
id: encoding.context.id.clone(),
410416
params: self
411417
.media_engine
412418
.get_rtp_parameters_by_kind(t.kind(), RTCRtpTransceiverDirection::Sendonly),
413-
ssrc: context.ssrc,
414-
write_stream: context.write_stream.clone(),
419+
ssrc: encoding.context.ssrc,
420+
write_stream: encoding.context.write_stream.clone(),
415421
paused: self.paused.clone(),
416422
mid,
417423
};
418424

419425
match t.bind(&new_context).await {
420426
Err(err) => {
421427
// Re-bind the original track
422-
encoding.track.bind(&context).await?;
428+
encoding.track.bind(&encoding.context).await?;
423429

424430
Err(err)
425431
}
426432
Ok(codec) => {
427433
// Codec has changed
428-
context.params.codecs = vec![codec];
434+
encoding.context.params.codecs = vec![codec];
429435
encoding.track = Arc::clone(t);
430436
Ok(())
431437
}
432438
}
433439
} else {
434440
if self.has_sent() {
435441
for encoding in track_encodings.drain(..) {
436-
let context = encoding.context.lock().await;
437-
encoding.track.unbind(&context).await?;
442+
encoding.track.unbind(&encoding.context).await?;
438443
}
439444
} else {
440445
track_encodings.clear();
@@ -449,7 +454,7 @@ impl RTCRtpSender {
449454
if self.has_sent() {
450455
return Err(Error::ErrRTPSenderSendAlreadyCalled);
451456
}
452-
let track_encodings = self.track_encodings.lock().await;
457+
let mut track_encodings = self.track_encodings.lock().await;
453458
if track_encodings.is_empty() {
454459
return Err(Error::ErrRTPSenderTrackRemoved);
455460
}
@@ -461,41 +466,33 @@ impl RTCRtpSender {
461466
.and_then(|t| t.upgrade())
462467
.and_then(|t| t.mid());
463468

464-
for (idx, encoding) in track_encodings.iter().enumerate() {
469+
for (idx, encoding) in track_encodings.iter_mut().enumerate() {
465470
let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
466-
let mut context = TrackLocalContext {
467-
id: self.id.clone(),
468-
params: self.media_engine.get_rtp_parameters_by_kind(
469-
encoding.track.kind(),
470-
RTCRtpTransceiverDirection::Sendonly,
471-
),
472-
ssrc: parameters.encodings[idx].ssrc,
473-
write_stream: Some(
474-
Arc::clone(&write_stream) as Arc<dyn TrackLocalWriter + Send + Sync>
475-
),
476-
paused: self.paused.clone(),
477-
mid: mid.to_owned(),
478-
};
471+
encoding.context.params = self.media_engine.get_rtp_parameters_by_kind(
472+
encoding.track.kind(),
473+
RTCRtpTransceiverDirection::Sendonly,
474+
);
475+
encoding.context.ssrc = parameters.encodings[idx].ssrc;
476+
encoding.context.write_stream = Arc::clone(&write_stream) as _;
477+
encoding.context.mid = mid.to_owned();
479478

480-
let codec = encoding.track.bind(&context).await?;
481-
let stream_info = create_stream_info(
479+
let codec = encoding.track.bind(&encoding.context).await?;
480+
encoding.stream_info = create_stream_info(
482481
self.id.clone(),
483482
parameters.encodings[idx].ssrc,
484483
codec.payload_type,
485484
codec.capability.clone(),
486485
&parameters.rtp_parameters.header_extensions,
487486
None,
488487
);
489-
context.params.codecs = vec![codec.clone()];
488+
encoding.context.params.codecs = vec![codec.clone()];
490489

491490
let srtp_writer = Arc::clone(&encoding.srtp_stream) as Arc<dyn RTPWriter + Send + Sync>;
492491
let rtp_writer = self
493492
.interceptor
494-
.bind_local_stream(&stream_info, srtp_writer)
493+
.bind_local_stream(&encoding.stream_info, srtp_writer)
495494
.await;
496495

497-
*encoding.context.lock().await = context;
498-
*encoding.stream_info.lock().await = stream_info;
499496
*write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer);
500497

501498
if let (Some(rtx), Some(rtx_codec)) = (
@@ -573,8 +570,9 @@ impl RTCRtpSender {
573570

574571
let track_encodings = self.track_encodings.lock().await;
575572
for encoding in track_encodings.iter() {
576-
let stream_info = encoding.stream_info.lock().await;
577-
self.interceptor.unbind_local_stream(&stream_info).await;
573+
self.interceptor
574+
.unbind_local_stream(&encoding.stream_info)
575+
.await;
578576

579577
encoding.srtp_stream.close().await?;
580578

webrtc/src/track/track_local/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ pub trait TrackLocalWriter: fmt::Debug {
4747

4848
/// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
4949
/// in Interceptors.
50-
#[derive(Default, Debug, Clone)]
50+
#[derive(Debug, Clone)]
5151
pub struct TrackLocalContext {
5252
pub(crate) id: String,
5353
pub(crate) params: RTCRtpParameters,
5454
pub(crate) ssrc: SSRC,
55-
pub(crate) write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
55+
pub(crate) write_stream: Arc<dyn TrackLocalWriter + Send + Sync>,
5656
pub(crate) paused: Arc<AtomicBool>,
5757
pub(crate) mid: Option<SmolStr>,
5858
}
@@ -78,7 +78,7 @@ impl TrackLocalContext {
7878

7979
/// write_stream returns the write_stream for this TrackLocal. The implementer writes the outbound
8080
/// media packets to it
81-
pub fn write_stream(&self) -> Option<Arc<dyn TrackLocalWriter + Send + Sync>> {
81+
pub fn write_stream(&self) -> Arc<dyn TrackLocalWriter + Send + Sync> {
8282
self.write_stream.clone()
8383
}
8484

@@ -131,13 +131,13 @@ pub trait TrackLocal {
131131
/// TrackBinding is a single bind for a Track
132132
/// Bind can be called multiple times, this stores the
133133
/// result for a single bind call so that it can be used when writing
134-
#[derive(Default, Debug)]
134+
#[derive(Debug)]
135135
pub(crate) struct TrackBinding {
136136
id: String,
137137
ssrc: SSRC,
138138
payload_type: PayloadType,
139139
params: RTCRtpParameters,
140-
write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
140+
write_stream: Arc<dyn TrackLocalWriter + Send + Sync>,
141141
sender_paused: Arc<AtomicBool>,
142142
hdr_ext_ids: Vec<rtp::header::Extension>,
143143
}

webrtc/src/track/track_local/track_local_static_rtp.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,13 @@ impl TrackLocalStaticRTP {
150150
}
151151
}
152152

153-
if let Some(write_stream) = &b.write_stream {
154-
match write_stream.write_rtp_with_attributes(&pkt, attr).await {
155-
Ok(m) => {
156-
n += m;
157-
}
158-
Err(err) => {
159-
write_errs.push(err);
160-
}
153+
match b.write_stream.write_rtp_with_attributes(&pkt, attr).await {
154+
Ok(m) => {
155+
n += m;
156+
}
157+
Err(err) => {
158+
write_errs.push(err);
161159
}
162-
} else {
163-
write_errs.push(Error::new("track binding has none write_stream".to_owned()));
164160
}
165161
}
166162

0 commit comments

Comments
 (0)