diff --git a/src/api/controller.rs b/src/api/controller.rs index 853f1e5..7d61cc3 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -39,7 +39,7 @@ where /// Details about the receiving end are left to the implementor. pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users without blocking - fn send(&self, x: T) -> ControllerResult<()>; + fn send(&self, x: T) -> ControllerResult>; } /// Asynchronous and thread-safe handle to receive data from a stream. diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9f1a9d1..2e71429 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -58,7 +58,7 @@ pub(crate) struct BufferControllerInner { pub(crate) path: String, pub(crate) latest_version: watch::Receiver, pub(crate) local_version: watch::Receiver, - pub(crate) ops_in: mpsc::UnboundedSender, + pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender)>, pub(crate) poller: mpsc::UnboundedSender>, pub(crate) content_request: mpsc::Sender>, pub(crate) delta_request: mpsc::Sender>>, @@ -71,9 +71,10 @@ pub(crate) struct BufferControllerInner { impl Controller for BufferController {} impl AsyncSender for BufferController { - fn send(&self, op: TextChange) -> ControllerResult<()> { - self.0.ops_in.send(op)?; - Ok(()) + fn send(&self, op: TextChange) -> ControllerResult> { + let (tx, rx) = oneshot::channel(); + self.0.ops_in.send((op, tx))?; + Ok(async move { rx.await.unwrap_or(false) }) } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index 9b6b434..68d7aa6 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -23,7 +23,7 @@ struct BufferWorker { latest_version: watch::Sender, local_version: watch::Sender, ack_rx: mpsc::UnboundedReceiver, - ops_in: mpsc::UnboundedReceiver, + ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender)>, poller: mpsc::UnboundedReceiver>, pollers: Vec>, content_checkout: mpsc::Receiver>, @@ -133,7 +133,7 @@ impl BufferController { // received a text change from editor res = worker.ops_in.recv() => match res { None => break tracing::debug!("stopping: editor closed channel"), - Some(change) => worker.handle_editor_change(change, &tx).await, + Some((change, sent)) => worker.handle_editor_change(change, sent, &tx).await, }, // received a message from server: add to oplog and update latest version (+unlock pollers) @@ -170,7 +170,7 @@ impl BufferController { impl BufferWorker { #[tracing::instrument(skip(self, tx))] - async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender) { + async fn handle_editor_change(&mut self, change: TextChange, sent: oneshot::Sender, tx: &mpsc::Sender) { let last_ver = self.oplog.local_version(); // clip to buffer extents let clip_start = change.start_idx as usize; @@ -199,14 +199,17 @@ impl BufferWorker { tx.send(Operation { data: self.oplog.encode_from(ENCODE_PATCH, &last_ver), }) - .await - .unwrap_or_warn("failed to send change!"); + .await + .unwrap_or_warn("failed to send change!"); self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); self.local_version .send(self.branch.local_version()) .unwrap_or_warn("failed to update local version!"); + let _ = sent.send(true); + } else { + let _ = sent.send(false); } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index d0c544c..2a40c31 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -45,7 +45,7 @@ impl Controller for CursorController {} #[cfg_attr(feature = "async-trait", async_trait::async_trait)] impl AsyncSender for CursorController { - fn send(&self, mut cursor: Selection) -> ControllerResult<()> { + fn send(&self, mut cursor: Selection) -> ControllerResult> { if cursor.start_row > cursor.end_row || (cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col) { @@ -53,7 +53,7 @@ impl AsyncSender for CursorController { std::mem::swap(&mut cursor.start_col, &mut cursor.end_col); } - Ok(self.0.op.send(CursorPosition { + self.0.op.send(CursorPosition { buffer: BufferNode { path: cursor.buffer, }, @@ -65,7 +65,9 @@ impl AsyncSender for CursorController { row: cursor.end_row, col: cursor.end_col, }, - })?) + })?; + + Ok(std::future::ready(true)) } }