Skip to content

Commit 469a355

Browse files
committed
Add Repo::peer_state to find out about the sync state w.r.t a peer
and doc Problem: It's often useful to know what the remote state of a document we are synchronizing is. We have a lot of information about this at the automerge level but it's not exposed to users of `Repo`. Solution: Expose automerge sync state information as well as keep track of when we last received and sent messages.
1 parent e9eca65 commit 469a355

File tree

5 files changed

+429
-57
lines changed

5 files changed

+429
-57
lines changed

src/interfaces.rs

+13
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
33
use std::{
44
fmt::{Display, Formatter},
55
str::FromStr,
6+
time::Instant,
67
};
78

89
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
@@ -193,3 +194,15 @@ pub trait Storage: Send {
193194
_full_doc: Vec<u8>,
194195
) -> BoxFuture<'static, Result<(), StorageError>>;
195196
}
197+
198+
/// The state of sycnhronization of a document with a remote peer obtained via [`RepoHandle::peer_state`](crate::RepoHandle::peer_state)
199+
pub struct PeerState {
200+
/// When we last received a message from this peer
201+
pub last_received: Option<Instant>,
202+
/// When we last sent a message to this peer
203+
pub last_sent: Option<Instant>,
204+
/// The heads of the document when we last sent a message
205+
pub last_sent_heads: Option<Vec<automerge::ChangeHash>>,
206+
/// The last heads of the document that the peer said they had
207+
pub last_acked_heads: Option<Vec<automerge::ChangeHash>>,
208+
}

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub use share_policy::{SharePolicy, SharePolicyError};
88

99
pub use crate::dochandle::DocHandle;
1010
pub use crate::interfaces::{
11-
DocumentId, Message, NetworkError, RepoId, RepoMessage, Storage, StorageError,
11+
DocumentId, Message, NetworkError, PeerState, RepoId, RepoMessage, Storage, StorageError,
1212
};
1313
pub use crate::network_connect::ConnDirection;
1414
pub use crate::repo::{Repo, RepoError, RepoHandle};

src/repo.rs

+152-56
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::dochandle::{DocHandle, SharedDocument};
2-
use crate::interfaces::{DocumentId, RepoId};
2+
use crate::interfaces::{DocumentId, PeerState, RepoId};
33
use crate::interfaces::{NetworkError, RepoMessage, Storage, StorageError};
44
use crate::share_policy::ShareDecision;
55
use crate::{share_policy, SharePolicy, SharePolicyError};
@@ -21,6 +21,7 @@ use std::mem;
2121
use std::sync::atomic::{AtomicUsize, Ordering};
2222
use std::sync::Arc;
2323
use std::thread::{self, JoinHandle};
24+
use std::time::Instant;
2425
use uuid::Uuid;
2526

2627
/// Front-end of the repo.
@@ -210,11 +211,27 @@ impl RepoHandle {
210211
})
211212
.expect("Failed to send repo event.");
212213
}
214+
215+
pub fn peer_state(
216+
&self,
217+
remote_id: RepoId,
218+
document: DocumentId,
219+
) -> RepoFuture<Option<PeerState>> {
220+
let (fut, resolver) = new_repo_future_with_resolver();
221+
self.repo_sender
222+
.send(RepoEvent::GetPeerState {
223+
remote_repo_id: remote_id,
224+
document_id: document,
225+
reply: resolver,
226+
})
227+
.expect("failed to send repo event");
228+
fut
229+
}
213230
}
214231

215232
/// Events sent by repo or doc handles to the repo.
216233
pub(crate) enum RepoEvent {
217-
/// Start processing a new document.
234+
/// Start processing a ew document.
218235
NewDoc(DocumentId, DocumentInfo),
219236
/// A document changed.
220237
DocChange(DocumentId),
@@ -239,6 +256,11 @@ pub(crate) enum RepoEvent {
239256
stream: Box<dyn Send + Unpin + Stream<Item = Result<RepoMessage, NetworkError>>>,
240257
sink: Box<dyn Send + Unpin + Sink<RepoMessage, Error = NetworkError>>,
241258
},
259+
GetPeerState {
260+
remote_repo_id: RepoId,
261+
document_id: DocumentId,
262+
reply: RepoFutureResolver<Option<PeerState>>,
263+
},
242264
/// Stop the repo.
243265
Stop,
244266
}
@@ -253,6 +275,7 @@ impl fmt::Debug for RepoEvent {
253275
RepoEvent::LoadDoc(_, _) => f.write_str("RepoEvent::LoadDoc"),
254276
RepoEvent::ListAllDocs(_) => f.write_str("RepoEvent::ListAllDocs"),
255277
RepoEvent::ConnectRemoteRepo { .. } => f.write_str("RepoEvent::ConnectRemoteRepo"),
278+
RepoEvent::GetPeerState { .. } => f.write_str("RepoEvent::GetPeerState"),
256279
RepoEvent::Stop => f.write_str("RepoEvent::Stop"),
257280
}
258281
}
@@ -573,19 +596,32 @@ pub(crate) struct DocumentInfo {
573596
last_heads: Vec<ChangeHash>,
574597
}
575598

576-
/// A state machine representing a connection between a remote repo and a particular document
577599
#[derive(Debug)]
578-
enum PeerConnection {
579-
/// we've accepted the peer and are syncing with them
580-
Accepted(SyncState),
581-
/// We're waiting for a response from the share policy
582-
PendingAuth { received_messages: Vec<SyncMessage> },
600+
struct PeerConnection {
601+
repo_id: RepoId,
602+
last_recv: Option<Instant>,
603+
last_send: Option<Instant>,
604+
state: PeerConnectionState,
583605
}
584606

585607
impl PeerConnection {
586-
fn pending() -> Self {
587-
PeerConnection::PendingAuth {
588-
received_messages: vec![],
608+
fn pending(repo_id: RepoId) -> Self {
609+
Self {
610+
repo_id,
611+
last_recv: None,
612+
last_send: None,
613+
state: PeerConnectionState::PendingAuth {
614+
received_messages: vec![],
615+
},
616+
}
617+
}
618+
619+
fn ready(repo_id: RepoId) -> Self {
620+
Self {
621+
repo_id,
622+
last_recv: None,
623+
last_send: None,
624+
state: PeerConnectionState::Accepted(SyncState::new()),
589625
}
590626
}
591627

@@ -594,23 +630,97 @@ impl PeerConnection {
594630
doc: &mut Automerge,
595631
msg: SyncMessage,
596632
) -> Result<(), automerge::AutomergeError> {
597-
match self {
598-
PeerConnection::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg),
599-
PeerConnection::PendingAuth { received_messages } => {
633+
self.last_recv = Some(Instant::now());
634+
match &mut self.state {
635+
PeerConnectionState::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg),
636+
PeerConnectionState::PendingAuth { received_messages } => {
600637
received_messages.push(msg);
601638
Ok(())
602639
}
603640
}
604641
}
605642

643+
fn generate_first_sync_message(
644+
&mut self,
645+
document: &mut automerge::Automerge,
646+
) -> Option<SyncMessage> {
647+
let msg = match &mut self.state {
648+
PeerConnectionState::PendingAuth { received_messages } => {
649+
let mut sync_state = SyncState::new();
650+
for msg in received_messages.drain(..) {
651+
document
652+
.receive_sync_message(&mut sync_state, msg)
653+
.expect("Failed to receive sync message.");
654+
}
655+
let message = document.generate_sync_message(&mut sync_state);
656+
self.state = PeerConnectionState::Accepted(sync_state);
657+
message
658+
}
659+
PeerConnectionState::Accepted(sync_state) => document.generate_sync_message(sync_state),
660+
};
661+
if msg.is_some() {
662+
self.last_send = Some(Instant::now());
663+
}
664+
msg
665+
}
666+
606667
fn generate_sync_message(&mut self, doc: &Automerge) -> Option<SyncMessage> {
607-
match self {
608-
Self::Accepted(sync_state) => doc.generate_sync_message(sync_state),
609-
Self::PendingAuth { .. } => None,
668+
let msg = match &mut self.state {
669+
PeerConnectionState::Accepted(sync_state) => doc.generate_sync_message(sync_state),
670+
PeerConnectionState::PendingAuth { .. } => None,
671+
};
672+
if msg.is_some() {
673+
self.last_send = Some(Instant::now());
674+
}
675+
msg
676+
}
677+
678+
fn promote_pending_peer(&mut self) -> Option<Vec<SyncMessage>> {
679+
if let PeerConnectionState::PendingAuth { received_messages } = &mut self.state {
680+
let result = std::mem::take(received_messages);
681+
self.state = PeerConnectionState::Accepted(SyncState::new());
682+
if !result.is_empty() {
683+
self.last_send = Some(Instant::now());
684+
}
685+
Some(result)
686+
} else {
687+
tracing::warn!(remote=%self.repo_id, "Tried to promote a peer which was not pending authorization");
688+
None
689+
}
690+
}
691+
692+
/// Get the state of synchronization with a remote peer and document
693+
fn peer_state(&self) -> PeerState {
694+
let last_sent_heads = match &self.state {
695+
PeerConnectionState::Accepted(sync_state) => Some(sync_state.last_sent_heads.clone()),
696+
PeerConnectionState::PendingAuth {
697+
received_messages: _,
698+
} => None,
699+
};
700+
let last_acked_heads = match &self.state {
701+
PeerConnectionState::Accepted(sync_state) => Some(sync_state.shared_heads.clone()),
702+
PeerConnectionState::PendingAuth {
703+
received_messages: _,
704+
} => None,
705+
};
706+
PeerState {
707+
last_received: self.last_recv,
708+
last_sent: self.last_send,
709+
last_sent_heads,
710+
last_acked_heads,
610711
}
611712
}
612713
}
613714

715+
/// A state machine representing a connection between a remote repo and a particular document
716+
#[derive(Debug)]
717+
enum PeerConnectionState {
718+
/// we've accepted the peer and are syncing with them
719+
Accepted(SyncState),
720+
/// We're waiting for a response from the share policy
721+
PendingAuth { received_messages: Vec<SyncMessage> },
722+
}
723+
614724
/// A change requested by a peer connection
615725
enum PeerConnCommand {
616726
/// Request authorization from the share policy
@@ -842,7 +952,7 @@ impl DocumentInfo {
842952
Entry::Vacant(entry) => {
843953
// if this is a new peer, request authorization
844954
commands.push(PeerConnCommand::RequestAuth(repo_id.clone()));
845-
entry.insert(PeerConnection::pending())
955+
entry.insert(PeerConnection::pending(repo_id.clone()))
846956
}
847957
Entry::Occupied(entry) => entry.into_mut(),
848958
};
@@ -861,48 +971,19 @@ impl DocumentInfo {
861971
///
862972
/// Returns any messages which the peer sent while we were waiting for authorization
863973
fn promote_pending_peer(&mut self, repo_id: &RepoId) -> Option<Vec<SyncMessage>> {
864-
if let Some(PeerConnection::PendingAuth { received_messages }) =
865-
self.peer_connections.remove(repo_id)
866-
{
867-
self.peer_connections
868-
.insert(repo_id.clone(), PeerConnection::Accepted(SyncState::new()));
869-
Some(received_messages)
870-
} else {
871-
tracing::warn!(remote=%repo_id, "Tried to promote a peer which was not pending authorization");
872-
None
873-
}
974+
self.peer_connections
975+
.get_mut(repo_id)
976+
.map(|c| c.promote_pending_peer())
977+
.unwrap_or_default()
874978
}
875979

876980
/// Potentially generate an outgoing sync message.
877981
fn generate_first_sync_message(&mut self, repo_id: RepoId) -> Option<SyncMessage> {
878-
match self.peer_connections.entry(repo_id) {
879-
Entry::Vacant(entry) => {
880-
let mut sync_state = SyncState::new();
881-
let document = self.document.read();
882-
let message = document.automerge.generate_sync_message(&mut sync_state);
883-
entry.insert(PeerConnection::Accepted(sync_state));
884-
message
885-
}
886-
Entry::Occupied(mut entry) => match entry.get_mut() {
887-
PeerConnection::PendingAuth { received_messages } => {
888-
let mut document = self.document.write();
889-
let mut sync_state = SyncState::new();
890-
for msg in received_messages.drain(..) {
891-
document
892-
.automerge
893-
.receive_sync_message(&mut sync_state, msg)
894-
.expect("Failed to receive sync message.");
895-
}
896-
let message = document.automerge.generate_sync_message(&mut sync_state);
897-
entry.insert(PeerConnection::Accepted(sync_state));
898-
message
899-
}
900-
PeerConnection::Accepted(ref mut sync_state) => {
901-
let document = self.document.read();
902-
document.automerge.generate_sync_message(sync_state)
903-
}
904-
},
905-
}
982+
let conn = self
983+
.peer_connections
984+
.entry(repo_id.clone())
985+
.or_insert_with(|| PeerConnection::ready(repo_id));
986+
conn.generate_first_sync_message(&mut self.document.write().automerge)
906987
}
907988

908989
/// Generate outgoing sync message for all repos we are syncing with.
@@ -916,6 +997,10 @@ impl DocumentInfo {
916997
})
917998
.collect()
918999
}
1000+
1001+
fn get_peer_state(&self, peer: &RepoId) -> Option<PeerState> {
1002+
self.peer_connections.get(peer).map(|p| p.peer_state())
1003+
}
9191004
}
9201005

9211006
/// Signal that the stream or sink on the network adapter is ready to be polled.
@@ -1522,6 +1607,17 @@ impl Repo {
15221607
self.sinks_to_poll.insert(repo_id.clone());
15231608
self.streams_to_poll.insert(repo_id);
15241609
}
1610+
RepoEvent::GetPeerState {
1611+
remote_repo_id,
1612+
document_id,
1613+
mut reply,
1614+
} => {
1615+
reply.resolve_fut(
1616+
self.documents
1617+
.get(&document_id)
1618+
.and_then(|info| info.get_peer_state(&remote_repo_id)),
1619+
);
1620+
}
15251621
RepoEvent::Stop => {
15261622
// Handled in the main run loop.
15271623
}

tests/network/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod document_list;
1313
mod document_load;
1414
mod document_request;
1515
mod document_save;
16+
mod peer_state;
1617

1718
use test_log::test;
1819

0 commit comments

Comments
 (0)