Skip to content

Commit

Permalink
Remove unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryDodzin committed Feb 3, 2025
1 parent abe8709 commit ffdc1b0
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions mirrord/intproxy/src/background_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,34 @@ use tokio::{
};
use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap, StreamNotifyClose};

pub type MessageBus<T> =
MessageBusInner<<T as BackgroundTask>::MessageIn, <T as BackgroundTask>::MessageOut>;

/// A struct that is meant to be the only way the [`BackgroundTask`]s can communicate with their
/// parents. It allows the tasks to send and receive messages.
pub struct MessageBus<T: BackgroundTask> {
tx: Sender<T::MessageOut>,
rx: Receiver<T::MessageIn>,
pub struct MessageBusInner<MessageIn, MessageOut> {
tx: Sender<MessageOut>,
rx: Receiver<MessageIn>,
// Note if adding any new fields do look at `MessageBus::cast`'s unsafe block.
}

impl<T: BackgroundTask> MessageBus<T> {
impl<MessageIn, MessageOut> MessageBusInner<MessageIn, MessageOut> {
/// Attempts to send a message to this task's parent.
pub async fn send<M: Into<T::MessageOut>>(&self, msg: M) {
pub async fn send<M: Into<MessageOut>>(&self, msg: M) {
let _ = self.tx.send(msg.into()).await;
}

/// Receives a message from this task's parent.
/// [`None`] means that the channel is closed and there will be no more messages.
pub async fn recv(&mut self) -> Option<T::MessageIn> {
pub async fn recv(&mut self) -> Option<MessageIn> {
tokio::select! {
_ = self.tx.closed() => None,
msg = self.rx.recv() => msg,
}
}

/// Cast `&mut MessageBus<T>` as `&mut MessageBus<R>` only if they share the same message types
pub(crate) fn cast<R>(&mut self) -> &mut MessageBus<R>
where
R: BackgroundTask<MessageIn = T::MessageIn, MessageOut = T::MessageOut>,
{
// SAFETY: since MessageBus consits of only the `Sender` and `Receiver` and both should
// match.
unsafe { &mut *(self as *mut MessageBus<T> as *mut MessageBus<R>) }
}

/// Returns a [`Closed`] instance for this [`MessageBus`].
pub(crate) fn closed(&self) -> Closed<T> {
pub(crate) fn closed(&self) -> Closed<MessageOut> {
Closed(self.tx.clone())
}
}
Expand Down Expand Up @@ -93,9 +86,9 @@ impl<T: BackgroundTask> MessageBus<T> {
/// }
/// }
/// ```
pub(crate) struct Closed<T: BackgroundTask>(Sender<T::MessageOut>);
pub(crate) struct Closed<Out>(Sender<Out>);

impl<T: BackgroundTask> Closed<T> {
impl<T> Closed<T> {
/// Resolves the given [`Future`], unless the origin [`MessageBus`] closes first.
///
/// # Returns
Expand Down Expand Up @@ -158,20 +151,22 @@ where

async fn run(&mut self, message_bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
let RestartableBackgroundTaskWrapper { task } = self;
let task_bus = message_bus.cast();

match task.run(task_bus).await {
match task.run(message_bus).await {
Err(run_error) => {
let mut run_error = Some(run_error);

loop {
match task
.restart(run_error.take().expect("should contain an error"), task_bus)
.restart(
run_error.take().expect("should contain an error"),
message_bus,
)
.await
{
ControlFlow::Break(err) => return Err(err),
ControlFlow::Continue(()) => {
if let Err(err) = task.run(task_bus).await {
if let Err(err) = task.run(message_bus).await {
run_error = Some(err);
} else {
return Ok(());
Expand Down Expand Up @@ -206,7 +201,7 @@ impl<Id, MOut, Err> BackgroundTasks<Id, MOut, Err>
where
Id: fmt::Debug + Hash + PartialEq + Eq + Clone + Unpin,
Err: 'static + Send,
MOut: Send + Unpin,
MOut: 'static + Send + Unpin,
{
/// Registers a new background task in this struct. Returns a [`TaskSender`] that can be used to
/// send messages to the task. Dropping this sender will close the channel of messages
Expand Down Expand Up @@ -239,7 +234,7 @@ where
StreamNotifyClose::new(ReceiverStream::new(out_msg_rx)),
);

let mut message_bus = MessageBus {
let mut message_bus = MessageBus::<T> {
tx: out_msg_tx,
rx: in_msg_rx,
};
Expand Down

0 comments on commit ffdc1b0

Please sign in to comment.