Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle new-initiator/new-responder correctly #64

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/chat/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ fn main() {

// Connect to server
let (connect_future, event_channel) = saltyrtc_client::connect(
"localhost",
8765,
"server.saltyrtc.org",
443,
Some(tls_connector),
&core.handle(),
salty_arc.clone(),
Expand Down Expand Up @@ -537,7 +537,7 @@ fn setup_logging(role: Role, log_to_stdout: bool) -> Config {
.unwrap();

// Instantiate filters
let info_filter = ThresholdFilter::new(LevelFilter::Info);
let info_filter = ThresholdFilter::new(LevelFilter::Debug);

// Config builder
let builder = Config::builder()
Expand Down
18 changes: 17 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ pub enum Event {
/// Server handshake is done.
///
/// The boolean indicates whether a peer is already
/// connected + authenticated.
/// connected + authenticated towards the server.
ServerHandshakeDone(bool),

/// Peer handshake is done.
Expand Down Expand Up @@ -643,6 +643,9 @@ pub fn do_handshake(
late_error = Some(e);
}
},
HandleAction::Close(code) => {
messages.push(OwnedMessage::Close(Some(CloseData::new(code.as_number(), "".to_string()))))
},
}
}

Expand Down Expand Up @@ -769,6 +772,10 @@ pub fn task_loop(
HandleAction::Reply(bbox) => out_messages.push(OwnedMessage::Binary(bbox.into_bytes())),
HandleAction::TaskMessage(msg) => {
if let TaskMessage::Close(_) = msg {
out_messages.push(OwnedMessage::Close(Some(CloseData {
status_code: 1000,
reason: "".to_string(),
})));
close_stream = true;
}

Expand All @@ -790,6 +797,13 @@ pub fn task_loop(
HandleAction::HandshakeError(_) => return boxed!(future::err(Err(
SaltyError::Crash("Got HandleAction::HandshakeError in task loop".into())
))),
HandleAction::Close(code) => {
out_messages.push(OwnedMessage::Close(Some(CloseData {
status_code: code.as_number(),
reason: "".to_string(),
})));
close_stream = true;
},
}
}

Expand Down Expand Up @@ -824,6 +838,8 @@ pub fn task_loop(
out_future
.join(in_future)
.and_then(move |_| if close_stream {
// TODO(dbrgn): Tear down all futures to stop the loop

// Stop processing stream
Err(Ok(()))
} else {
Expand Down
34 changes: 31 additions & 3 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ pub(crate) trait Signaling {
let role = self.role();
let peer: &mut dyn PeerContext = self.get_peer_with_address_mut(nonce.source()).ok_or_else(|| {
if role == Role::Initiator && nonce.source().is_responder() {
ValidationError::Fail(format!("Could not find responder with address {}", nonce.source()))
// Note: This can happen since a message from a responder may still be in flight.
ValidationError::DropMsg(format!("Could not find responder with address {}", nonce.source()))
} else {
ValidationError::Crash("Got message from invalid sender that wasn't dropped".into())
}
Expand Down Expand Up @@ -230,7 +231,8 @@ pub(crate) trait Signaling {
let role = self.role();
let peer: &mut dyn PeerContext = self.get_peer_with_address_mut(nonce.source()).ok_or_else(|| {
if role == Role::Initiator && nonce.source().is_responder() {
ValidationError::Fail(format!("Could not find responder with address {}", nonce.source()))
// Note: This can happen since a message from a responder may still be in flight.
ValidationError::DropMsg(format!("Could not find responder with address {}", nonce.source()))
} else {
ValidationError::Crash("Got message from invalid sender that wasn't dropped".into())
}
Expand Down Expand Up @@ -975,6 +977,8 @@ impl Signaling for InitiatorSignaling {
Identity::Responder(_) => {
if self.common().signaling_state() == SignalingState::Task {
// If we've already selected a peer, return it if it matches the address.
// Note: This is a deviation from the SaltyRTC protocol which allows
// connections to multiple responders.
let peer = self.responder.as_mut().map(|p| p as &mut dyn PeerContext);
let valid = match peer {
Some(ref p) => {
Expand Down Expand Up @@ -1517,6 +1521,20 @@ impl InitiatorSignaling {
}

fn process_new_responder(&mut self, address: Address) -> SignalingResult<Option<HandleAction>> {
// Drop a new responder after a handshake with one responder has already
// completed.
//
// Note: This deviates from the intention of the specification to allow
// for more than one connection towards a responder over the same
// WebSocket connection.
let signaling_state = self.common().signaling_state();
if signaling_state == SignalingState::Task {
debug!("Dropping responder {:?} in state {:?}", address, signaling_state);
return self
.send_drop_responder(address, DropReason::DroppedByInitiator)
.map(Option::Some);
}

// If a responder with the same id already exists,
// all currently cached information about and for the previous responder
// (such as cookies and the sequence number) MUST be deleted first.
Expand Down Expand Up @@ -1790,7 +1808,16 @@ impl Signaling for ResponderSignaling {
fn handle_new_initiator(&mut self, _msg: NewInitiator) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received new-initiator from server");

let mut actions: Vec<HandleAction> = vec![];
// Close when a new initiator has connected.
//
// Note: This deviates from the intention of the specification to allow
// for more than one connection towards an initiator over the same
// WebSocket connection.
let signaling_state = self.common().signaling_state();
if signaling_state == SignalingState::Task {
debug!("Received new-initiator message in state {:?}, closing", signaling_state);
return Ok(vec![HandleAction::Close(CloseCode::WsClosingNormal)])
}

// A responder who receives a 'new-initiator' message MUST proceed by
// deleting all currently cached information about and for the previous
Expand All @@ -1811,6 +1838,7 @@ impl Signaling for ResponderSignaling {
return Err(SignalingError::Crash("No auth provider set".into()));
},
}
let mut actions: Vec<HandleAction> = vec![];
if send_token {
let old_auth_provider = mem::replace(&mut self.common_mut().auth_provider, None);
if let Some(AuthProvider::Token(token)) = old_auth_provider {
Expand Down
30 changes: 30 additions & 0 deletions src/protocol/tests/signaling_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ mod client_auth {
HandleAction::HandshakeError(_) => panic!("Unexpected HandshakeError"),
HandleAction::TaskMessage(_) => panic!("Unexpected TaskMessage"),
HandleAction::Event(_) => panic!("Unexpected Event"),
HandleAction::Close(_) => panic!("Unexpected Close"),
};

let decrypted = OpenBox::<Message>::decrypt(
Expand Down Expand Up @@ -1431,3 +1432,32 @@ mod disconnected {
assert_eq!(actions[0], HandleAction::Event(Event::Disconnected(7)));
}
}

mod regressions {
use super::*;

/// When a responder has been dropped, a message may still be in flight
/// and could be received by the initiator. The initiator should ignore
/// the message and not fail with a protocol error.
/// See: https://github.com/saltyrtc/saltyrtc-client-rs/pull/59
#[test]
fn ignore_in_flight_responder_message() {
let peer_trusted_pk = PublicKey::random();
let mut ctx = TestContext::initiator(
ClientIdentity::Initiator, None,
SignalingState::PeerHandshake, ServerHandshakeState::Done
);

// Encrypt message
let responder_ks = KeyPair::new();
let msg = Message::Close(Close::from_close_code(CloseCode::NoSharedTask));
let bbox = TestMsgBuilder::new(msg).from(3).to(1)
.build(Cookie::random(),
&responder_ks,
ctx.our_ks.public_key());

// Handle message
let actions = ctx.signaling.handle_message(bbox).unwrap();
assert_eq!(actions.len(), 0);
}
}
4 changes: 3 additions & 1 deletion src/protocol/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::result::Result as StdResult;
use serde::ser::{Serialize, Serializer};
use serde::de::{Deserialize, Deserializer, Visitor, Error as SerdeError};

use crate::Event;
use crate::{Event, CloseCode};
use crate::boxes::ByteBox;
use crate::errors::SaltyError;
use crate::tasks::TaskMessage;
Expand Down Expand Up @@ -237,6 +237,8 @@ pub(crate) enum HandleAction {
Event(Event),
/// A task message was received and decoded.
TaskMessage(TaskMessage),
/// Close the websocket with a specific close code.
Close(CloseCode),
}


Expand Down