Skip to content

Commit

Permalink
Docs and bus -> message_bus
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryDodzin committed Jan 28, 2025
1 parent 91f2341 commit 2393d06
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions mirrord/intproxy/src/background_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap, StreamNotifyC
pub struct MessageBus<T: BackgroundTask> {
tx: Sender<T::MessageOut>,
rx: Receiver<T::MessageIn>,
// Note if adding any new fields do look at `MessageBus::cast`'s unsafe block.
}

impl<T: BackgroundTask> MessageBus<T> {
Expand All @@ -37,10 +38,13 @@ impl<T: BackgroundTask> MessageBus<T> {
}
}

/// Cast `&mut MessageBus<T>` as `&mut MessageBus<R>` only if they share the same message types
pub fn cast<R>(&mut self) -> &mut MessageBus<R>
where
R: BackgroundTask<MessageIn = T::MessageIn, MessageOut = T::MessageOut>,
{
// SAFETY: since MessageBus consits of only the `Sender` and `Receiver` and both should
// match.
unsafe { &mut *(self as *mut MessageBus<T> as *mut MessageBus<R>) }
}

Expand Down Expand Up @@ -148,9 +152,9 @@ where
type MessageIn = T::MessageIn;
type MessageOut = T::MessageOut;

async fn run(&mut self, bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
async fn run(&mut self, message_bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
let RestartableBackgroundTaskWrapper { task } = self;
let task_bus = bus.cast();
let task_bus = message_bus.cast();

match task.run(task_bus).await {
Err(run_error) => {
Expand Down

0 comments on commit 2393d06

Please sign in to comment.