Skip to content

Commit

Permalink
Merge pull request #104 from zeromq/alex/monitor-sockets
Browse files Browse the repository at this point in the history
Initial version of socket monitor impl. Related to #103
  • Loading branch information
Alexei-Kornienko authored Dec 10, 2020
2 parents 8e75a13 + ae103f3 commit f5f53a6
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 32 deletions.
9 changes: 8 additions & 1 deletion src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite};
use crate::fair_queue::QueueInner;
use crate::util::PeerIdentity;
use crate::{MultiPeerBackend, SocketBackend, SocketType, ZmqError, ZmqResult};
use crate::{MultiPeerBackend, SocketBackend, SocketEvent, SocketType, ZmqError, ZmqResult};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::SinkExt;
use parking_lot::Mutex;
use std::sync::Arc;
Expand All @@ -17,6 +18,7 @@ pub(crate) struct GenericSocketBackend {
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_type: SocketType,
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

impl GenericSocketBackend {
Expand All @@ -29,6 +31,7 @@ impl GenericSocketBackend {
fair_queue_inner,
round_robin: SegQueue::new(),
socket_type,
socket_monitor: Mutex::new(None),
}
}

Expand Down Expand Up @@ -85,6 +88,10 @@ impl SocketBackend for GenericSocketBackend {
fn shutdown(&self) {
self.peers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

impl MultiPeerBackend for GenericSocketBackend {
Expand Down
12 changes: 11 additions & 1 deletion src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use crate::codec::{Message, ZmqFramedRead};
use crate::fair_queue::FairQueue;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketType, ZmqMessage, ZmqResult};
use crate::{
Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType, ZmqMessage,
ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down Expand Up @@ -42,6 +46,12 @@ impl Socket for DealerSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

impl DealerSocket {
Expand Down
73 changes: 69 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ use util::PeerIdentity;
extern crate enum_primitive_derive;

use async_trait::async_trait;
use futures::channel::mpsc;
use futures_codec::FramedWrite;
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
Expand Down Expand Up @@ -102,6 +104,19 @@ impl Display for SocketType {
}
}

#[derive(Debug)]
pub enum SocketEvent {
Connected(Endpoint, PeerIdentity),
ConnectDelayed,
ConnectRetried,
Listening(Endpoint),
Accepted(Endpoint, PeerIdentity),
AcceptFailed(ZmqError),
Closed,
CloseFailed,
Disconnected(PeerIdentity),
}

pub trait MultiPeerBackend: SocketBackend {
/// This should not be public..
/// Find a better way of doing this
Expand All @@ -113,6 +128,7 @@ pub trait MultiPeerBackend: SocketBackend {
pub trait SocketBackend: Send + Sync {
fn socket_type(&self) -> SocketType;
fn shutdown(&self);
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>>;
}

#[async_trait]
Expand Down Expand Up @@ -148,9 +164,39 @@ pub trait Socket: Sized + Send {
let endpoint = endpoint.try_into()?;

let cloned_backend = self.backend();
let cback = move |result| util::peer_connected(result, cloned_backend.clone());
let cback = move |result| {
let cloned_backend = cloned_backend.clone();
async move {
let result = match result {
Ok((socket, endpoint)) => {
match util::peer_connected(socket, cloned_backend.clone()).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
}
}
Err(e) => Err(e),
};
match result {
Ok((endpoint, peer_id)) => {
if let Some(monitor) = cloned_backend.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Accepted(endpoint, peer_id));
}
}
Err(e) => {
if let Some(monitor) = cloned_backend.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::AcceptFailed(e));
}
}
}
}
};

let (endpoint, stop_handle) = transport::begin_accept(endpoint, cback).await?;

if let Some(monitor) = self.backend().monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Listening(endpoint.clone()));
}

self.binds().insert(endpoint.clone(), stop_handle);
Ok(endpoint)
}
Expand Down Expand Up @@ -190,11 +236,30 @@ pub trait Socket: Sized + Send {
let backend = self.backend();
let endpoint = endpoint.try_into()?;

let connect_result = transport::connect(endpoint).await;
util::peer_connected(connect_result, backend).await;
Ok(())
let result = match transport::connect(endpoint).await {
Ok((socket, endpoint)) => match util::peer_connected(socket, backend).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
},
Err(e) => Err(e),
};
match result {
Ok((endpoint, peer_id)) => {
if let Some(monitor) = self.backend().monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Connected(endpoint, peer_id));
}
Ok(())
}
Err(e) => Err(e),
}
}

/// Creates and setups new socket monitor
///
/// Subsequent calls to this method each create a new monitor channel.
/// Sender side of previous one is dropped.
fn monitor(&mut self) -> mpsc::Receiver<SocketEvent>;

// TODO: async fn connections(&self) -> ?

/// Disconnects from the given endpoint, blocking until finished.
Expand Down
19 changes: 17 additions & 2 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use crate::error::ZmqResult;
use crate::message::*;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{BlockingSend, MultiPeerBackend, Socket, SocketBackend, SocketType, ZmqError};
use futures::channel::oneshot;
use crate::{
BlockingSend, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType, ZmqError,
};
use futures::channel::{mpsc, oneshot};

use async_trait::async_trait;
use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::pin::Pin;
Expand All @@ -22,6 +25,7 @@ pub(crate) struct Subscriber {

pub(crate) struct PubSocketBackend {
subscribers: DashMap<PeerIdentity, Subscriber>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

impl PubSocketBackend {
Expand Down Expand Up @@ -81,6 +85,10 @@ impl SocketBackend for PubSocketBackend {
fn shutdown(&self) {
self.subscribers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

impl MultiPeerBackend for PubSocketBackend {
Expand Down Expand Up @@ -184,6 +192,7 @@ impl Socket for PubSocket {
Self {
backend: Arc::new(PubSocketBackend {
subscribers: DashMap::new(),
socket_monitor: Mutex::new(None),
}),
binds: HashMap::new(),
}
Expand All @@ -196,6 +205,12 @@ impl Socket for PubSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[cfg(test)]
Expand Down
12 changes: 11 additions & 1 deletion src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use crate::codec::{Message, ZmqFramedRead};
use crate::fair_queue::FairQueue;
use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{BlockingRecv, Endpoint, MultiPeerBackend, Socket, SocketType, ZmqMessage, ZmqResult};
use crate::{
BlockingRecv, Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketType, ZmqMessage,
ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down Expand Up @@ -37,6 +41,12 @@ impl Socket for PullSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[async_trait]
Expand Down
11 changes: 9 additions & 2 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use crate::backend::GenericSocketBackend;
use crate::codec::Message;
use crate::transport::AcceptStopHandle;
use crate::{
BlockingSend, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketType, ZmqMessage,
ZmqResult,
BlockingSend, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketType,
ZmqMessage, ZmqResult,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -36,6 +37,12 @@ impl Socket for PushSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle, RandomState> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

#[async_trait]
Expand Down
15 changes: 15 additions & 0 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct RepPeer {
struct RepSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, RepPeer>,
fair_queue_inner: Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}

pub struct RepSocket {
Expand All @@ -44,6 +45,7 @@ impl Socket for RepSocket {
backend: Arc::new(RepSocketBackend {
peers: DashMap::new(),
fair_queue_inner: fair_queue.inner(),
socket_monitor: Mutex::new(None),
}),
current_request: None,
fair_queue,
Expand All @@ -58,6 +60,12 @@ impl Socket for RepSocket {
fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
&mut self.binds
}

fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
let (sender, receiver) = mpsc::channel(1024);
self.backend.socket_monitor.lock().replace(sender);
receiver
}
}

impl MultiPeerBackend for RepSocketBackend {
Expand All @@ -77,6 +85,9 @@ impl MultiPeerBackend for RepSocketBackend {
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
if let Some(monitor) = self.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
}
self.peers.remove(peer_id);
}
}
Expand All @@ -89,6 +100,10 @@ impl SocketBackend for RepSocketBackend {
fn shutdown(&self) {
self.peers.clear();
}

fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit f5f53a6

Please sign in to comment.