diff --git a/node/src/hopper/live_cores_package.rs b/node/src/hopper/live_cores_package.rs index 9ec210678..8ed830b94 100644 --- a/node/src/hopper/live_cores_package.rs +++ b/node/src/hopper/live_cores_package.rs @@ -302,7 +302,7 @@ mod tests { let encrypted_payload = encodex(cryptde, &first_stop_key, &payload).unwrap(); let paying_wallet = make_paying_wallet(b"wallet"); let contract_address = TEST_DEFAULT_CHAIN.rec().contract; - let mut route = Route::round_trip( + let route = Route::round_trip( RouteSegment::new(vec![&relay_key, &first_stop_key], Component::Neighborhood), RouteSegment::new( vec![&first_stop_key, &relay_key, &second_stop_key], @@ -310,10 +310,10 @@ mod tests { ), cryptde, Some(paying_wallet.clone()), - 1234, Some(contract_address), ) .unwrap(); + let mut route = route.set_return_route_id(cryptde, 1234); route.shift(&relay_cryptde).unwrap(); let subject = LiveCoresPackage::new(route.clone(), encrypted_payload.clone()); diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index bb85ea8ba..f81911c0c 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -105,7 +105,6 @@ pub struct Neighborhood { mode: NeighborhoodModeLight, min_hops: Hops, db_patch_size: u8, - next_return_route_id: u32, overall_connection_status: OverallConnectionStatus, chain: Chain, crashable: bool, @@ -431,7 +430,6 @@ impl Neighborhood { mode, min_hops, db_patch_size, - next_return_route_id: 0, overall_connection_status, chain: config.blockchain_bridge_config.chain, crashable: config.crash_point == CrashPoint::Message, @@ -980,7 +978,6 @@ impl Neighborhood { } fn zero_hop_route_response(&mut self) -> RouteQueryResponse { - let return_route_id = self.advance_return_route_id(); let route = Route::round_trip( RouteSegment::new( vec![self.cryptde.public_key(), self.cryptde.public_key()], @@ -992,7 +989,6 @@ impl Neighborhood { ), self.cryptde, None, - return_route_id, None, ) .expect("Couldn't create route"); @@ -1001,7 +997,6 @@ impl Neighborhood { expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], - return_route_id, ), } } @@ -1067,21 +1062,18 @@ impl Neighborhood { Err(e) => return Err(e), }; - let return_route_id = self.advance_return_route_id(); Ok(RouteQueryResponse { route: Route::round_trip( over, back, self.cryptde, self.consuming_wallet_opt.clone(), - return_route_id, Some(self.chain.rec().contract), ) .expect("Internal error: bad route"), expected_services: ExpectedServices::RoundTrip( expected_request_services, expected_response_services, - return_route_id, ), }) } @@ -1323,12 +1315,6 @@ impl Neighborhood { } } - fn advance_return_route_id(&mut self) -> u32 { - let return_route_id = self.next_return_route_id; - self.next_return_route_id = return_route_id.wrapping_add(1); - return_route_id - } - pub fn find_exit_locations<'a>( &'a self, source: &'a PublicKey, @@ -3382,7 +3368,6 @@ mod tests { ), cryptde, None, - 0, None, ) .unwrap(), @@ -3403,7 +3388,6 @@ mod tests { ), ExpectedService::Nothing, ], - 0, ), }; assert_eq!(expected_response, result); @@ -3455,39 +3439,17 @@ mod tests { ), cryptde, None, - 0, None, ) .unwrap(), expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], - 0, ), }; assert_eq!(result, expected_response); } - #[test] - fn zero_hop_routing_handles_return_route_id_properly() { - let mut subject = make_standard_subject(); - let result0 = subject.zero_hop_route_response(); - let result1 = subject.zero_hop_route_response(); - - let return_route_id_0 = match result0.expected_services { - ExpectedServices::RoundTrip(_, _, id) => id, - _ => panic!("expected RoundTrip got OneWay"), - }; - - let return_route_id_1 = match result1.expected_services { - ExpectedServices::RoundTrip(_, _, id) => id, - _ => panic!("expected RoundTrip got OneWay"), - }; - - assert_eq!(return_route_id_0, 0); - assert_eq!(return_route_id_1, 1); - } - /* Database: @@ -3546,7 +3508,6 @@ mod tests { segment(&[r, q, p], &Component::ProxyServer), cryptde, consuming_wallet_opt, - 0, Some(contract_address), ) .unwrap(), @@ -3577,10 +3538,9 @@ mod tests { ), ExpectedService::Nothing, ], - 0, ), }; - assert_eq!(expected_response, result); + assert_eq!(result, expected_response); } #[test] @@ -3599,18 +3559,6 @@ mod tests { ); } - #[test] - fn next_return_route_id_wraps_around() { - let mut subject = make_standard_subject(); - subject.next_return_route_id = 0xFFFFFFFF; - - let end = subject.advance_return_route_id(); - let beginning = subject.advance_return_route_id(); - - assert_eq!(end, 0xFFFFFFFF); - assert_eq!(beginning, 0x00000000); - } - /* Database: @@ -3619,37 +3567,6 @@ mod tests { Tests will be written from the viewpoint of O. */ - #[test] - fn return_route_ids_increase() { - let cryptde = main_cryptde(); - let system = System::new("return_route_ids_increase"); - let (_, _, _, mut subject) = make_o_r_e_subject(); - subject.min_hops = Hops::TwoHops; - let addr: Addr = subject.start(); - let sub: Recipient = addr.recipient::(); - - let data_route_0 = sub.send(RouteQueryMessage::data_indefinite_route_request(None, 2000)); - let data_route_1 = sub.send(RouteQueryMessage::data_indefinite_route_request(None, 3000)); - - System::current().stop_with_code(0); - system.run(); - let result_0 = data_route_0.wait().unwrap().unwrap(); - let result_1 = data_route_1.wait().unwrap().unwrap(); - let juicy_parts = |result: RouteQueryResponse| { - let last_element = result.route.hops.last().unwrap(); - let last_element_dec = cryptde.decode(last_element).unwrap(); - let network_return_route_id: u32 = - serde_cbor::de::from_slice(last_element_dec.as_slice()).unwrap(); - let metadata_return_route_id = match result.expected_services { - ExpectedServices::RoundTrip(_, _, id) => id, - _ => panic!("expected RoundTrip got OneWay"), - }; - (network_return_route_id, metadata_return_route_id) - }; - assert_eq!(juicy_parts(result_0), (0, 0)); - assert_eq!(juicy_parts(result_1), (1, 1)); - } - #[test] fn handle_neighborhood_graph_message_works() { let test_name = "handle_neighborhood_graph_message_works"; @@ -4533,7 +4450,6 @@ mod tests { error_expectation, "Cannot make multi_hop with unknown neighbor" ); - assert_eq!(subject.next_return_route_id, 0); } #[test] @@ -7017,7 +6933,7 @@ mod tests { let hops = result.clone().unwrap().route.hops; let actual_keys: Vec = match hops.as_slice() { - [hop, exit, hop_back, origin, empty, _accounting] => vec![ + [hop, exit, hop_back, origin, empty] => vec![ decodex::(main_cryptde(), hop) .expect("hop") .public_key, @@ -7034,7 +6950,11 @@ mod tests { .expect("empty") .public_key, ], - l => panic!("our match is wrong, real size is {}, {:?}", l.len(), l), + l => panic!( + "our match is wrong, real size is {} instead of 5, {:?}", + l.len(), + l + ), }; let expected_public_keys = vec![ next_door_neighbor.public_key().clone(), @@ -7072,24 +6992,23 @@ mod tests { } }; /* - This is how the route_hops vector looks like: [C1, C2, ..., C(nodes_count), ..., C2, C1, accounting] + This is how the route_hops vector looks like: [C1, C2, ..., C(nodes_count), ..., C2, C1] Let's consider for 3-hop route ==> Nodes Count --> 4 Route Length --> 8 - Route Hops --> [C1, C2, C3, C4, C3, C2, C1, accounting] + Route Hops --> [C1, C2, C3, C4, C3, C2, C1] Over Route --> [C1, C2, C3] Back Route --> [C4, C3, C2, C1] */ - let mut route_hops = result.unwrap().route.hops; + let route_hops = result.unwrap().route.hops; let route_length = route_hops.len(); - let _accounting = route_hops.pop(); let over_route = &route_hops[..hops]; let back_route = &route_hops[hops..]; let over_cryptdes = cryptdes_from_node_records(&nodes[..hops]); let mut back_cryptdes = cryptdes_from_node_records(&nodes); back_cryptdes.reverse(); - assert_eq!(route_length, 2 * nodes_count); + assert_eq!(route_length, 2 * nodes_count - 1); assert_hops(over_cryptdes, over_route); assert_hops(back_cryptdes, back_route); } @@ -7169,7 +7088,7 @@ mod tests { let (over, back) = match response.expected_services { ExpectedServices::OneWay(_) => panic!("Expecting RoundTrip"), - ExpectedServices::RoundTrip(o, b, _) => (o[1].clone(), b[1].clone()), + ExpectedServices::RoundTrip(o, b) => (o[1].clone(), b[1].clone()), }; let extract_key = |es: ExpectedService| match es { ExpectedService::Routing(pk, _, _) => pk, @@ -7563,24 +7482,6 @@ mod tests { subject } - fn make_o_r_e_subject() -> (NodeRecord, NodeRecord, NodeRecord, Neighborhood) { - let mut subject = make_standard_subject(); - let o = &subject.neighborhood_database.root().clone(); - let r = &make_node_record(4567, false); - let e = &make_node_record(5678, false); - { - let db = &mut subject.neighborhood_database; - db.add_node(r.clone()).unwrap(); - db.add_node(e.clone()).unwrap(); - let mut dual_edge = |a: &NodeRecord, b: &NodeRecord| { - db.add_arbitrary_full_neighbor(a.public_key(), b.public_key()) - }; - dual_edge(o, r); - dual_edge(r, e); - } - (o.clone(), r.clone(), e.clone(), subject) - } - fn segment(nodes: &[&NodeRecord], component: &Component) -> RouteSegment { RouteSegment::new( nodes.into_iter().map(|n| n.public_key()).collect(), diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 1d29c2415..301ce98ec 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -50,6 +50,7 @@ use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use masq_lib::utils::MutabilityConflictHelper; use regex::Regex; +use std::cell::Cell; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::rc::Rc; @@ -58,7 +59,8 @@ use std::time::{Duration, SystemTime}; use tokio::prelude::Future; pub const CRASH_KEY: &str = "PROXYSERVER"; -pub const RETURN_ROUTE_TTL: Duration = Duration::from_secs(120); +pub const RETURN_ROUTE_TTL_FIRST_CHANCE: Duration = Duration::from_secs(120); +pub const RETURN_ROUTE_TTL_STRAGGLERS: Duration = Duration::from_secs(5); pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(30); @@ -89,10 +91,15 @@ pub struct ProxyServer { alias_cryptde: &'static dyn CryptDE, crashable: bool, logger: Logger, - route_ids_to_return_routes: TtlHashMap, + // Holds return-route information for requests that have not yet seen any responses + route_ids_to_return_routes_first_chance: TtlHashMap, + // Holds return-route information for requests that have seen at least one response and may + // see more in the future. The near future, because this TTL is much shorter. + route_ids_to_return_routes_stragglers: TtlHashMap, browser_proxy_sequence_offset: bool, inbound_client_data_helper_opt: Option>, stream_key_purge_delay: Duration, + next_return_route_id: Cell, is_running_in_integration_test: bool, } @@ -139,8 +146,13 @@ impl Handler for ProxyServer { type Result = (); fn handle(&mut self, msg: AddReturnRouteMessage, _ctx: &mut Self::Context) -> Self::Result { - self.route_ids_to_return_routes + let return_route_id = msg.return_route_id; + self.route_ids_to_return_routes_first_chance .insert(msg.return_route_id, msg); + debug!( + self.logger, + "Added return route info RRI{} to first-chance cache", return_route_id + ); } } @@ -163,12 +175,17 @@ impl Handler for ProxyServer { type Result = (); fn handle(&mut self, msg: AddRouteResultMessage, _ctx: &mut Self::Context) -> Self::Result { - let dns_failure = self - .dns_failure_retries - .get(&msg.stream_key) - .unwrap_or_else(|| { - panic!("AddRouteResultMessage Handler: stream key: {} not found within dns_failure_retries", msg.stream_key); - }); + let dns_failure = match self.dns_failure_retries.get(&msg.stream_key) { + Some(retry) => retry, + None => { + error!( + self.logger, + "AddRouteResultMessage stream key {} not found within dns_failure_retries", + msg.stream_key + ); + return; + } + }; match msg.result { Ok(route_query_response) => { @@ -257,6 +274,8 @@ impl ProxyServer { crashable: bool, is_running_in_integration_test: bool, ) -> ProxyServer { + let ps_logger = Logger::new("ProxyServer"); + let hm_logger = ps_logger.clone(); ProxyServer { subs: None, client_request_payload_factory: Box::new(ClientRequestPayloadFactoryReal::new()), @@ -271,11 +290,22 @@ impl ProxyServer { main_cryptde, alias_cryptde, crashable, - logger: Logger::new("ProxyServer"), - route_ids_to_return_routes: TtlHashMap::new(RETURN_ROUTE_TTL), + logger: ps_logger, + route_ids_to_return_routes_first_chance: TtlHashMap::new(RETURN_ROUTE_TTL_FIRST_CHANCE), + route_ids_to_return_routes_stragglers: TtlHashMap::new_with_retire( + RETURN_ROUTE_TTL_STRAGGLERS, + move |k, _| { + debug!( + hm_logger, + "Return route info RRI{} expired from straggler cache", *k + ); + true + }, + ), browser_proxy_sequence_offset: false, inbound_client_data_helper_opt: Some(Box::new(IBCDHelperReal::new())), stream_key_purge_delay: STREAM_KEY_PURGE_DELAY, + next_return_route_id: Cell::new(1), is_running_in_integration_test, } } @@ -362,8 +392,12 @@ impl ProxyServer { } fn handle_dns_resolve_failure(&mut self, msg: &ExpiredCoresPackage) { + let return_route_id = match self.rri_from_remaining_route(&msg.remaining_route) { + Some(rri) => rri, + None => return, // TODO: Eventually we'll have to do something better here, but we'll probably need some heuristics. + }; let return_route_info = - match self.get_return_route_info(&msg.remaining_route, "dns resolve failure") { + match self.get_return_route_info(return_route_id, "dns resolve failure") { Some(rri) => rri, None => return, // TODO: Eventually we'll have to do something better here, but we'll probably need some heuristics. }; @@ -503,11 +537,15 @@ impl ProxyServer { "Relaying ClientResponsePayload (stream key {}, sequence {}, length {}) from Hopper to Dispatcher for client", response.stream_key, response.sequenced_packet.sequence_number, response.sequenced_packet.data.len() ); - let return_route_info = - match self.get_return_route_info(&msg.remaining_route, "client response") { - Some(rri) => rri, - None => return, - }; + let return_route_id = match self.rri_from_remaining_route(&msg.remaining_route) { + Some(rri) => rri, + None => return, + }; + let return_route_info = match self.get_return_route_info(return_route_id, "client response") + { + Some(rri) => rri, + None => return, + }; self.report_response_services_consumed( &return_route_info, response.sequenced_packet.data.len(), @@ -718,22 +756,29 @@ impl ProxyServer { } } + fn get_next_return_route_id(&self) -> u32 { + let return_route_id = self.next_return_route_id.get(); + self.next_return_route_id + .set(return_route_id.wrapping_add(1)); + return_route_id + } + fn try_transmit_to_hopper( args: TransmitToHopperArgs, add_return_route_sub: Recipient, route_query_response: RouteQueryResponse, ) -> Result<(), String> { match route_query_response.expected_services { - ExpectedServices::RoundTrip(over, back, return_route_id) => { + ExpectedServices::RoundTrip(over, back) => { let return_route_info = AddReturnRouteMessage { - return_route_id, + return_route_id: args.return_route_id, expected_services: back, protocol: args.payload.protocol, hostname_opt: args.payload.target_hostname.clone(), }; debug!( args.logger, - "Adding expectant return route info: {:?}", return_route_info + "Adding expectant return route info: RRI{}", args.return_route_id ); add_return_route_sub .try_send(return_route_info) @@ -839,9 +884,11 @@ impl ProxyServer { let payload = args.payload; let payload_size = payload.sequenced_packet.data.len(); let stream_key = payload.stream_key; + let route_with_return_route_id = + route.set_return_route_id(args.main_cryptde, args.return_route_id); let pkg = IncipientCoresPackage::new( args.main_cryptde, - route, + route_with_return_route_id, payload.into(), &payload_destination_key, ) @@ -923,31 +970,54 @@ impl ProxyServer { } } - fn get_return_route_info( - &self, - remaining_route: &Route, - source: &str, - ) -> Option> { + fn rri_from_remaining_route(&self, remaining_route: &Route) -> Option { let mut mut_remaining_route = remaining_route.clone(); mut_remaining_route .shift(self.main_cryptde) .expect("Internal error: remaining route in ProxyServer with no hops"); - let return_route_id = match mut_remaining_route.id(self.main_cryptde) { - Ok(rri) => rri, + match mut_remaining_route.return_route_id(self.main_cryptde) { + Ok(rri) => Some(rri), Err(e) => { error!(self.logger, "Can't report services consumed: {}", e); - return None; - } - }; - match self.route_ids_to_return_routes.get(&return_route_id) { - Some(rri) => Some(rri), - None => { - error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); None } } } + fn get_return_route_info( + &mut self, + return_route_id: u32, + source: &str, + ) -> Option> { + match self + .route_ids_to_return_routes_first_chance + .remove(&return_route_id) + { + Some(rri) => { + self.route_ids_to_return_routes_stragglers + .insert(return_route_id, (*rri).clone()); + debug!(self.logger, "Return route info RRI{} found in first-chance cache; graduated to straggler cache", return_route_id); + Some(rri) + } + None => match self + .route_ids_to_return_routes_stragglers + .get(&return_route_id) + { + Some(rri) => { + debug!( + self.logger, + "Return route info RRI{} found in straggler cache", return_route_id + ); + Some(rri) + } + None => { + error!(self.logger, "Can't report services consumed: received response with bogus return-route ID RRI{} for {}. Ignoring", return_route_id, source); + None + } + }, + } + } + fn report_response_services_consumed( &self, return_route_info: &AddReturnRouteMessage, @@ -1101,6 +1171,7 @@ impl IBCDHelperReal { } } } + impl IBCDHelper for IBCDHelperReal { fn handle_normal_client_data( &self, @@ -1157,7 +1228,7 @@ impl IBCDHelper for IBCDHelperReal { } let args = TransmitToHopperArgs::new(proxy, payload, client_addr, timestamp, retire_stream_key); - let add_return_route_sub = proxy.out_subs("ProxysServer").add_return_route.clone(); + let add_return_route_sub = proxy.out_subs("ProxyServer").add_return_route.clone(); let pld = &args.payload; if let Some(route_query_response) = proxy.stream_key_routes.get(&pld.stream_key) { debug!( @@ -1224,6 +1295,7 @@ impl IBCDHelper for IBCDHelperReal { pub struct TransmitToHopperArgs { pub main_cryptde: &'static dyn CryptDE, pub payload: ClientRequestPayload_0v1, + pub return_route_id: u32, pub client_addr: SocketAddr, pub timestamp: SystemTime, pub is_decentralized: bool, @@ -1252,9 +1324,11 @@ impl TransmitToHopperArgs { } else { None }; + let return_route_id = proxy_server.get_next_return_route_id(); Self { main_cryptde: proxy_server.main_cryptde, payload, + return_route_id, client_addr, timestamp, logger: proxy_server.logger.clone(), @@ -1353,6 +1427,7 @@ impl Hostname { #[cfg(test)] mod tests { use super::*; + use crate::blockchain::bip32::Bip32EncryptionKeyProvider; use crate::match_every_type_id; use crate::proxy_server::protocol_pack::ServerImpersonator; use crate::proxy_server::server_impersonator_http::ServerImpersonatorHttp; @@ -1479,10 +1554,12 @@ mod tests { self } } + #[test] fn constants_have_correct_values() { assert_eq!(CRASH_KEY, "PROXYSERVER"); - assert_eq!(RETURN_ROUTE_TTL, Duration::from_secs(120)); + assert_eq!(RETURN_ROUTE_TTL_FIRST_CHANCE, Duration::from_secs(120)); + assert_eq!(RETURN_ROUTE_TTL_STRAGGLERS, Duration::from_secs(5)); assert_eq!(STREAM_KEY_PURGE_DELAY, Duration::from_secs(30)); } @@ -1617,6 +1694,85 @@ mod tests { } } + #[test] + fn get_return_route_info_produces_nothing_if_nothing_exists() { + let mut subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + + let result = subject.get_return_route_info(1234, "test"); + + assert!( + result.is_none(), + "Expected no return route info, but got: RRI{:?}", + result + ); + } + + #[test] + fn get_return_route_info_produces_rri_from_first_chance_if_it_exists_and_moves_into_stragglers() + { + let mut subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + let return_route_message = AddReturnRouteMessage { + return_route_id: 1234, + expected_services: vec![ExpectedService::Nothing], + protocol: ProxyProtocol::TLS, + hostname_opt: None, + }; + subject + .route_ids_to_return_routes_first_chance + .insert(1234, return_route_message.clone()); + + let result = subject.get_return_route_info(1234, "test").unwrap(); + + assert_eq!(*result, return_route_message); + assert_eq!( + subject.route_ids_to_return_routes_first_chance.get(&1234), + None + ); + assert_eq!( + subject.route_ids_to_return_routes_stragglers.get(&1234), + Some(Rc::new(return_route_message)) + ); + } + + #[test] + fn get_return_route_info_produces_rri_from_stragglers_if_it_exists() { + let mut subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + let return_route_message = AddReturnRouteMessage { + return_route_id: 1234, + expected_services: vec![ExpectedService::Nothing], + protocol: ProxyProtocol::TLS, + hostname_opt: None, + }; + subject + .route_ids_to_return_routes_stragglers + .insert(1234, return_route_message.clone()); + + let result = subject.get_return_route_info(1234, "test").unwrap(); + + assert_eq!(*result, return_route_message); + } + #[test] fn proxy_server_receives_http_request_with_new_stream_key_from_dispatcher_then_sends_cores_package_to_hopper( ) { @@ -1633,7 +1789,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let (proxy_server_mock, _, proxy_server_recording_arc) = make_recorder(); @@ -1665,7 +1820,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -1737,7 +1892,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let route = Route { hops: vec![] }; @@ -1783,7 +1937,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -1865,7 +2019,7 @@ mod tests { .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -2246,7 +2400,7 @@ mod tests { hopper_recording.get_record::(0), &IncipientCoresPackage::new( main_cryptde, - expected_route.route, + expected_route.route.set_return_route_id(main_cryptde, 1), MessageType::ClientRequest(VersionedData::new( &crate::sub_lib::migrations::client_request_payload::MIGRATIONS, &ClientRequestPayload_0v1 { @@ -2326,7 +2480,7 @@ mod tests { hopper_recording.get_record::(0), &IncipientCoresPackage::new( main_cryptde, - expected_route.route, + expected_route.route.set_return_route_id(main_cryptde, 1), MessageType::ClientRequest(VersionedData::new( &crate::sub_lib::migrations::client_request_payload::MIGRATIONS, &ClientRequestPayload_0v1 { @@ -2359,7 +2513,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -2390,7 +2543,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -2459,10 +2612,10 @@ mod tests { ), main_cryptde, Some(consuming_wallet), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) - .unwrap(); + .unwrap() + .set_return_route_id(main_cryptde, 1234); let (neighborhood_mock, _, neighborhood_recording_arc) = make_recorder(); let neighborhood_mock = neighborhood_mock.route_query_response(Some(RouteQueryResponse { route: route.clone(), @@ -2479,7 +2632,6 @@ mod tests { ExpectedService::Nothing, ExpectedService::Exit(PublicKey::new(&[3]), earning_wallet, rate_pack(102)), ], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -2509,7 +2661,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &payload_destination_key, ) @@ -2565,7 +2717,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![expected_service.clone()], vec![expected_service], - 123, ), }); let (neighborhood_mock, _, _) = make_recorder(); @@ -2729,12 +2880,13 @@ mod tests { let alias_cryptde = alias_cryptde(); let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let destination_key = PublicKey::from(&b"our destination"[..]); + let route = Route { hops: vec![] }; + let route_with_rrid = route.clone().set_return_route_id(main_cryptde, 4444); let route_query_response = RouteQueryResponse { - route: Route { hops: vec![] }, + route, expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), }; let (hopper_mock, hopper_awaiter, hopper_recording_arc) = make_recorder(); @@ -2764,7 +2916,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - Route { hops: vec![] }, + route_with_rrid, expected_payload.into(), &destination_key, ) @@ -2785,6 +2937,7 @@ mod tests { subject .stream_key_routes .insert(stream_key, route_query_response); + subject.next_return_route_id = Cell::new(4444); let subject_addr: Addr = subject.start(); let peer_actors = peer_actors_builder().hopper(hopper_mock).build(); subject_addr.try_send(BindMessage { peer_actors }).unwrap(); @@ -2804,56 +2957,87 @@ mod tests { fn proxy_server_sends_message_to_accountant_about_all_services_consumed_on_the_route_over() { let cryptde = main_cryptde(); let now = SystemTime::now(); - let exit_earning_wallet = make_wallet("exit earning wallet"); - let route_1_earning_wallet = make_wallet("route 1 earning wallet"); - let route_2_earning_wallet = make_wallet("route 2 earning wallet"); + let routing_node_1_public_key = PublicKey::new(&[1]); + let routing_node_2_public_key = PublicKey::new(&[2]); + let exit_node_public_key = PublicKey::new(&[3]); + let key_bytes = b"__originating consuming wallet__"; + let keypair = Bip32EncryptionKeyProvider::from_raw_secret(key_bytes).unwrap(); + let originating_consuming_wallet = Wallet::from(keypair); + let routing_node_1_earning_wallet = make_wallet("route 1 earning wallet"); + let routing_node_2_earning_wallet = make_wallet("route 2 earning wallet"); + let exit_node_earning_wallet = make_wallet("exit earning wallet"); + let routing_node_1_rate_pack = rate_pack(101); + let routing_node_2_rate_pack = rate_pack(102); + let exit_node_rate_pack = rate_pack(103); let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let (accountant_mock, _, accountant_recording_arc) = make_recorder(); let (hopper_mock, _, hopper_recording_arc) = make_recorder(); let (proxy_server_mock, _, proxy_server_recording_arc) = make_recorder(); - let routing_node_1_rate_pack = rate_pack(101); - let routing_node_2_rate_pack = rate_pack(102); - let exit_node_rate_pack = rate_pack(103); + let over_route_segment = RouteSegment::new( + vec![ + &cryptde.public_key(), + &routing_node_1_public_key, + &routing_node_2_public_key, + &exit_node_public_key, + ], + Component::ProxyClient, + ); + let back_route_segment = RouteSegment::new( + vec![ + &exit_node_public_key, + &routing_node_2_public_key, + &routing_node_1_public_key, + &cryptde.public_key(), + ], + Component::ProxyServer, + ); + let route = Route::round_trip( + over_route_segment, + back_route_segment, + cryptde, + Some(originating_consuming_wallet), + Some(TEST_DEFAULT_CHAIN.rec().contract), + ) + .unwrap(); let route_query_response = RouteQueryResponse { - route: make_meaningless_route(), + route, expected_services: ExpectedServices::RoundTrip( vec![ ExpectedService::Nothing, ExpectedService::Routing( - PublicKey::new(&[1]), - route_1_earning_wallet.clone(), - routing_node_1_rate_pack, + routing_node_1_public_key.clone(), + routing_node_1_earning_wallet.clone(), + routing_node_1_rate_pack.clone(), ), ExpectedService::Routing( - PublicKey::new(&[2]), - route_2_earning_wallet.clone(), - routing_node_2_rate_pack, + routing_node_2_public_key.clone(), + routing_node_2_earning_wallet.clone(), + routing_node_2_rate_pack.clone(), ), ExpectedService::Exit( - PublicKey::new(&[3]), - exit_earning_wallet.clone(), - exit_node_rate_pack, + exit_node_public_key.clone(), + exit_node_earning_wallet.clone(), + exit_node_rate_pack.clone(), ), ], vec![ ExpectedService::Exit( - PublicKey::new(&[3]), - make_wallet("some wallet 1"), - rate_pack(104), + exit_node_public_key.clone(), + exit_node_earning_wallet.clone(), + exit_node_rate_pack, ), ExpectedService::Routing( - PublicKey::new(&[2]), - make_wallet("some wallet 2"), - rate_pack(105), + routing_node_2_public_key.clone(), + routing_node_2_earning_wallet.clone(), + routing_node_2_rate_pack, ), ExpectedService::Routing( - PublicKey::new(&[1]), - make_wallet("some wallet 3"), - rate_pack(106), + routing_node_1_public_key.clone(), + routing_node_1_earning_wallet.clone(), + routing_node_1_rate_pack, ), ExpectedService::Nothing, ], - 0, ), }; let source_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -2879,6 +3063,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 4444, client_addr: source_addr, timestamp: now, is_decentralized: true, @@ -2898,8 +3083,31 @@ mod tests { System::current().stop(); system.run(); let recording = hopper_recording_arc.lock().unwrap(); - let record = recording.get_record::(0); + let mut record = recording.get_record::(0).clone(); let payload_enc_length = record.payload.len(); + let _ = record.route.shift(cryptde); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_1_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_2_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &exit_node_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_2_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_1_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(cryptde); + assert_eq!(record.route.return_route_id(cryptde).unwrap(), 4444); let recording = accountant_recording_arc.lock().unwrap(); let record = recording.get_record::(0); assert_eq!(recording.len(), 1); @@ -2908,7 +3116,7 @@ mod tests { &ReportServicesConsumedMessage { timestamp: now, exit: ExitServiceConsumed { - earning_wallet: exit_earning_wallet, + earning_wallet: exit_node_earning_wallet, payload_size: exit_payload_size, service_rate: exit_node_rate_pack.exit_service_rate, byte_rate: exit_node_rate_pack.exit_byte_rate @@ -2916,12 +3124,12 @@ mod tests { routing_payload_size: payload_enc_length, routing: vec![ RoutingServiceConsumed { - earning_wallet: route_1_earning_wallet, + earning_wallet: routing_node_1_earning_wallet, service_rate: routing_node_1_rate_pack.routing_service_rate, byte_rate: routing_node_1_rate_pack.routing_byte_rate, }, RoutingServiceConsumed { - earning_wallet: route_2_earning_wallet, + earning_wallet: routing_node_2_earning_wallet, service_rate: routing_node_2_rate_pack.routing_service_rate, byte_rate: routing_node_2_rate_pack.routing_byte_rate, } @@ -2944,7 +3152,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing], vec![ExpectedService::Nothing], - 0, ), }; let source_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -2967,6 +3174,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 3333, client_addr: source_addr, timestamp: SystemTime::now(), is_decentralized: false, @@ -2990,7 +3198,7 @@ mod tests { assert_eq!( record, &AddReturnRouteMessage { - return_route_id: 0, + return_route_id: 3333, expected_services: vec![ExpectedService::Nothing], protocol: ProxyProtocol::HTTP, hostname_opt: Some("nowhere.com".to_string()) @@ -3026,7 +3234,6 @@ mod tests { rate_pack(3), )], vec![], - 0, ); let neighborhood_mock = neighborhood_mock.route_query_response(Some(route_query_response.clone())); @@ -3072,10 +3279,8 @@ mod tests { } #[test] - #[should_panic( - expected = "AddRouteResultMessage Handler: stream key: AAAAAAAAAAAAAAAAAAAAAAAAAAA not found within dns_failure_retries" - )] fn route_result_message_handler_panics_when_dns_retries_hashmap_doesnt_contain_a_stream_key() { + init_test_logging(); let system = System::new("route_result_message_handler_panics_when_dns_retries_hashmap_doesnt_contain_a_stream_key"); let subject = ProxyServer::new( main_cryptde(), @@ -3096,7 +3301,9 @@ mod tests { }) .unwrap(); + System::current().stop(); system.run(); + TestLogHandler::new().exists_log_containing("ERROR: ProxyServer: AddRouteResultMessage stream key AAAAAAAAAAAAAAAAAAAAAAAAAAA not found within dns_failure_retries"); } #[test] @@ -3256,6 +3463,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 2222, client_addr: source_addr, timestamp: SystemTime::now(), is_decentralized: true, @@ -3323,11 +3531,11 @@ mod tests { RouteSegment::new(vec![public_key, public_key], Component::ProxyServer), cryptde, None, - 1234, None, ) - .unwrap(), - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + .unwrap() + .set_return_route_id(cryptde, 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }; let neighborhood_mock = neighborhood_mock.route_query_response(Some(route_query_response)); let dispatcher = Recorder::new(); @@ -3425,7 +3633,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let stream_key = StreamKey::make_meaningless_stream_key(); @@ -3441,7 +3648,7 @@ mod tests { data: expected_data.clone(), }; let expected_tls_request = PlainData::new(tls_request); - let route = Route { hops: vec![] }; + let route = Route { hops: vec![] }.set_return_route_id(main_cryptde, 1); let expected_payload = ClientRequestPayload_0v1 { stream_key: stream_key.clone(), sequenced_packet: SequencedPacket { @@ -3511,7 +3718,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let stream_key = StreamKey::make_meaningless_stream_key(); @@ -3542,7 +3748,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -3596,7 +3802,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let client_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -3604,7 +3809,7 @@ mod tests { let expected_data = tls_request.to_vec(); let msg_from_dispatcher = InboundClientData { timestamp: SystemTime::now(), - client_addr: client_addr, + client_addr, reception_port: Some(TLS_PORT), sequence_number: Some(0), last_data: true, @@ -3627,7 +3832,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -3759,7 +3964,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -3850,13 +4055,13 @@ mod tests { stream_key.clone(), RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject .tunneled_hosts .insert(stream_key.clone(), "hostname".to_string()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -3886,6 +4091,15 @@ mod tests { assert!(subject.keys_and_addrs.is_empty()); assert!(subject.stream_key_routes.is_empty()); assert!(subject.tunneled_hosts.is_empty()); + assert!(subject + .route_ids_to_return_routes_first_chance + .get(&1234) + .is_none()); + // TODO: This assert should be much stronger + assert!(subject + .route_ids_to_return_routes_stragglers + .get(&1234) + .is_some()); } #[test] @@ -3959,13 +4173,13 @@ mod tests { stream_key.clone(), RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject .tunneled_hosts .insert(stream_key.clone(), "hostname".to_string()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4062,7 +4276,7 @@ mod tests { stream_key.clone(), RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -4076,7 +4290,7 @@ mod tests { exit_byte_rate: 100, exit_service_rate: 60000, }; - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4168,7 +4382,7 @@ mod tests { let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); let rate_pack_f = rate_pack(103); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4200,7 +4414,7 @@ mod tests { let rate_pack_g = rate_pack(104); let rate_pack_h = rate_pack(105); let rate_pack_i = rate_pack(106); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1235, AddReturnRouteMessage { return_route_id: 1235, @@ -4388,7 +4602,7 @@ mod tests { let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); let rate_pack_f = rate_pack(103); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4471,7 +4685,7 @@ mod tests { let incoming_route_e_wallet = make_wallet("E Earning"); let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4662,7 +4876,7 @@ mod tests { let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); let rate_pack_f = rate_pack(103); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4776,7 +4990,7 @@ mod tests { .insert(stream_key.clone(), socket_addr.clone()); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4858,7 +5072,7 @@ mod tests { .insert(stream_key.clone(), socket_addr); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4929,7 +5143,7 @@ mod tests { .insert(stream_key.clone(), socket_addr.clone()); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( return_route_id, AddReturnRouteMessage { return_route_id, @@ -5006,7 +5220,7 @@ mod tests { .insert(stream_key.clone(), socket_addr.clone()); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( return_route_id, AddReturnRouteMessage { return_route_id, @@ -5096,7 +5310,7 @@ mod tests { expected_services: ExpectedServices::OneWay(vec![]), }, ); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5155,7 +5369,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5226,7 +5440,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( expected_services.clone(), expected_services.clone(), - 1234, ), }; let neighborhood_mock = neighborhood_mock @@ -5256,7 +5469,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5344,7 +5557,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5396,7 +5609,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( expected_services.clone(), expected_services.clone(), - 1234, ), }; let neighborhood_mock = neighborhood_mock @@ -5432,7 +5644,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5513,7 +5725,7 @@ mod tests { .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); let remaining_route = return_route_with_id(cryptde, 4321); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 4321, AddReturnRouteMessage { return_route_id: 4321, @@ -5583,6 +5795,7 @@ mod tests { ) { init_test_logging(); let cryptde = main_cryptde(); + let return_route_id = 272727; let (dispatcher, _, dispatcher_recording_arc) = make_recorder(); let (accountant, _, accountant_recording_arc) = make_recorder(); let system = System::new("report_response_services_consumed_complains_and_drops_package_if_return_route_id_is_unrecognized"); @@ -5614,7 +5827,7 @@ mod tests { let expired_cores_package = ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 1234), + return_route_with_id(cryptde, return_route_id), client_response_payload, 0, ); @@ -5624,7 +5837,7 @@ mod tests { System::current().stop(); system.run(); - TestLogHandler::new().exists_log_containing("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID 1234 for client response. Ignoring"); + TestLogHandler::new().exists_log_containing(format!("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID RRI{} for client response. Ignoring", return_route_id).as_str()); assert_eq!(dispatcher_recording_arc.lock().unwrap().len(), 0); assert_eq!(accountant_recording_arc.lock().unwrap().len(), 0); } @@ -5701,11 +5914,12 @@ mod tests { false, false, ); - subject.route_ids_to_return_routes = TtlHashMap::new(Duration::from_millis(250)); + subject.route_ids_to_return_routes_first_chance = + TtlHashMap::new(Duration::from_millis(250)); subject .keys_and_addrs .insert(stream_key, SocketAddr::from_str("1.2.3.4:5678").unwrap()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5743,7 +5957,7 @@ mod tests { ); subject_addr.try_send(expired_cores_package).unwrap(); - TestLogHandler::new().await_log_containing("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID 1234 for client response. Ignoring", 1000); + TestLogHandler::new().await_log_containing("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID RRI1234 for client response. Ignoring", 1000); } #[test] @@ -5759,7 +5973,7 @@ mod tests { unaffected_stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -5812,7 +6026,7 @@ mod tests { unaffected_stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); let affected_route = Route::round_trip( @@ -5826,7 +6040,6 @@ mod tests { ), main_cryptde(), Some(make_paying_wallet(b"consuming")), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .unwrap(); @@ -5839,11 +6052,7 @@ mod tests { affected_stream_key, RouteQueryResponse { route: affected_route.clone(), - expected_services: ExpectedServices::RoundTrip( - affected_expected_services, - vec![], - 1234, - ), + expected_services: ExpectedServices::RoundTrip(affected_expected_services, vec![]), }, ); subject @@ -5876,7 +6085,10 @@ mod tests { system.run(); let recording = hopper_recording_arc.lock().unwrap(); let record = recording.get_record::(0); - assert_eq!(record.route, affected_route); + assert_eq!( + record.route, + affected_route.set_return_route_id(main_cryptde(), 1) + ); let payload = decodex::(&affected_cryptde, &record.payload).unwrap(); match payload { MessageType::ClientRequest(vd) => assert_eq!( @@ -5937,9 +6149,10 @@ mod tests { unaffected_stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); + subject.next_return_route_id = Cell::new(1234); let affected_route = Route::round_trip( RouteSegment::new( vec![main_cryptde().public_key(), affected_cryptde.public_key()], @@ -5951,7 +6164,6 @@ mod tests { ), main_cryptde(), Some(make_paying_wallet(b"consuming")), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .unwrap(); @@ -5964,11 +6176,7 @@ mod tests { affected_stream_key, RouteQueryResponse { route: affected_route.clone(), - expected_services: ExpectedServices::RoundTrip( - affected_expected_services, - vec![], - 1234, - ), + expected_services: ExpectedServices::RoundTrip(affected_expected_services, vec![]), }, ); subject.logger = Logger::new(test_name); @@ -5996,7 +6204,10 @@ mod tests { system.run(); let recording = hopper_recording_arc.lock().unwrap(); let record = recording.get_record::(0); - assert_eq!(record.route, affected_route); + assert_eq!( + record.route, + affected_route.set_return_route_id(main_cryptde(), 1234) + ); let payload = decodex::(&affected_cryptde, &record.payload).unwrap(); match payload { MessageType::ClientRequest(vd) => assert_eq!( @@ -6076,7 +6287,7 @@ mod tests { stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 0), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -6145,6 +6356,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 8888, client_addr: SocketAddr::from_str("1.2.3.4:1234").unwrap(), timestamp: SystemTime::now(), is_decentralized: false, @@ -6250,7 +6462,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -6319,7 +6530,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -6388,8 +6598,8 @@ mod tests { assert_on_hostname("", ""); assert_on_hostname("example", "example"); assert_on_hostname( - "htttttps://example.com/folder/file.html", - "htttttps://example.com/folder/file.html", + "htttttps://www.example.com/folder/file.html", + "htttttps://www.example.com/folder/file.html", ); } @@ -6559,6 +6769,24 @@ mod tests { ); } + #[test] + fn get_next_return_route_id_wraps_around() { + let mut mut_subject = + ProxyServer::new(main_cryptde(), alias_cryptde(), true, None, false, false); + mut_subject.next_return_route_id = Cell::new(0xFFFFFFFE); + let subject = mut_subject; + + let end_minus_one = subject.get_next_return_route_id(); + let end = subject.get_next_return_route_id(); + let beginning = subject.get_next_return_route_id(); + let beginning_plus_one = subject.get_next_return_route_id(); + + assert_eq!(end_minus_one, 0xFFFFFFFE); + assert_eq!(end, 0xFFFFFFFF); + assert_eq!(beginning, 0x00000000); + assert_eq!(beginning_plus_one, 0x00000001); + } + fn make_exit_service_from_key(public_key: PublicKey) -> ExpectedService { ExpectedService::Exit(public_key, make_wallet("exit wallet"), rate_pack(100)) } diff --git a/node/src/sub_lib/http_packet_framer.rs b/node/src/sub_lib/http_packet_framer.rs index 29685f46b..28d4f895f 100644 --- a/node/src/sub_lib/http_packet_framer.rs +++ b/node/src/sub_lib/http_packet_framer.rs @@ -104,7 +104,7 @@ impl HttpPacketFramer { lines: Vec::new(), }, start_finder, - logger: Logger::new("HttpRequestFramer"), + logger: Logger::new("HttpPacketFramer"), } } diff --git a/node/src/sub_lib/neighborhood.rs b/node/src/sub_lib/neighborhood.rs index 79623cda3..f8e93f951 100644 --- a/node/src/sub_lib/neighborhood.rs +++ b/node/src/sub_lib/neighborhood.rs @@ -505,7 +505,7 @@ pub enum ExpectedService { #[derive(Clone, Debug, PartialEq, Eq)] pub enum ExpectedServices { OneWay(Vec), - RoundTrip(Vec, Vec, u32), + RoundTrip(Vec, Vec), } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/node/src/sub_lib/proxy_server.rs b/node/src/sub_lib/proxy_server.rs index c3042859f..8b28c588a 100644 --- a/node/src/sub_lib/proxy_server.rs +++ b/node/src/sub_lib/proxy_server.rs @@ -55,7 +55,7 @@ impl ClientRequestPayload_0v1 { } } -#[derive(Message, Debug, PartialEq, Eq)] +#[derive(Message, Debug, PartialEq, Eq, Clone)] pub struct AddReturnRouteMessage { pub return_route_id: u32, pub expected_services: Vec, diff --git a/node/src/sub_lib/route.rs b/node/src/sub_lib/route.rs index ecbd0261e..a9314d29e 100644 --- a/node/src/sub_lib/route.rs +++ b/node/src/sub_lib/route.rs @@ -32,7 +32,6 @@ impl Route { cryptde, None, None, - None, ) } @@ -47,7 +46,6 @@ impl Route { None, cryptde, consuming_wallet, - None, contract_address, ) } @@ -55,9 +53,8 @@ impl Route { pub fn round_trip( route_segment_over: RouteSegment, route_segment_back: RouteSegment, - cryptde: &dyn CryptDE, // Must be the CryptDE of the originating Node: used to encrypt return_route_id. + cryptde: &dyn CryptDE, // Doesn't matter which CryptDE: only used for encoding. consuming_wallet: Option, - return_route_id: u32, contract_address: Option
, ) -> Result { Self::construct( @@ -65,12 +62,11 @@ impl Route { Some(route_segment_back), cryptde, consuming_wallet, - Some(return_route_id), contract_address, ) } - pub fn id(&self, cryptde: &dyn CryptDE) -> Result { + pub fn return_route_id(&self, cryptde: &dyn CryptDE) -> Result { if let Some(first) = self.hops.first() { match decodex(cryptde, first) { Ok(n) => Ok(n), @@ -81,6 +77,12 @@ impl Route { } } + pub fn set_return_route_id(mut self, cryptde: &dyn CryptDE, return_route_id: u32) -> Self { + let return_route_id_enc = Self::encrypt_return_route_id(return_route_id, cryptde); + self.hops.push(return_route_id_enc); + self + } + // This cryptde must be the CryptDE of the next hop to come off the Route. pub fn next_hop(&self, cryptde: &dyn CryptDE) -> Result { match self.hops.first() { @@ -148,7 +150,6 @@ impl Route { back: Option, cryptde: &dyn CryptDE, consuming_wallet: Option, - return_route_id_opt: Option, contract_address: Option
, ) -> Result { if let Some(error) = Route::validate_route_segments(&over, &back) { @@ -173,12 +174,7 @@ impl Route { contract_address, ); - Route::hops_to_route( - hops[0..].to_vec(), - &over.keys[0], - return_route_id_opt, - cryptde, - ) + Route::hops_to_route(hops[0..].to_vec(), &over.keys[0], cryptde) } fn over_segment<'a>( @@ -295,7 +291,6 @@ impl Route { fn hops_to_route( hops: Vec, top_hop_key: &PublicKey, - return_route_id_opt: Option, cryptde: &dyn CryptDE, ) -> Result { let mut hops_enc: Vec = Vec::new(); @@ -307,10 +302,6 @@ impl Route { }); hop_key = &data_hop.public_key; } - if let Some(return_route_id) = return_route_id_opt { - let return_route_id_enc = Self::encrypt_return_route_id(return_route_id, cryptde); - hops_enc.push(return_route_id_enc); - } Ok(Route { hops: hops_enc }) } @@ -353,38 +344,38 @@ mod tests { use serde_cbor; #[test] - fn id_decodes_return_route_id() { + fn return_route_id_works() { let cryptde = main_cryptde(); let subject = Route { hops: vec![Route::encrypt_return_route_id(42, cryptde)], }; - assert_eq!(subject.id(cryptde), Ok(42)); + assert_eq!(subject.return_route_id(cryptde), Ok(42)); } #[test] - fn id_returns_empty_route_error_when_the_route_is_empty() { + fn return_route_id_returns_empty_route_error_when_the_route_is_empty() { let cryptde = main_cryptde(); let subject = Route { hops: vec![] }; assert_eq!( - subject.id(cryptde), + subject.return_route_id(cryptde), Err("Response route did not contain a return route ID".to_string()) ); } #[test] #[should_panic(expected = "Could not decrypt with ebe5f9a0e2 data beginning with ebe5f9a0e1")] - fn id_returns_error_when_the_id_fails_to_decrypt() { + fn return_route_id_returns_error_when_the_id_fails_to_decrypt() { let cryptde1 = CryptDENull::from(&PublicKey::new(b"key a"), TEST_DEFAULT_CHAIN); let cryptde2 = CryptDENull::from(&PublicKey::new(b"key b"), TEST_DEFAULT_CHAIN); let subject = Route { hops: vec![Route::encrypt_return_route_id(42, &cryptde1)], }; - let _ = subject.id(&cryptde2); + let _ = subject.return_route_id(&cryptde2); } #[test] @@ -420,7 +411,6 @@ mod tests { RouteSegment::new(vec![&c_key, &d_key], Component::ProxyServer), cryptde, Some(paying_wallet.clone()), - 0, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .err() @@ -472,10 +462,10 @@ mod tests { RouteSegment::new(vec![&d_key, &e_key, &f_key, &a_key], Component::ProxyServer), cryptde, Some(paying_wallet.clone()), - return_route_id, Some(contract_address.clone()), ) .unwrap(); + let subject = subject.set_return_route_id(cryptde, return_route_id); assert_eq!( subject.hops[0], @@ -745,10 +735,10 @@ mod tests { RouteSegment::new(vec![&key2, &key1], Component::ProxyServer), cryptde, Some(paying_wallet), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .unwrap(); + let original = original.set_return_route_id(cryptde, 1234); let serialized = serde_cbor::ser::to_vec(&original).unwrap(); @@ -794,16 +784,17 @@ Encrypted with 0x03040506: LiveHop { public_key: 0x, payer: Some(Payer { wallet: let key1 = PublicKey::new(&[1, 2, 3, 4]); let key2 = PublicKey::new(&[2, 3, 4, 5]); let key3 = PublicKey::new(&[3, 4, 5, 6]); + let cryptde = CryptDENull::from(&key1, TEST_DEFAULT_CHAIN); let paying_wallet = make_paying_wallet(b"wallet"); let subject = Route::round_trip( RouteSegment::new(vec![&key1, &key2, &key3], Component::ProxyClient), RouteSegment::new(vec![&key3, &key2, &key1], Component::ProxyServer), - &CryptDENull::from(&key1, TEST_DEFAULT_CHAIN), + &cryptde, Some(paying_wallet), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .unwrap(); + let subject = subject.set_return_route_id(&cryptde, 1234); let result = subject.to_string(vec![ &CryptDENull::from(&key1, TEST_DEFAULT_CHAIN), diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index faa79b68a..1c32dea0d 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -8,6 +8,7 @@ use std::rc::Rc; use std::time::Duration; use std::time::Instant; +#[allow(clippy::type_complexity)] pub struct TtlHashMap where K: Hash + Clone, @@ -15,6 +16,7 @@ where last_check: RefCell, data: RefCell, Instant)>>, ttl: Duration, + retire_closure: Box bool>, } impl TtlHashMap @@ -27,6 +29,19 @@ where last_check: RefCell::new(Instant::now()), data: RefCell::new(HashMap::new()), ttl, + retire_closure: Box::new(|_, _| true), + } + } + + pub fn new_with_retire(ttl: Duration, retire_closure: F) -> TtlHashMap + where + F: 'static + Fn(&K, &V) -> bool, + { + TtlHashMap { + last_check: RefCell::new(Instant::now()), + data: RefCell::new(HashMap::new()), + ttl, + retire_closure: Box::new(retire_closure), } } @@ -54,6 +69,12 @@ where } } + pub fn remove(&self, key: &K) -> Option> { + self.remove_expired_entries(); + + self.data.borrow_mut().remove(key).map(|(result, _)| result) + } + fn remove_expired_entries(&self) { let now = Instant::now(); @@ -74,7 +95,15 @@ where }; expired.iter().for_each(|key| { - self.data.borrow_mut().remove(key); + let mut data = self.data.borrow_mut(); + match data.remove(key) { + Some((value, _)) => { + if !(self.retire_closure)(key, value.as_ref()) { + data.insert(key.clone(), (value, now)); + } + } + None => (), // already removed + } }); } } @@ -82,26 +111,70 @@ where #[cfg(test)] mod tests { use super::*; + use std::sync::{Arc, Mutex}; use std::thread; #[test] fn new_sets_ttl() { let subject = TtlHashMap::::new(Duration::from_millis(1000)); - assert_eq!(subject.ttl, Duration::from_millis(1000)); + let result = subject.ttl; + + assert_eq!(result, Duration::from_millis(1000)); } #[test] - fn remove_returns_none_for_entry_that_was_never_inserted() { + fn get_returns_none_for_entry_that_was_never_inserted() { let subject = TtlHashMap::::new(Duration::from_millis(1000)); - assert_eq!(subject.get(&11u32), None); + let result = subject.get(&11u32); + + assert_eq!(result, None); assert_eq!(subject.ttl(), Duration::from_millis(1000)); } #[test] - fn ttl_hashmap_does_not_remove_entry_before_it_is_expired() { + fn remove_returns_none_if_no_such_entry_exists() { + let subject = TtlHashMap::::new(Duration::from_millis(1000)); + + let result = subject.remove(&11u32); + + assert_eq!(result, None); + } + + #[test] + fn remove_returns_existing_entry_and_removes() { + let mut subject = TtlHashMap::::new(Duration::from_millis(1000)); + subject.insert(11u32, 42u32); + + let before_result = subject.remove(&11u32); + let after_result = subject.remove(&11u32); + + assert_eq!(before_result, Some(Rc::new(42u32))); + assert_eq!(after_result, None); + } + + #[test] + fn ttl_hashmap_remove_removes_expired_entry() { let mut subject = TtlHashMap::new(Duration::from_millis(10)); + subject.insert(42u32, "Hello"); + thread::sleep(Duration::from_millis(20)); + + let result = subject.remove(&11u32); // nonexistent key + + assert_eq!(result, None); + // Low-level get, because high-level get would remove it if .remove() didn't + assert_eq!(subject.data.borrow().get(&42u32), None); + } + + #[test] + fn ttl_hashmap_does_not_remove_entry_before_it_is_expired() { + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + true + }); subject.insert(42u32, "Hello"); subject.insert(24u32, "World"); @@ -109,25 +182,52 @@ mod tests { assert_eq!(subject.get(&42u32).unwrap().as_ref(), &"Hello"); assert_eq!(subject.get(&24u32).unwrap().as_ref(), &"World"); assert_eq!(subject.ttl(), Duration::from_millis(10)); + assert_eq!(*retire_closure_has_run.lock().unwrap(), false); } #[test] fn ttl_hashmap_get_removes_expired_entry() { - let mut subject = TtlHashMap::new(Duration::from_millis(10)); - + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + true + }); subject.insert(42u32, "Hello"); + thread::sleep(Duration::from_millis(20)); + + let result = subject.get(&42u32); + + assert_eq!(result, None); + assert_eq!(*retire_closure_has_run.lock().unwrap(), true); + } + #[test] + fn ttl_hashmap_get_does_not_remove_expired_entry_if_closure_returns_false() { + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + false + }); + subject.insert(42u32, "Hello"); thread::sleep(Duration::from_millis(20)); - assert_eq!(subject.get(&42u32), None); + let result = subject.get(&42u32); + + assert_eq!(result, Some(Rc::new("Hello"))); + assert_eq!(*retire_closure_has_run.lock().unwrap(), true); } #[test] fn ttl_hashmap_insert_removes_expired_entry() { - let mut subject = TtlHashMap::new(Duration::from_millis(10)); - + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + true + }); subject.insert(42u32, "Hello"); - thread::sleep(Duration::from_millis(20)); subject.insert(24u32, "World"); @@ -137,6 +237,7 @@ mod tests { subject.data.borrow().get(&24u32).unwrap().0.as_ref(), &"World" ); + assert_eq!(*retire_closure_has_run.lock().unwrap(), true); } #[test] diff --git a/node/src/test_utils/mod.rs b/node/src/test_utils/mod.rs index 595bf3699..1589ea5cc 100644 --- a/node/src/test_utils/mod.rs +++ b/node/src/test_utils/mod.rs @@ -240,14 +240,13 @@ pub fn zero_hop_route_response( RouteSegment::new(vec![public_key, public_key], Component::ProxyServer), cryptde, None, - 0, None, ) - .unwrap(), + .unwrap() + .set_return_route_id(cryptde, 0), expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], - 0, ), } } @@ -1268,7 +1267,6 @@ mod tests { ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing,], vec![ExpectedService::Nothing, ExpectedService::Nothing,], - 0 ) ); }