From fe2804df12f443d3fc1e4340c15edc1ba2f3e1ed Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Thu, 14 Aug 2025 21:53:17 -0400 Subject: [PATCH 01/12] Fixed the AddRouteResultMessage panic problem too --- node/src/proxy_server/mod.rs | 200 +++++++++++++++++++++++-------- node/src/sub_lib/proxy_server.rs | 2 +- node/src/sub_lib/ttl_hashmap.rs | 63 ++++++++-- 3 files changed, 205 insertions(+), 60 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index fc0334834..2bd9f9f99 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -58,7 +58,8 @@ use std::time::{Duration, SystemTime}; use tokio::prelude::Future; pub const CRASH_KEY: &str = "PROXYSERVER"; -pub const RETURN_ROUTE_TTL: Duration = Duration::from_secs(120); +pub const RETURN_ROUTE_TTL_FIRST_CHANCE: Duration = Duration::from_secs(120); +pub const RETURN_ROUTE_TTL_STRAGGLERS: Duration = Duration::from_secs(5); pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(30); @@ -89,7 +90,11 @@ pub struct ProxyServer { alias_cryptde: &'static dyn CryptDE, crashable: bool, logger: Logger, - route_ids_to_return_routes: TtlHashMap, + // Holds return-route information for requests that have not yet seen any responses + route_ids_to_return_routes_first_chance: TtlHashMap, + // Holds return-route information for requests that have seen at least one response and may + // see more in the future. The near future, because this TTL is much shorter. + route_ids_to_return_routes_stragglers: TtlHashMap, browser_proxy_sequence_offset: bool, inbound_client_data_helper_opt: Option>, stream_key_purge_delay: Duration, @@ -139,7 +144,7 @@ impl Handler for ProxyServer { type Result = (); fn handle(&mut self, msg: AddReturnRouteMessage, _ctx: &mut Self::Context) -> Self::Result { - self.route_ids_to_return_routes + self.route_ids_to_return_routes_first_chance .insert(msg.return_route_id, msg); } } @@ -163,12 +168,13 @@ impl Handler for ProxyServer { type Result = (); fn handle(&mut self, msg: AddRouteResultMessage, _ctx: &mut Self::Context) -> Self::Result { - let dns_failure = self - .dns_failure_retries - .get(&msg.stream_key) - .unwrap_or_else(|| { - panic!("AddRouteResultMessage Handler: stream key: {} not found within dns_failure_retries", msg.stream_key); - }); + let dns_failure = match self.dns_failure_retries.get(&msg.stream_key) { + Some(retry) => retry, + None => { + error!(self.logger, "AddRouteResultMessage stream key {} not found within dns_failure_retries", msg.stream_key); + return + } + }; match msg.result { Ok(route_query_response) => { @@ -272,7 +278,8 @@ impl ProxyServer { alias_cryptde, crashable, logger: Logger::new("ProxyServer"), - route_ids_to_return_routes: TtlHashMap::new(RETURN_ROUTE_TTL), + route_ids_to_return_routes_first_chance: TtlHashMap::new(RETURN_ROUTE_TTL_FIRST_CHANCE), + route_ids_to_return_routes_stragglers: TtlHashMap::new(RETURN_ROUTE_TTL_STRAGGLERS), browser_proxy_sequence_offset: false, inbound_client_data_helper_opt: Some(Box::new(IBCDHelperReal::new())), stream_key_purge_delay: STREAM_KEY_PURGE_DELAY, @@ -362,8 +369,12 @@ impl ProxyServer { } fn handle_dns_resolve_failure(&mut self, msg: &ExpiredCoresPackage) { + let return_route_id = match self.rri_from_remaining_route(&msg.remaining_route) { + Some(rri) => rri, + None => return, // TODO: Eventually we'll have to do something better here, but we'll probably need some heuristics. + }; let return_route_info = - match self.get_return_route_info(&msg.remaining_route, "dns resolve failure") { + match self.get_return_route_info(return_route_id, "dns resolve failure") { Some(rri) => rri, None => return, // TODO: Eventually we'll have to do something better here, but we'll probably need some heuristics. }; @@ -503,8 +514,13 @@ impl ProxyServer { "Relaying ClientResponsePayload (stream key {}, sequence {}, length {}) from Hopper to Dispatcher for client", response.stream_key, response.sequenced_packet.sequence_number, response.sequenced_packet.data.len() ); + let return_route_id = + match self.rri_from_remaining_route(&msg.remaining_route) { + Some(rri) => rri, + None => return, + }; let return_route_info = - match self.get_return_route_info(&msg.remaining_route, "client response") { + match self.get_return_route_info(return_route_id, "client response") { Some(rri) => rri, None => return, }; @@ -923,31 +939,43 @@ impl ProxyServer { } } - fn get_return_route_info( + fn rri_from_remaining_route( &self, remaining_route: &Route, - source: &str, - ) -> Option> { + ) -> Option { let mut mut_remaining_route = remaining_route.clone(); mut_remaining_route .shift(self.main_cryptde) .expect("Internal error: remaining route in ProxyServer with no hops"); - let return_route_id = match mut_remaining_route.id(self.main_cryptde) { - Ok(rri) => rri, + match mut_remaining_route.id(self.main_cryptde) { + Ok(rri) => Some(rri), Err(e) => { error!(self.logger, "Can't report services consumed: {}", e); - return None; - } - }; - match self.route_ids_to_return_routes.get(&return_route_id) { - Some(rri) => Some(rri), - None => { - error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); None } } } + fn get_return_route_info( + &mut self, + return_route_id: u32, + source: &str, + ) -> Option> { + match self.route_ids_to_return_routes_first_chance.remove(&return_route_id) { + Some(rri) => { + self.route_ids_to_return_routes_stragglers.insert(return_route_id, (*rri).clone()); + Some(rri) + }, + None => match self.route_ids_to_return_routes_stragglers.get(&return_route_id) { + Some(rri) => Some(rri), + None => { + error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); + None + } + } + } + } + fn report_response_services_consumed( &self, return_route_info: &AddReturnRouteMessage, @@ -1479,10 +1507,11 @@ mod tests { self } } + #[test] fn constants_have_correct_values() { assert_eq!(CRASH_KEY, "PROXYSERVER"); - assert_eq!(RETURN_ROUTE_TTL, Duration::from_secs(120)); + assert_eq!(RETURN_ROUTE_TTL_FIRST_CHANCE, Duration::from_secs(120)); assert_eq!(STREAM_KEY_PURGE_DELAY, Duration::from_secs(30)); } @@ -1617,6 +1646,70 @@ mod tests { } } + #[test] + fn get_return_route_info_produces_nothing_if_nothing_exists() { + let mut subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + + let result = subject.get_return_route_info(1234, "test"); + + assert!(result.is_none(), "Expected no return route info, but got: {:?}", result); + } + + #[test] + fn get_return_route_info_produces_rri_from_first_chance_if_it_exists_and_moves_into_stragglers() { + let mut subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + let return_route_message = AddReturnRouteMessage { + return_route_id: 1234, + expected_services: vec![ExpectedService::Nothing], + protocol: ProxyProtocol::TLS, + hostname_opt: None, + }; + subject.route_ids_to_return_routes_first_chance.insert(1234, return_route_message.clone()); + + let result = subject.get_return_route_info(1234, "test").unwrap(); + + assert_eq!(*result, return_route_message); + assert_eq!(subject.route_ids_to_return_routes_first_chance.get(&1234), None); + assert_eq!(subject.route_ids_to_return_routes_stragglers.get(&1234), Some(Rc::new(return_route_message))); + } + + #[test] + fn get_return_route_info_produces_rri_from_stragglers_if_it_exists() { + let mut subject = ProxyServer::new( + main_cryptde(), + alias_cryptde(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + let return_route_message = AddReturnRouteMessage { + return_route_id: 1234, + expected_services: vec![ExpectedService::Nothing], + protocol: ProxyProtocol::TLS, + hostname_opt: None, + }; + subject.route_ids_to_return_routes_stragglers.insert(1234, return_route_message.clone()); + + let result = subject.get_return_route_info(1234, "test").unwrap(); + + assert_eq!(*result, return_route_message); + } + #[test] fn proxy_server_receives_http_request_with_new_stream_key_from_dispatcher_then_sends_cores_package_to_hopper( ) { @@ -1865,7 +1958,7 @@ mod tests { .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -3072,10 +3165,8 @@ mod tests { } #[test] - #[should_panic( - expected = "AddRouteResultMessage Handler: stream key: AAAAAAAAAAAAAAAAAAAAAAAAAAA not found within dns_failure_retries" - )] fn route_result_message_handler_panics_when_dns_retries_hashmap_doesnt_contain_a_stream_key() { + init_test_logging(); let system = System::new("route_result_message_handler_panics_when_dns_retries_hashmap_doesnt_contain_a_stream_key"); let subject = ProxyServer::new( main_cryptde(), @@ -3096,7 +3187,9 @@ mod tests { }) .unwrap(); + System::current().stop(); system.run(); + TestLogHandler::new().exists_log_containing("ERROR: ProxyServer: AddRouteResultMessage stream key AAAAAAAAAAAAAAAAAAAAAAAAAAA not found within dns_failure_retries"); } #[test] @@ -3604,7 +3697,7 @@ mod tests { let expected_data = tls_request.to_vec(); let msg_from_dispatcher = InboundClientData { timestamp: SystemTime::now(), - client_addr: client_addr, + client_addr, reception_port: Some(TLS_PORT), sequence_number: Some(0), last_data: true, @@ -3759,7 +3852,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -3856,7 +3949,7 @@ mod tests { subject .tunneled_hosts .insert(stream_key.clone(), "hostname".to_string()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -3886,6 +3979,9 @@ mod tests { assert!(subject.keys_and_addrs.is_empty()); assert!(subject.stream_key_routes.is_empty()); assert!(subject.tunneled_hosts.is_empty()); + assert!(subject.route_ids_to_return_routes_first_chance.get(&1234).is_none()); + // TODO: This assert should be much stronger + assert!(subject.route_ids_to_return_routes_stragglers.get(&1234).is_some()); } #[test] @@ -3965,7 +4061,7 @@ mod tests { subject .tunneled_hosts .insert(stream_key.clone(), "hostname".to_string()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4076,7 +4172,7 @@ mod tests { exit_byte_rate: 100, exit_service_rate: 60000, }; - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4168,7 +4264,7 @@ mod tests { let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); let rate_pack_f = rate_pack(103); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4200,7 +4296,7 @@ mod tests { let rate_pack_g = rate_pack(104); let rate_pack_h = rate_pack(105); let rate_pack_i = rate_pack(106); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1235, AddReturnRouteMessage { return_route_id: 1235, @@ -4388,7 +4484,7 @@ mod tests { let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); let rate_pack_f = rate_pack(103); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4471,7 +4567,7 @@ mod tests { let incoming_route_e_wallet = make_wallet("E Earning"); let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4662,7 +4758,7 @@ mod tests { let rate_pack_d = rate_pack(101); let rate_pack_e = rate_pack(102); let rate_pack_f = rate_pack(103); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4776,7 +4872,7 @@ mod tests { .insert(stream_key.clone(), socket_addr.clone()); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4858,7 +4954,7 @@ mod tests { .insert(stream_key.clone(), socket_addr); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -4929,7 +5025,7 @@ mod tests { .insert(stream_key.clone(), socket_addr.clone()); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( return_route_id, AddReturnRouteMessage { return_route_id, @@ -5006,7 +5102,7 @@ mod tests { .insert(stream_key.clone(), socket_addr.clone()); let exit_public_key = PublicKey::from(&b"exit_key"[..]); let exit_wallet = make_wallet("exit wallet"); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( return_route_id, AddReturnRouteMessage { return_route_id, @@ -5096,7 +5192,7 @@ mod tests { expected_services: ExpectedServices::OneWay(vec![]), }, ); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5155,7 +5251,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5256,7 +5352,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5344,7 +5440,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5432,7 +5528,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, @@ -5513,7 +5609,7 @@ mod tests { .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); let remaining_route = return_route_with_id(cryptde, 4321); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 4321, AddReturnRouteMessage { return_route_id: 4321, @@ -5701,11 +5797,13 @@ mod tests { false, false, ); - subject.route_ids_to_return_routes = TtlHashMap::new(Duration::from_millis(250)); + subject.route_ids_to_return_routes_first_chance = TtlHashMap::new( + Duration::from_millis(250), + ); subject .keys_and_addrs .insert(stream_key, SocketAddr::from_str("1.2.3.4:5678").unwrap()); - subject.route_ids_to_return_routes.insert( + subject.route_ids_to_return_routes_first_chance.insert( 1234, AddReturnRouteMessage { return_route_id: 1234, diff --git a/node/src/sub_lib/proxy_server.rs b/node/src/sub_lib/proxy_server.rs index c3042859f..8b28c588a 100644 --- a/node/src/sub_lib/proxy_server.rs +++ b/node/src/sub_lib/proxy_server.rs @@ -55,7 +55,7 @@ impl ClientRequestPayload_0v1 { } } -#[derive(Message, Debug, PartialEq, Eq)] +#[derive(Message, Debug, PartialEq, Eq, Clone)] pub struct AddReturnRouteMessage { pub return_route_id: u32, pub expected_services: Vec, diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index faa79b68a..9eae4c179 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -54,6 +54,17 @@ where } } + pub fn remove(&self, key: &K) -> Option> { + self.remove_expired_entries(); + + match self.data.borrow_mut().remove(key) { + Some((result, _)) => { + Some(result) + } + None => None, + } + } + fn remove_expired_entries(&self) { let now = Instant::now(); @@ -88,17 +99,55 @@ mod tests { fn new_sets_ttl() { let subject = TtlHashMap::::new(Duration::from_millis(1000)); - assert_eq!(subject.ttl, Duration::from_millis(1000)); + let result = subject.ttl; + + assert_eq!(result, Duration::from_millis(1000)); } #[test] - fn remove_returns_none_for_entry_that_was_never_inserted() { + fn get_returns_none_for_entry_that_was_never_inserted() { let subject = TtlHashMap::::new(Duration::from_millis(1000)); - assert_eq!(subject.get(&11u32), None); + let result = subject.get(&11u32); + + assert_eq!(result, None); assert_eq!(subject.ttl(), Duration::from_millis(1000)); } + #[test] + fn remove_returns_none_if_no_such_entry_exists() { + let subject = TtlHashMap::::new(Duration::from_millis(1000)); + + let result = subject.remove(&11u32); + + assert_eq!(result, None); + } + + #[test] + fn remove_returns_existing_entry_and_removes() { + let mut subject = TtlHashMap::::new(Duration::from_millis(1000)); + subject.insert(11u32, 42u32); + + let before_result = subject.remove(&11u32); + let after_result = subject.remove(&11u32); + + assert_eq!(before_result, Some(Rc::new(42u32))); + assert_eq!(after_result, None); + } + + #[test] + fn ttl_hashmap_remove_removes_expired_entry() { + let mut subject = TtlHashMap::new(Duration::from_millis(10)); + subject.insert(42u32, "Hello"); + thread::sleep(Duration::from_millis(20)); + + let result = subject.remove(&11u32); // nonexistent key + + assert_eq!(result, None); + // Low-level get, because high-level get would remove it if .remove() didn't + assert_eq!(subject.data.borrow().get(&42u32), None); + } + #[test] fn ttl_hashmap_does_not_remove_entry_before_it_is_expired() { let mut subject = TtlHashMap::new(Duration::from_millis(10)); @@ -114,20 +163,18 @@ mod tests { #[test] fn ttl_hashmap_get_removes_expired_entry() { let mut subject = TtlHashMap::new(Duration::from_millis(10)); - subject.insert(42u32, "Hello"); - thread::sleep(Duration::from_millis(20)); - assert_eq!(subject.get(&42u32), None); + let result = subject.get(&42u32); + + assert_eq!(result, None); } #[test] fn ttl_hashmap_insert_removes_expired_entry() { let mut subject = TtlHashMap::new(Duration::from_millis(10)); - subject.insert(42u32, "Hello"); - thread::sleep(Duration::from_millis(20)); subject.insert(24u32, "World"); From 81333fe7a49d16dc90e9e66bab5741fd64e429a2 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Thu, 14 Aug 2025 22:20:44 -0400 Subject: [PATCH 02/12] Preemptive review issues --- node/src/proxy_server/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 2bd9f9f99..c75ec08ea 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -1512,6 +1512,7 @@ mod tests { fn constants_have_correct_values() { assert_eq!(CRASH_KEY, "PROXYSERVER"); assert_eq!(RETURN_ROUTE_TTL_FIRST_CHANCE, Duration::from_secs(120)); + assert_eq!(RETURN_ROUTE_TTL_STRAGGLERS, Duration::from_secs(5)); assert_eq!(STREAM_KEY_PURGE_DELAY, Duration::from_secs(30)); } From 02d46c39cd876192a444895e19fd2a0315589aef Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Thu, 14 Aug 2025 22:51:13 -0400 Subject: [PATCH 03/12] Formatting --- node/src/proxy_server/mod.rs | 91 +++++++++++++++++++++------------ node/src/sub_lib/ttl_hashmap.rs | 7 +-- 2 files changed, 60 insertions(+), 38 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index c75ec08ea..91007d7e8 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -171,8 +171,12 @@ impl Handler for ProxyServer { let dns_failure = match self.dns_failure_retries.get(&msg.stream_key) { Some(retry) => retry, None => { - error!(self.logger, "AddRouteResultMessage stream key {} not found within dns_failure_retries", msg.stream_key); - return + error!( + self.logger, + "AddRouteResultMessage stream key {} not found within dns_failure_retries", + msg.stream_key + ); + return; } }; @@ -514,16 +518,15 @@ impl ProxyServer { "Relaying ClientResponsePayload (stream key {}, sequence {}, length {}) from Hopper to Dispatcher for client", response.stream_key, response.sequenced_packet.sequence_number, response.sequenced_packet.data.len() ); - let return_route_id = - match self.rri_from_remaining_route(&msg.remaining_route) { - Some(rri) => rri, - None => return, - }; - let return_route_info = - match self.get_return_route_info(return_route_id, "client response") { - Some(rri) => rri, - None => return, - }; + let return_route_id = match self.rri_from_remaining_route(&msg.remaining_route) { + Some(rri) => rri, + None => return, + }; + let return_route_info = match self.get_return_route_info(return_route_id, "client response") + { + Some(rri) => rri, + None => return, + }; self.report_response_services_consumed( &return_route_info, response.sequenced_packet.data.len(), @@ -939,10 +942,7 @@ impl ProxyServer { } } - fn rri_from_remaining_route( - &self, - remaining_route: &Route, - ) -> Option { + fn rri_from_remaining_route(&self, remaining_route: &Route) -> Option { let mut mut_remaining_route = remaining_route.clone(); mut_remaining_route .shift(self.main_cryptde) @@ -961,18 +961,25 @@ impl ProxyServer { return_route_id: u32, source: &str, ) -> Option> { - match self.route_ids_to_return_routes_first_chance.remove(&return_route_id) { + match self + .route_ids_to_return_routes_first_chance + .remove(&return_route_id) + { Some(rri) => { - self.route_ids_to_return_routes_stragglers.insert(return_route_id, (*rri).clone()); + self.route_ids_to_return_routes_stragglers + .insert(return_route_id, (*rri).clone()); Some(rri) - }, - None => match self.route_ids_to_return_routes_stragglers.get(&return_route_id) { + } + None => match self + .route_ids_to_return_routes_stragglers + .get(&return_route_id) + { Some(rri) => Some(rri), None => { error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); None } - } + }, } } @@ -1660,11 +1667,16 @@ mod tests { let result = subject.get_return_route_info(1234, "test"); - assert!(result.is_none(), "Expected no return route info, but got: {:?}", result); + assert!( + result.is_none(), + "Expected no return route info, but got: {:?}", + result + ); } #[test] - fn get_return_route_info_produces_rri_from_first_chance_if_it_exists_and_moves_into_stragglers() { + fn get_return_route_info_produces_rri_from_first_chance_if_it_exists_and_moves_into_stragglers() + { let mut subject = ProxyServer::new( main_cryptde(), alias_cryptde(), @@ -1679,13 +1691,21 @@ mod tests { protocol: ProxyProtocol::TLS, hostname_opt: None, }; - subject.route_ids_to_return_routes_first_chance.insert(1234, return_route_message.clone()); + subject + .route_ids_to_return_routes_first_chance + .insert(1234, return_route_message.clone()); let result = subject.get_return_route_info(1234, "test").unwrap(); assert_eq!(*result, return_route_message); - assert_eq!(subject.route_ids_to_return_routes_first_chance.get(&1234), None); - assert_eq!(subject.route_ids_to_return_routes_stragglers.get(&1234), Some(Rc::new(return_route_message))); + assert_eq!( + subject.route_ids_to_return_routes_first_chance.get(&1234), + None + ); + assert_eq!( + subject.route_ids_to_return_routes_stragglers.get(&1234), + Some(Rc::new(return_route_message)) + ); } #[test] @@ -1704,7 +1724,9 @@ mod tests { protocol: ProxyProtocol::TLS, hostname_opt: None, }; - subject.route_ids_to_return_routes_stragglers.insert(1234, return_route_message.clone()); + subject + .route_ids_to_return_routes_stragglers + .insert(1234, return_route_message.clone()); let result = subject.get_return_route_info(1234, "test").unwrap(); @@ -3980,9 +4002,15 @@ mod tests { assert!(subject.keys_and_addrs.is_empty()); assert!(subject.stream_key_routes.is_empty()); assert!(subject.tunneled_hosts.is_empty()); - assert!(subject.route_ids_to_return_routes_first_chance.get(&1234).is_none()); + assert!(subject + .route_ids_to_return_routes_first_chance + .get(&1234) + .is_none()); // TODO: This assert should be much stronger - assert!(subject.route_ids_to_return_routes_stragglers.get(&1234).is_some()); + assert!(subject + .route_ids_to_return_routes_stragglers + .get(&1234) + .is_some()); } #[test] @@ -5798,9 +5826,8 @@ mod tests { false, false, ); - subject.route_ids_to_return_routes_first_chance = TtlHashMap::new( - Duration::from_millis(250), - ); + subject.route_ids_to_return_routes_first_chance = + TtlHashMap::new(Duration::from_millis(250)); subject .keys_and_addrs .insert(stream_key, SocketAddr::from_str("1.2.3.4:5678").unwrap()); diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index 9eae4c179..dc06d18aa 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -57,12 +57,7 @@ where pub fn remove(&self, key: &K) -> Option> { self.remove_expired_entries(); - match self.data.borrow_mut().remove(key) { - Some((result, _)) => { - Some(result) - } - None => None, - } + self.data.borrow_mut().remove(key).map(|(result, _)| result) } fn remove_expired_entries(&self) { From 442e63c6ed3b35262bc74d48bd60c8ffb3206593 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Mon, 18 Aug 2025 06:58:05 -0400 Subject: [PATCH 04/12] example.com -> www.example.com --- .../tests/bookkeeping_test.rs | 4 ++-- .../tests/communication_failure_test.rs | 4 ++-- .../tests/data_routing_test.rs | 4 ++-- node/src/blockchain/blockchain_bridge.rs | 2 +- node/src/proxy_server/mod.rs | 12 ++++++------ node/src/stream_reader.rs | 6 +++--- node/src/test_utils/mod.rs | 2 +- node/tests/http_through_node_test.rs | 2 +- node/tests/initialization_test.rs | 2 +- node/tests/tls_through_node_test.rs | 4 ++-- node/tests/ui_gateway_test.rs | 2 +- 11 files changed, 22 insertions(+), 22 deletions(-) diff --git a/multinode_integration_tests/tests/bookkeeping_test.rs b/multinode_integration_tests/tests/bookkeeping_test.rs index 6c7552eae..97b39f5c9 100644 --- a/multinode_integration_tests/tests/bookkeeping_test.rs +++ b/multinode_integration_tests/tests/bookkeeping_test.rs @@ -29,13 +29,13 @@ fn provided_and_consumed_services_are_recorded_in_databases() { let mut client = originating_node.make_client(8080, STANDARD_CLIENT_TIMEOUT_MILLIS); client.set_timeout(Duration::from_secs(10)); - let request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n".as_bytes(); + let request = "GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n".as_bytes(); client.send_chunk(request); let response = String::from_utf8(client.wait_for_chunk()).unwrap(); assert!( response.contains("

Example Domain

"), - "Not from example.com:\n{}", + "Not from www.example.com:\n{}", response ); diff --git a/multinode_integration_tests/tests/communication_failure_test.rs b/multinode_integration_tests/tests/communication_failure_test.rs index 65d31dd4f..f687cf05d 100644 --- a/multinode_integration_tests/tests/communication_failure_test.rs +++ b/multinode_integration_tests/tests/communication_failure_test.rs @@ -103,7 +103,7 @@ fn neighborhood_notified_of_newly_missing_node() { //Establish a client on the originating Node and send some ill-fated traffic. let mut client = originating_node.make_client(8080, STANDARD_CLIENT_TIMEOUT_MILLIS); - client.send_chunk("GET http://example.com HTTP/1.1\r\n\r\n".as_bytes()); + client.send_chunk("GET http://www.example.com HTTP/1.1\r\n\r\n".as_bytes()); // Now direct the witness Node to wait for Gossip about the disappeared Node. let (disappearance_gossip, _) = witness_node @@ -405,7 +405,7 @@ fn dns_resolution_failure_no_longer_blacklists_exit_node_for_all_hosts() { ), ); - client.send_chunk("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n".as_bytes()); + client.send_chunk("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n".as_bytes()); let cheapest_node = node_list.first().unwrap(); let cheapest_node_expired_cores_package = cheapest_node .wait_for_specific_package( diff --git a/multinode_integration_tests/tests/data_routing_test.rs b/multinode_integration_tests/tests/data_routing_test.rs index cdefcd354..98dd460a1 100644 --- a/multinode_integration_tests/tests/data_routing_test.rs +++ b/multinode_integration_tests/tests/data_routing_test.rs @@ -175,7 +175,7 @@ fn tls_end_to_end_routing_test() { .expect("Could not set read timeout to 1000ms"); let connector = TlsConnector::new().expect("Could not build TlsConnector"); match connector.connect( - "example.com", + "www.example.com", stream.try_clone().expect("Couldn't clone TcpStream"), ) { Ok(s) => { @@ -199,7 +199,7 @@ fn tls_end_to_end_routing_test() { tls_stream.expect("Couldn't handshake") }; - let request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n".as_bytes(); + let request = "GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n".as_bytes(); tls_stream .write(request.clone()) .expect("Could not write request to TLS stream"); diff --git a/node/src/blockchain/blockchain_bridge.rs b/node/src/blockchain/blockchain_bridge.rs index e4275b036..a0e501393 100644 --- a/node/src/blockchain/blockchain_bridge.rs +++ b/node/src/blockchain/blockchain_bridge.rs @@ -662,7 +662,7 @@ mod tests { #[test] fn blockchain_interface_is_constructed_with_a_blockchain_service_url() { init_test_logging(); - let blockchain_service_url = "https://example.com"; + let blockchain_service_url = "https://www.example.com"; let subject = BlockchainBridge::initialize_blockchain_interface( Some(blockchain_service_url.to_string()), TEST_DEFAULT_CHAIN, diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 91007d7e8..f130f017a 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -6504,15 +6504,15 @@ mod tests { #[test] fn hostname_works() { - assert_on_hostname("https://example.com/folder/file.html", "example.com"); - assert_on_hostname("example.com/index.php?arg=test", "example.com"); + assert_on_hostname("https://www.example.com/folder/file.html", "www.example.com"); + assert_on_hostname("www.example.com/index.php?arg=test", "www.example.com"); assert_on_hostname("sub.example.com/index.php?arg=test", "sub.example.com"); assert_on_hostname("1.1.1.1", "1.1.1.1"); assert_on_hostname("", ""); assert_on_hostname("example", "example"); assert_on_hostname( - "htttttps://example.com/folder/file.html", - "htttttps://example.com/folder/file.html", + "htttttps://www.example.com/folder/file.html", + "htttttps://www.example.com/folder/file.html", ); } @@ -6554,11 +6554,11 @@ mod tests { Err("localhost".to_string()) ); assert_eq!( - Hostname::new("example.com").validate_non_loopback_host(), + Hostname::new("www.example.com").validate_non_loopback_host(), Ok(()) ); assert_eq!( - Hostname::new("https://example.com").validate_non_loopback_host(), + Hostname::new("https://www.example.com").validate_non_loopback_host(), Ok(()) ); } diff --git a/node/src/stream_reader.rs b/node/src/stream_reader.rs index 34a7b62bd..ea96b7d5a 100644 --- a/node/src/stream_reader.rs +++ b/node/src/stream_reader.rs @@ -538,7 +538,7 @@ mod tests { Box::new(TlsDiscriminatorFactory::new()), Box::new(HttpRequestDiscriminatorFactory::new()), ]; - let http_connect_request = Vec::from("CONNECT example.com:443 HTTP/1.1\r\n\r\n".as_bytes()); + let http_connect_request = Vec::from("CONNECT www.example.com:443 HTTP/1.1\r\n\r\n".as_bytes()); // Magic TLS Sauce stolen from Configuration let tls_request = Vec::from(&[0x16, 0x03, 0x01, 0x00, 0x03, 0x01, 0x02, 0x03][..]); let reader = ReadHalfWrapperMock { @@ -594,7 +594,7 @@ mod tests { let discriminator_factories: Vec> = vec![Box::new(HttpRequestDiscriminatorFactory::new())]; let request1 = Vec::from("GET http://here.com HTTP/1.1\r\n\r\n".as_bytes()); - let request2 = Vec::from("GET http://example.com HTTP/1.1\r\n\r\n".as_bytes()); + let request2 = Vec::from("GET http://www.example.com HTTP/1.1\r\n\r\n".as_bytes()); let reader = ReadHalfWrapperMock { poll_read_results: vec![ (request1.clone(), Ok(Async::Ready(request1.len()))), @@ -651,7 +651,7 @@ mod tests { last_data: false, is_clandestine: false, sequence_number: Some(1), - data: Vec::from("GET http://example.com HTTP/1.1\r\n\r\n".as_bytes()), + data: Vec::from("GET http://www.example.com HTTP/1.1\r\n\r\n".as_bytes()), } ); } diff --git a/node/src/test_utils/mod.rs b/node/src/test_utils/mod.rs index 1bf32b4b5..595bf3699 100644 --- a/node/src/test_utils/mod.rs +++ b/node/src/test_utils/mod.rs @@ -713,7 +713,7 @@ pub mod unshared_test_utils { ClientRequestPayload_0v1 { stream_key: StreamKey::make_meaningful_stream_key("request"), sequenced_packet: SequencedPacket::new(make_garbage_data(bytes), 0, true), - target_hostname: Some("example.com".to_string()), + target_hostname: Some("www.example.com".to_string()), target_port: HTTP_PORT, protocol: ProxyProtocol::HTTP, originator_public_key: cryptde.public_key().clone(), diff --git a/node/tests/http_through_node_test.rs b/node/tests/http_through_node_test.rs index f49cb25b9..9f2d3669f 100644 --- a/node/tests/http_through_node_test.rs +++ b/node/tests/http_through_node_test.rs @@ -24,7 +24,7 @@ fn http_through_node_integration() { stream .set_read_timeout(Some(Duration::from_millis(1000))) .unwrap(); - let request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n".as_bytes(); + let request = "GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n".as_bytes(); stream.write(request.clone()).unwrap(); let buf = read_until_timeout(&mut stream); diff --git a/node/tests/initialization_test.rs b/node/tests/initialization_test.rs index 5b57d5b96..b124195db 100644 --- a/node/tests/initialization_test.rs +++ b/node/tests/initialization_test.rs @@ -76,7 +76,7 @@ fn initialization_sequence_integration() { ("neighborhood-mode", Some("zero-hop")), ("log-level", Some("trace")), ("data-directory", Some(&data_directory.to_str().unwrap())), - ("blockchain-service-url", Some("https://example.com")), + ("blockchain-service-url", Some("https://www.example.com")), ])) .unwrap(); let financials_request = UiFinancialsRequest { diff --git a/node/tests/tls_through_node_test.rs b/node/tests/tls_through_node_test.rs index 1509e38c7..d8975c9b0 100644 --- a/node/tests/tls_through_node_test.rs +++ b/node/tests/tls_through_node_test.rs @@ -33,7 +33,7 @@ fn tls_through_node_integration() { .expect("Could not set read timeout to 1000ms"); let connector = TlsConnector::new().expect("Could not build TlsConnector"); match connector.connect( - "example.com", + "www.example.com", stream.try_clone().expect("Couldn't clone TcpStream"), ) { Ok(s) => { @@ -57,7 +57,7 @@ fn tls_through_node_integration() { tls_stream.expect("Couldn't handshake") }; - let request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n".as_bytes(); + let request = "GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n".as_bytes(); tls_stream .write(request.clone()) .expect("Could not write request to TLS stream"); diff --git a/node/tests/ui_gateway_test.rs b/node/tests/ui_gateway_test.rs index fc6802075..07b603bff 100644 --- a/node/tests/ui_gateway_test.rs +++ b/node/tests/ui_gateway_test.rs @@ -142,7 +142,7 @@ fn daemon_does_not_allow_node_to_keep_his_client_alive_integration() { ("chain", Some("polygon-mainnet")), ("neighborhood-mode", Some("standard")), ("log-level", Some("trace")), - ("blockchain-service-url", Some("https://example.com")), + ("blockchain-service-url", Some("https://www.example.com")), ("data-directory", Some(&data_directory.to_str().unwrap())), ])) .unwrap(); From ac4c096f77cf8bf024a75d97a96d2af6e165c54a Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Mon, 18 Aug 2025 06:59:25 -0400 Subject: [PATCH 05/12] Formatting --- node/src/proxy_server/mod.rs | 5 ++++- node/src/stream_reader.rs | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index f130f017a..cfed990e7 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -6504,7 +6504,10 @@ mod tests { #[test] fn hostname_works() { - assert_on_hostname("https://www.example.com/folder/file.html", "www.example.com"); + assert_on_hostname( + "https://www.example.com/folder/file.html", + "www.example.com", + ); assert_on_hostname("www.example.com/index.php?arg=test", "www.example.com"); assert_on_hostname("sub.example.com/index.php?arg=test", "sub.example.com"); assert_on_hostname("1.1.1.1", "1.1.1.1"); diff --git a/node/src/stream_reader.rs b/node/src/stream_reader.rs index ea96b7d5a..cac9211a6 100644 --- a/node/src/stream_reader.rs +++ b/node/src/stream_reader.rs @@ -538,7 +538,8 @@ mod tests { Box::new(TlsDiscriminatorFactory::new()), Box::new(HttpRequestDiscriminatorFactory::new()), ]; - let http_connect_request = Vec::from("CONNECT www.example.com:443 HTTP/1.1\r\n\r\n".as_bytes()); + let http_connect_request = + Vec::from("CONNECT www.example.com:443 HTTP/1.1\r\n\r\n".as_bytes()); // Magic TLS Sauce stolen from Configuration let tls_request = Vec::from(&[0x16, 0x03, 0x01, 0x00, 0x03, 0x01, 0x02, 0x03][..]); let reader = ReadHalfWrapperMock { From 1234f5cb7fb5a279ac9f45313b843aa7668b621a Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Mon, 25 Aug 2025 23:48:58 -0400 Subject: [PATCH 06/12] Added some debug logging to track RRI --- node/src/proxy_server/mod.rs | 18 +++++++-- node/src/sub_lib/ttl_hashmap.rs | 66 +++++++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index cfed990e7..7435610da 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -267,6 +267,8 @@ impl ProxyServer { crashable: bool, is_running_in_integration_test: bool, ) -> ProxyServer { + let ps_logger = Logger::new("ProxyServer"); + let hm_logger = ps_logger.clone(); ProxyServer { subs: None, client_request_payload_factory: Box::new(ClientRequestPayloadFactoryReal::new()), @@ -281,9 +283,15 @@ impl ProxyServer { main_cryptde, alias_cryptde, crashable, - logger: Logger::new("ProxyServer"), + logger: ps_logger, route_ids_to_return_routes_first_chance: TtlHashMap::new(RETURN_ROUTE_TTL_FIRST_CHANCE), - route_ids_to_return_routes_stragglers: TtlHashMap::new(RETURN_ROUTE_TTL_STRAGGLERS), + route_ids_to_return_routes_stragglers: TtlHashMap::new_with_retire( + RETURN_ROUTE_TTL_STRAGGLERS, + move |k, _| { + debug!(hm_logger,"Return route info {} expired from straggler cache", *k); + true + }, + ), browser_proxy_sequence_offset: false, inbound_client_data_helper_opt: Some(Box::new(IBCDHelperReal::new())), stream_key_purge_delay: STREAM_KEY_PURGE_DELAY, @@ -968,13 +976,17 @@ impl ProxyServer { Some(rri) => { self.route_ids_to_return_routes_stragglers .insert(return_route_id, (*rri).clone()); + debug!(self.logger, "Return route info {} found in first-chance cache; graduated to straggler cache", return_route_id); Some(rri) } None => match self .route_ids_to_return_routes_stragglers .get(&return_route_id) { - Some(rri) => Some(rri), + Some(rri) => { + debug!(self.logger, "Return route info {} found in straggler cache", return_route_id); + Some(rri) + }, None => { error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); None diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index dc06d18aa..6d5d404f2 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -15,6 +15,7 @@ where last_check: RefCell, data: RefCell, Instant)>>, ttl: Duration, + retire_closure: Box bool> } impl TtlHashMap @@ -27,6 +28,19 @@ where last_check: RefCell::new(Instant::now()), data: RefCell::new(HashMap::new()), ttl, + retire_closure: Box::new(|_, _| true), + } + } + + pub fn new_with_retire(ttl: Duration, retire_closure: F) -> TtlHashMap + where + F: 'static + Fn(&K, &V) -> bool, + { + TtlHashMap { + last_check: RefCell::new(Instant::now()), + data: RefCell::new(HashMap::new()), + ttl, + retire_closure: Box::new(retire_closure), } } @@ -80,13 +94,22 @@ where }; expired.iter().for_each(|key| { - self.data.borrow_mut().remove(key); + let mut data = self.data.borrow_mut(); + match data.remove(key) { + Some((value, _)) => { + if !(self.retire_closure)(key, value.as_ref()) { + data.insert(key.clone(), (value, now)); + } + } + None => ()// already removed + } }); } } #[cfg(test)] mod tests { + use std::sync::{Arc, Mutex}; use super::*; use std::thread; @@ -145,7 +168,12 @@ mod tests { #[test] fn ttl_hashmap_does_not_remove_entry_before_it_is_expired() { - let mut subject = TtlHashMap::new(Duration::from_millis(10)); + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire( + Duration::from_millis(10), + move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; true } + ); subject.insert(42u32, "Hello"); subject.insert(24u32, "World"); @@ -153,22 +181,51 @@ mod tests { assert_eq!(subject.get(&42u32).unwrap().as_ref(), &"Hello"); assert_eq!(subject.get(&24u32).unwrap().as_ref(), &"World"); assert_eq!(subject.ttl(), Duration::from_millis(10)); + assert_eq!(*retire_closure_has_run.lock().unwrap(), false); } #[test] fn ttl_hashmap_get_removes_expired_entry() { - let mut subject = TtlHashMap::new(Duration::from_millis(10)); + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire( + Duration::from_millis(10), + move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; true } + ); subject.insert(42u32, "Hello"); thread::sleep(Duration::from_millis(20)); let result = subject.get(&42u32); assert_eq!(result, None); + assert_eq!(*retire_closure_has_run.lock().unwrap(), true); + } + + #[test] + fn ttl_hashmap_get_does_not_remove_expired_entry_if_closure_returns_false() { + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire( + Duration::from_millis(10), + move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; false } + ); + subject.insert(42u32, "Hello"); + thread::sleep(Duration::from_millis(20)); + + let result = subject.get(&42u32); + + assert_eq!(result, Some(Rc::new("Hello"))); + assert_eq!(*retire_closure_has_run.lock().unwrap(), true); } #[test] fn ttl_hashmap_insert_removes_expired_entry() { - let mut subject = TtlHashMap::new(Duration::from_millis(10)); + let retire_closure_has_run = Arc::new(Mutex::new(false)); + let retire_closure_has_run_inner = retire_closure_has_run.clone(); + let mut subject = TtlHashMap::new_with_retire( + Duration::from_millis(10), + move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; true } + ); subject.insert(42u32, "Hello"); thread::sleep(Duration::from_millis(20)); @@ -179,6 +236,7 @@ mod tests { subject.data.borrow().get(&24u32).unwrap().0.as_ref(), &"World" ); + assert_eq!(*retire_closure_has_run.lock().unwrap(), true); } #[test] From 70049a9a909bba7cb73dba03bf96bf7d41031867 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Tue, 26 Aug 2025 00:08:14 -0400 Subject: [PATCH 07/12] Formatting --- node/src/proxy_server/mod.rs | 12 ++++++++--- node/src/sub_lib/ttl_hashmap.rs | 38 ++++++++++++++++----------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 7435610da..23e50fb13 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -288,7 +288,10 @@ impl ProxyServer { route_ids_to_return_routes_stragglers: TtlHashMap::new_with_retire( RETURN_ROUTE_TTL_STRAGGLERS, move |k, _| { - debug!(hm_logger,"Return route info {} expired from straggler cache", *k); + debug!( + hm_logger, + "Return route info {} expired from straggler cache", *k + ); true }, ), @@ -984,9 +987,12 @@ impl ProxyServer { .get(&return_route_id) { Some(rri) => { - debug!(self.logger, "Return route info {} found in straggler cache", return_route_id); + debug!( + self.logger, + "Return route info {} found in straggler cache", return_route_id + ); Some(rri) - }, + } None => { error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); None diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index 6d5d404f2..2a519c1e2 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -15,7 +15,7 @@ where last_check: RefCell, data: RefCell, Instant)>>, ttl: Duration, - retire_closure: Box bool> + retire_closure: Box bool>, } impl TtlHashMap @@ -101,7 +101,7 @@ where data.insert(key.clone(), (value, now)); } } - None => ()// already removed + None => (), // already removed } }); } @@ -109,8 +109,8 @@ where #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; use super::*; + use std::sync::{Arc, Mutex}; use std::thread; #[test] @@ -170,10 +170,10 @@ mod tests { fn ttl_hashmap_does_not_remove_entry_before_it_is_expired() { let retire_closure_has_run = Arc::new(Mutex::new(false)); let retire_closure_has_run_inner = retire_closure_has_run.clone(); - let mut subject = TtlHashMap::new_with_retire( - Duration::from_millis(10), - move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; true } - ); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + true + }); subject.insert(42u32, "Hello"); subject.insert(24u32, "World"); @@ -188,10 +188,10 @@ mod tests { fn ttl_hashmap_get_removes_expired_entry() { let retire_closure_has_run = Arc::new(Mutex::new(false)); let retire_closure_has_run_inner = retire_closure_has_run.clone(); - let mut subject = TtlHashMap::new_with_retire( - Duration::from_millis(10), - move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; true } - ); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + true + }); subject.insert(42u32, "Hello"); thread::sleep(Duration::from_millis(20)); @@ -205,10 +205,10 @@ mod tests { fn ttl_hashmap_get_does_not_remove_expired_entry_if_closure_returns_false() { let retire_closure_has_run = Arc::new(Mutex::new(false)); let retire_closure_has_run_inner = retire_closure_has_run.clone(); - let mut subject = TtlHashMap::new_with_retire( - Duration::from_millis(10), - move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; false } - ); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + false + }); subject.insert(42u32, "Hello"); thread::sleep(Duration::from_millis(20)); @@ -222,10 +222,10 @@ mod tests { fn ttl_hashmap_insert_removes_expired_entry() { let retire_closure_has_run = Arc::new(Mutex::new(false)); let retire_closure_has_run_inner = retire_closure_has_run.clone(); - let mut subject = TtlHashMap::new_with_retire( - Duration::from_millis(10), - move |_, _| { *(retire_closure_has_run_inner.lock().unwrap()) = true; true } - ); + let mut subject = TtlHashMap::new_with_retire(Duration::from_millis(10), move |_, _| { + *(retire_closure_has_run_inner.lock().unwrap()) = true; + true + }); subject.insert(42u32, "Hello"); thread::sleep(Duration::from_millis(20)); From 30f832bdbf694e44f296a17a59c6b506f7ac13eb Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Tue, 26 Aug 2025 07:20:48 -0400 Subject: [PATCH 08/12] Clippy --- node/src/sub_lib/ttl_hashmap.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index 2a519c1e2..1c32dea0d 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -8,6 +8,7 @@ use std::rc::Rc; use std::time::Duration; use std::time::Instant; +#[allow(clippy::type_complexity)] pub struct TtlHashMap where K: Hash + Clone, From 7c3fa52c431200aef7e7aad36af21602ce3a198b Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Wed, 27 Aug 2025 07:11:45 -0400 Subject: [PATCH 09/12] Made RRI logs more consistent and increased straggler timeout to 30s --- node/src/proxy_server/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 23e50fb13..05c4b9b95 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -59,7 +59,7 @@ use tokio::prelude::Future; pub const CRASH_KEY: &str = "PROXYSERVER"; pub const RETURN_ROUTE_TTL_FIRST_CHANCE: Duration = Duration::from_secs(120); -pub const RETURN_ROUTE_TTL_STRAGGLERS: Duration = Duration::from_secs(5); +pub const RETURN_ROUTE_TTL_STRAGGLERS: Duration = Duration::from_secs(30); pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(30); @@ -290,7 +290,7 @@ impl ProxyServer { move |k, _| { debug!( hm_logger, - "Return route info {} expired from straggler cache", *k + "Return route info RRI{} expired from straggler cache", *k ); true }, @@ -763,7 +763,7 @@ impl ProxyServer { }; debug!( args.logger, - "Adding expectant return route info: {:?}", return_route_info + "Adding expectant return route info: RRI{:?}", return_route_info ); add_return_route_sub .try_send(return_route_info) @@ -979,7 +979,7 @@ impl ProxyServer { Some(rri) => { self.route_ids_to_return_routes_stragglers .insert(return_route_id, (*rri).clone()); - debug!(self.logger, "Return route info {} found in first-chance cache; graduated to straggler cache", return_route_id); + debug!(self.logger, "Return route info RRI{} found in first-chance cache; graduated to straggler cache", return_route_id); Some(rri) } None => match self @@ -989,12 +989,12 @@ impl ProxyServer { Some(rri) => { debug!( self.logger, - "Return route info {} found in straggler cache", return_route_id + "Return route info RRI{} found in straggler cache", return_route_id ); Some(rri) } None => { - error!(self.logger, "Can't report services consumed: received response with bogus return-route ID {} for {}. Ignoring", return_route_id, source); + error!(self.logger, "Can't report services consumed: received response with bogus return-route ID RRI{} for {}. Ignoring", return_route_id, source); None } }, @@ -1687,7 +1687,7 @@ mod tests { assert!( result.is_none(), - "Expected no return route info, but got: {:?}", + "Expected no return route info, but got: RRI{:?}", result ); } From f9cab9a822ed9e33a9a8ae2a220d81f388b81cf4 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Thu, 28 Aug 2025 08:02:13 -0400 Subject: [PATCH 10/12] Typo corrections --- node/src/proxy_server/mod.rs | 3 ++- node/src/sub_lib/http_packet_framer.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 05c4b9b95..6c775ef27 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -146,6 +146,7 @@ impl Handler for ProxyServer { fn handle(&mut self, msg: AddReturnRouteMessage, _ctx: &mut Self::Context) -> Self::Result { self.route_ids_to_return_routes_first_chance .insert(msg.return_route_id, msg); + debug!(self.logger,"Added return route info RRI{} to first-chance cache",msg.return_route_id); } } @@ -1210,7 +1211,7 @@ impl IBCDHelper for IBCDHelperReal { } let args = TransmitToHopperArgs::new(proxy, payload, client_addr, timestamp, retire_stream_key); - let add_return_route_sub = proxy.out_subs("ProxysServer").add_return_route.clone(); + let add_return_route_sub = proxy.out_subs("ProxyServer").add_return_route.clone(); let pld = &args.payload; if let Some(route_query_response) = proxy.stream_key_routes.get(&pld.stream_key) { debug!( diff --git a/node/src/sub_lib/http_packet_framer.rs b/node/src/sub_lib/http_packet_framer.rs index 29685f46b..28d4f895f 100644 --- a/node/src/sub_lib/http_packet_framer.rs +++ b/node/src/sub_lib/http_packet_framer.rs @@ -104,7 +104,7 @@ impl HttpPacketFramer { lines: Vec::new(), }, start_finder, - logger: Logger::new("HttpRequestFramer"), + logger: Logger::new("HttpPacketFramer"), } } From 5a89e1a86cf4b2f62588a264c22dd95f563e107b Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Sun, 31 Aug 2025 20:24:50 -0400 Subject: [PATCH 11/12] Interim commit --- node/src/hopper/live_cores_package.rs | 2 +- node/src/neighborhood/mod.rs | 75 +++------------------------ node/src/proxy_server/mod.rs | 72 ++++++++++++------------- node/src/sub_lib/neighborhood.rs | 2 +- node/src/sub_lib/route.rs | 41 +++++++-------- node/src/test_utils/mod.rs | 6 +-- 6 files changed, 63 insertions(+), 135 deletions(-) diff --git a/node/src/hopper/live_cores_package.rs b/node/src/hopper/live_cores_package.rs index 9ec210678..113a00ea6 100644 --- a/node/src/hopper/live_cores_package.rs +++ b/node/src/hopper/live_cores_package.rs @@ -310,10 +310,10 @@ mod tests { ), cryptde, Some(paying_wallet.clone()), - 1234, Some(contract_address), ) .unwrap(); + let mut route = route.set_return_route_id(cryptde, 1234); route.shift(&relay_cryptde).unwrap(); let subject = LiveCoresPackage::new(route.clone(), encrypted_payload.clone()); diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index bb85ea8ba..72e30ed06 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -980,7 +980,6 @@ impl Neighborhood { } fn zero_hop_route_response(&mut self) -> RouteQueryResponse { - let return_route_id = self.advance_return_route_id(); let route = Route::round_trip( RouteSegment::new( vec![self.cryptde.public_key(), self.cryptde.public_key()], @@ -992,7 +991,6 @@ impl Neighborhood { ), self.cryptde, None, - return_route_id, None, ) .expect("Couldn't create route"); @@ -1001,7 +999,6 @@ impl Neighborhood { expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], - return_route_id, ), } } @@ -1067,21 +1064,18 @@ impl Neighborhood { Err(e) => return Err(e), }; - let return_route_id = self.advance_return_route_id(); Ok(RouteQueryResponse { route: Route::round_trip( over, back, self.cryptde, self.consuming_wallet_opt.clone(), - return_route_id, Some(self.chain.rec().contract), ) .expect("Internal error: bad route"), expected_services: ExpectedServices::RoundTrip( expected_request_services, expected_response_services, - return_route_id, ), }) } @@ -1324,6 +1318,7 @@ impl Neighborhood { } fn advance_return_route_id(&mut self) -> u32 { + todo!("Move this logic into the ProxyServer"); let return_route_id = self.next_return_route_id; self.next_return_route_id = return_route_id.wrapping_add(1); return_route_id @@ -3382,10 +3377,10 @@ mod tests { ), cryptde, None, - 0, None, ) - .unwrap(), + .unwrap() + .set_return_route_id(cryptde, 0), expected_services: ExpectedServices::RoundTrip( vec![ ExpectedService::Nothing, @@ -3403,7 +3398,6 @@ mod tests { ), ExpectedService::Nothing, ], - 0, ), }; assert_eq!(expected_response, result); @@ -3455,39 +3449,18 @@ mod tests { ), cryptde, None, - 0, None, ) - .unwrap(), + .unwrap() + .set_return_route_id(cryptde, 0), expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], - 0, ), }; assert_eq!(result, expected_response); } - #[test] - fn zero_hop_routing_handles_return_route_id_properly() { - let mut subject = make_standard_subject(); - let result0 = subject.zero_hop_route_response(); - let result1 = subject.zero_hop_route_response(); - - let return_route_id_0 = match result0.expected_services { - ExpectedServices::RoundTrip(_, _, id) => id, - _ => panic!("expected RoundTrip got OneWay"), - }; - - let return_route_id_1 = match result1.expected_services { - ExpectedServices::RoundTrip(_, _, id) => id, - _ => panic!("expected RoundTrip got OneWay"), - }; - - assert_eq!(return_route_id_0, 0); - assert_eq!(return_route_id_1, 1); - } - /* Database: @@ -3546,10 +3519,10 @@ mod tests { segment(&[r, q, p], &Component::ProxyServer), cryptde, consuming_wallet_opt, - 0, Some(contract_address), ) - .unwrap(), + .unwrap() + .set_return_route_id(cryptde, 0), expected_services: ExpectedServices::RoundTrip( vec![ ExpectedService::Nothing, @@ -3577,7 +3550,6 @@ mod tests { ), ExpectedService::Nothing, ], - 0, ), }; assert_eq!(expected_response, result); @@ -3619,37 +3591,6 @@ mod tests { Tests will be written from the viewpoint of O. */ - #[test] - fn return_route_ids_increase() { - let cryptde = main_cryptde(); - let system = System::new("return_route_ids_increase"); - let (_, _, _, mut subject) = make_o_r_e_subject(); - subject.min_hops = Hops::TwoHops; - let addr: Addr = subject.start(); - let sub: Recipient = addr.recipient::(); - - let data_route_0 = sub.send(RouteQueryMessage::data_indefinite_route_request(None, 2000)); - let data_route_1 = sub.send(RouteQueryMessage::data_indefinite_route_request(None, 3000)); - - System::current().stop_with_code(0); - system.run(); - let result_0 = data_route_0.wait().unwrap().unwrap(); - let result_1 = data_route_1.wait().unwrap().unwrap(); - let juicy_parts = |result: RouteQueryResponse| { - let last_element = result.route.hops.last().unwrap(); - let last_element_dec = cryptde.decode(last_element).unwrap(); - let network_return_route_id: u32 = - serde_cbor::de::from_slice(last_element_dec.as_slice()).unwrap(); - let metadata_return_route_id = match result.expected_services { - ExpectedServices::RoundTrip(_, _, id) => id, - _ => panic!("expected RoundTrip got OneWay"), - }; - (network_return_route_id, metadata_return_route_id) - }; - assert_eq!(juicy_parts(result_0), (0, 0)); - assert_eq!(juicy_parts(result_1), (1, 1)); - } - #[test] fn handle_neighborhood_graph_message_works() { let test_name = "handle_neighborhood_graph_message_works"; @@ -7169,7 +7110,7 @@ mod tests { let (over, back) = match response.expected_services { ExpectedServices::OneWay(_) => panic!("Expecting RoundTrip"), - ExpectedServices::RoundTrip(o, b, _) => (o[1].clone(), b[1].clone()), + ExpectedServices::RoundTrip(o, b) => (o[1].clone(), b[1].clone()), }; let extract_key = |es: ExpectedService| match es { ExpectedService::Routing(pk, _, _) => pk, diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 6c775ef27..95ad79f46 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -7,6 +7,7 @@ pub mod server_impersonator_http; pub mod server_impersonator_tls; pub mod tls_protocol_pack; +use std::cell::Cell; use crate::proxy_server::client_request_payload_factory::{ ClientRequestPayloadFactory, ClientRequestPayloadFactoryReal, }; @@ -59,7 +60,7 @@ use tokio::prelude::Future; pub const CRASH_KEY: &str = "PROXYSERVER"; pub const RETURN_ROUTE_TTL_FIRST_CHANCE: Duration = Duration::from_secs(120); -pub const RETURN_ROUTE_TTL_STRAGGLERS: Duration = Duration::from_secs(30); +pub const RETURN_ROUTE_TTL_STRAGGLERS: Duration = Duration::from_secs(5); pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(30); @@ -98,6 +99,7 @@ pub struct ProxyServer { browser_proxy_sequence_offset: bool, inbound_client_data_helper_opt: Option>, stream_key_purge_delay: Duration, + next_return_route_id: Cell, is_running_in_integration_test: bool, } @@ -144,9 +146,10 @@ impl Handler for ProxyServer { type Result = (); fn handle(&mut self, msg: AddReturnRouteMessage, _ctx: &mut Self::Context) -> Self::Result { + let return_route_id = msg.return_route_id; self.route_ids_to_return_routes_first_chance .insert(msg.return_route_id, msg); - debug!(self.logger,"Added return route info RRI{} to first-chance cache",msg.return_route_id); + debug!(self.logger,"Added return route info RRI{} to first-chance cache", return_route_id); } } @@ -299,6 +302,7 @@ impl ProxyServer { browser_proxy_sequence_offset: false, inbound_client_data_helper_opt: Some(Box::new(IBCDHelperReal::new())), stream_key_purge_delay: STREAM_KEY_PURGE_DELAY, + next_return_route_id: Cell::new(1), is_running_in_integration_test, } } @@ -749,13 +753,21 @@ impl ProxyServer { } } + fn get_next_return_route_id(&self) -> u32 { + let return_route_id = self.next_return_route_id.get(); + *(self.next_return_route_id.get_mut()) = return_route_id.wrapping_add(1); + return_route_id + } + fn try_transmit_to_hopper( + &self, args: TransmitToHopperArgs, add_return_route_sub: Recipient, route_query_response: RouteQueryResponse, ) -> Result<(), String> { match route_query_response.expected_services { - ExpectedServices::RoundTrip(over, back, return_route_id) => { + ExpectedServices::RoundTrip(over, back) => { + let return_route_id = self.get_next_return_route_id(); let return_route_info = AddReturnRouteMessage { return_route_id, expected_services: back, @@ -959,7 +971,7 @@ impl ProxyServer { mut_remaining_route .shift(self.main_cryptde) .expect("Internal error: remaining route in ProxyServer with no hops"); - match mut_remaining_route.id(self.main_cryptde) { + match mut_remaining_route.return_route_id(self.main_cryptde) { Ok(rri) => Some(rri), Err(e) => { error!(self.logger, "Can't report services consumed: {}", e); @@ -1109,7 +1121,7 @@ impl RouteQueryResponseResolver for RouteQueryResponseResolverReal { let stream_key = args.payload.stream_key; let result = match route_result_opt { Ok(Some(route_query_response)) => { - match ProxyServer::try_transmit_to_hopper( + match self.try_transmit_to_hopper( args, add_return_route_sub, route_query_response.clone(), @@ -1768,7 +1780,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let (proxy_server_mock, _, proxy_server_recording_arc) = make_recorder(); @@ -1872,7 +1883,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let route = Route { hops: vec![] }; @@ -2494,7 +2504,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -2594,10 +2603,10 @@ mod tests { ), main_cryptde, Some(consuming_wallet), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) - .unwrap(); + .unwrap() + .set_return_route_id(main_cryptde, 1234); let (neighborhood_mock, _, neighborhood_recording_arc) = make_recorder(); let neighborhood_mock = neighborhood_mock.route_query_response(Some(RouteQueryResponse { route: route.clone(), @@ -2614,7 +2623,6 @@ mod tests { ExpectedService::Nothing, ExpectedService::Exit(PublicKey::new(&[3]), earning_wallet, rate_pack(102)), ], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -2700,7 +2708,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![expected_service.clone()], vec![expected_service], - 123, ), }); let (neighborhood_mock, _, _) = make_recorder(); @@ -2869,7 +2876,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), }; let (hopper_mock, hopper_awaiter, hopper_recording_arc) = make_recorder(); @@ -2988,7 +2994,6 @@ mod tests { ), ExpectedService::Nothing, ], - 0, ), }; let source_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -3079,7 +3084,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing], vec![ExpectedService::Nothing], - 0, ), }; let source_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -3161,7 +3165,6 @@ mod tests { rate_pack(3), )], vec![], - 0, ); let neighborhood_mock = neighborhood_mock.route_query_response(Some(route_query_response.clone())); @@ -3458,11 +3461,11 @@ mod tests { RouteSegment::new(vec![public_key, public_key], Component::ProxyServer), cryptde, None, - 1234, None, ) - .unwrap(), - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + .unwrap() + .set_return_route_id(cryptde, 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }; let neighborhood_mock = neighborhood_mock.route_query_response(Some(route_query_response)); let dispatcher = Recorder::new(); @@ -3560,7 +3563,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let stream_key = StreamKey::make_meaningless_stream_key(); @@ -3646,7 +3648,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let stream_key = StreamKey::make_meaningless_stream_key(); @@ -3731,7 +3732,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let client_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -3985,7 +3985,7 @@ mod tests { stream_key.clone(), RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -4103,7 +4103,7 @@ mod tests { stream_key.clone(), RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -4206,7 +4206,7 @@ mod tests { stream_key.clone(), RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -5370,7 +5370,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( expected_services.clone(), expected_services.clone(), - 1234, ), }; let neighborhood_mock = neighborhood_mock @@ -5540,7 +5539,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( expected_services.clone(), expected_services.clone(), - 1234, ), }; let neighborhood_mock = neighborhood_mock @@ -5904,7 +5902,7 @@ mod tests { unaffected_stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -5957,7 +5955,7 @@ mod tests { unaffected_stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); let affected_route = Route::round_trip( @@ -5971,10 +5969,10 @@ mod tests { ), main_cryptde(), Some(make_paying_wallet(b"consuming")), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) - .unwrap(); + .unwrap() + .set_return_route_id(main_cryptde(), 1234); let affected_expected_services = vec![ExpectedService::Exit( affected_cryptde.public_key().clone(), make_paying_wallet(b"1234"), @@ -5987,7 +5985,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( affected_expected_services, vec![], - 1234, ), }, ); @@ -6082,7 +6079,7 @@ mod tests { unaffected_stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 1234), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); let affected_route = Route::round_trip( @@ -6096,10 +6093,10 @@ mod tests { ), main_cryptde(), Some(make_paying_wallet(b"consuming")), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) - .unwrap(); + .unwrap() + .set_return_route_id(main_cryptde(), 1234); let affected_expected_services = vec![ExpectedService::Exit( affected_cryptde.public_key().clone(), make_paying_wallet(b"1234"), @@ -6112,7 +6109,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( affected_expected_services, vec![], - 1234, ), }, ); @@ -6221,7 +6217,7 @@ mod tests { stream_key, RouteQueryResponse { route: Route { hops: vec![] }, - expected_services: ExpectedServices::RoundTrip(vec![], vec![], 0), + expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); subject @@ -6395,7 +6391,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); @@ -6464,7 +6459,6 @@ mod tests { expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], - 1234, ), })); let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); diff --git a/node/src/sub_lib/neighborhood.rs b/node/src/sub_lib/neighborhood.rs index 79623cda3..f8e93f951 100644 --- a/node/src/sub_lib/neighborhood.rs +++ b/node/src/sub_lib/neighborhood.rs @@ -505,7 +505,7 @@ pub enum ExpectedService { #[derive(Clone, Debug, PartialEq, Eq)] pub enum ExpectedServices { OneWay(Vec), - RoundTrip(Vec, Vec, u32), + RoundTrip(Vec, Vec), } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/node/src/sub_lib/route.rs b/node/src/sub_lib/route.rs index ecbd0261e..83ebc03d8 100644 --- a/node/src/sub_lib/route.rs +++ b/node/src/sub_lib/route.rs @@ -32,7 +32,6 @@ impl Route { cryptde, None, None, - None, ) } @@ -47,7 +46,6 @@ impl Route { None, cryptde, consuming_wallet, - None, contract_address, ) } @@ -57,7 +55,6 @@ impl Route { route_segment_back: RouteSegment, cryptde: &dyn CryptDE, // Must be the CryptDE of the originating Node: used to encrypt return_route_id. consuming_wallet: Option, - return_route_id: u32, contract_address: Option
, ) -> Result { Self::construct( @@ -65,12 +62,11 @@ impl Route { Some(route_segment_back), cryptde, consuming_wallet, - Some(return_route_id), contract_address, ) } - pub fn id(&self, cryptde: &dyn CryptDE) -> Result { + pub fn return_route_id(&self, cryptde: &dyn CryptDE) -> Result { if let Some(first) = self.hops.first() { match decodex(cryptde, first) { Ok(n) => Ok(n), @@ -81,6 +77,12 @@ impl Route { } } + pub fn set_return_route_id(mut self, cryptde: &dyn CryptDE, return_route_id: u32) -> Self { + let return_route_id_enc = Self::encrypt_return_route_id(return_route_id, cryptde); + self.hops.push(return_route_id_enc); + self + } + // This cryptde must be the CryptDE of the next hop to come off the Route. pub fn next_hop(&self, cryptde: &dyn CryptDE) -> Result { match self.hops.first() { @@ -148,7 +150,6 @@ impl Route { back: Option, cryptde: &dyn CryptDE, consuming_wallet: Option, - return_route_id_opt: Option, contract_address: Option
, ) -> Result { if let Some(error) = Route::validate_route_segments(&over, &back) { @@ -176,7 +177,6 @@ impl Route { Route::hops_to_route( hops[0..].to_vec(), &over.keys[0], - return_route_id_opt, cryptde, ) } @@ -295,7 +295,6 @@ impl Route { fn hops_to_route( hops: Vec, top_hop_key: &PublicKey, - return_route_id_opt: Option, cryptde: &dyn CryptDE, ) -> Result { let mut hops_enc: Vec = Vec::new(); @@ -307,10 +306,6 @@ impl Route { }); hop_key = &data_hop.public_key; } - if let Some(return_route_id) = return_route_id_opt { - let return_route_id_enc = Self::encrypt_return_route_id(return_route_id, cryptde); - hops_enc.push(return_route_id_enc); - } Ok(Route { hops: hops_enc }) } @@ -353,38 +348,38 @@ mod tests { use serde_cbor; #[test] - fn id_decodes_return_route_id() { + fn return_route_id_works() { let cryptde = main_cryptde(); let subject = Route { hops: vec![Route::encrypt_return_route_id(42, cryptde)], }; - assert_eq!(subject.id(cryptde), Ok(42)); + assert_eq!(subject.return_route_id(cryptde), Ok(42)); } #[test] - fn id_returns_empty_route_error_when_the_route_is_empty() { + fn return_route_id_returns_empty_route_error_when_the_route_is_empty() { let cryptde = main_cryptde(); let subject = Route { hops: vec![] }; assert_eq!( - subject.id(cryptde), + subject.return_route_id(cryptde), Err("Response route did not contain a return route ID".to_string()) ); } #[test] #[should_panic(expected = "Could not decrypt with ebe5f9a0e2 data beginning with ebe5f9a0e1")] - fn id_returns_error_when_the_id_fails_to_decrypt() { + fn return_route_id_returns_error_when_the_id_fails_to_decrypt() { let cryptde1 = CryptDENull::from(&PublicKey::new(b"key a"), TEST_DEFAULT_CHAIN); let cryptde2 = CryptDENull::from(&PublicKey::new(b"key b"), TEST_DEFAULT_CHAIN); let subject = Route { hops: vec![Route::encrypt_return_route_id(42, &cryptde1)], }; - let _ = subject.id(&cryptde2); + let _ = subject.return_route_id(&cryptde2); } #[test] @@ -420,7 +415,6 @@ mod tests { RouteSegment::new(vec![&c_key, &d_key], Component::ProxyServer), cryptde, Some(paying_wallet.clone()), - 0, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .err() @@ -472,10 +466,10 @@ mod tests { RouteSegment::new(vec![&d_key, &e_key, &f_key, &a_key], Component::ProxyServer), cryptde, Some(paying_wallet.clone()), - return_route_id, Some(contract_address.clone()), ) .unwrap(); + let subject = subject.set_return_route_id(cryptde, return_route_id); assert_eq!( subject.hops[0], @@ -745,10 +739,10 @@ mod tests { RouteSegment::new(vec![&key2, &key1], Component::ProxyServer), cryptde, Some(paying_wallet), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .unwrap(); + let original = original.set_return_route_id(cryptde, 1234); let serialized = serde_cbor::ser::to_vec(&original).unwrap(); @@ -794,16 +788,17 @@ Encrypted with 0x03040506: LiveHop { public_key: 0x, payer: Some(Payer { wallet: let key1 = PublicKey::new(&[1, 2, 3, 4]); let key2 = PublicKey::new(&[2, 3, 4, 5]); let key3 = PublicKey::new(&[3, 4, 5, 6]); + let cryptde = CryptDENull::from(&key1, TEST_DEFAULT_CHAIN); let paying_wallet = make_paying_wallet(b"wallet"); let subject = Route::round_trip( RouteSegment::new(vec![&key1, &key2, &key3], Component::ProxyClient), RouteSegment::new(vec![&key3, &key2, &key1], Component::ProxyServer), - &CryptDENull::from(&key1, TEST_DEFAULT_CHAIN), + &cryptde, Some(paying_wallet), - 1234, Some(TEST_DEFAULT_CHAIN.rec().contract), ) .unwrap(); + let subject = subject.set_return_route_id(&cryptde, 1234); let result = subject.to_string(vec![ &CryptDENull::from(&key1, TEST_DEFAULT_CHAIN), diff --git a/node/src/test_utils/mod.rs b/node/src/test_utils/mod.rs index 595bf3699..1589ea5cc 100644 --- a/node/src/test_utils/mod.rs +++ b/node/src/test_utils/mod.rs @@ -240,14 +240,13 @@ pub fn zero_hop_route_response( RouteSegment::new(vec![public_key, public_key], Component::ProxyServer), cryptde, None, - 0, None, ) - .unwrap(), + .unwrap() + .set_return_route_id(cryptde, 0), expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], - 0, ), } } @@ -1268,7 +1267,6 @@ mod tests { ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing,], vec![ExpectedService::Nothing, ExpectedService::Nothing,], - 0 ) ); } From 8f774b66b071e215508faea2cdcd5d291da4d07a Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Fri, 5 Sep 2025 21:20:37 -0400 Subject: [PATCH 12/12] Tests pass --- node/src/hopper/live_cores_package.rs | 2 +- node/src/neighborhood/mod.rs | 68 ++------ node/src/proxy_server/mod.rs | 223 ++++++++++++++++++-------- node/src/sub_lib/route.rs | 8 +- 4 files changed, 173 insertions(+), 128 deletions(-) diff --git a/node/src/hopper/live_cores_package.rs b/node/src/hopper/live_cores_package.rs index 113a00ea6..8ed830b94 100644 --- a/node/src/hopper/live_cores_package.rs +++ b/node/src/hopper/live_cores_package.rs @@ -302,7 +302,7 @@ mod tests { let encrypted_payload = encodex(cryptde, &first_stop_key, &payload).unwrap(); let paying_wallet = make_paying_wallet(b"wallet"); let contract_address = TEST_DEFAULT_CHAIN.rec().contract; - let mut route = Route::round_trip( + let route = Route::round_trip( RouteSegment::new(vec![&relay_key, &first_stop_key], Component::Neighborhood), RouteSegment::new( vec![&first_stop_key, &relay_key, &second_stop_key], diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index 72e30ed06..f81911c0c 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -105,7 +105,6 @@ pub struct Neighborhood { mode: NeighborhoodModeLight, min_hops: Hops, db_patch_size: u8, - next_return_route_id: u32, overall_connection_status: OverallConnectionStatus, chain: Chain, crashable: bool, @@ -431,7 +430,6 @@ impl Neighborhood { mode, min_hops, db_patch_size, - next_return_route_id: 0, overall_connection_status, chain: config.blockchain_bridge_config.chain, crashable: config.crash_point == CrashPoint::Message, @@ -1317,13 +1315,6 @@ impl Neighborhood { } } - fn advance_return_route_id(&mut self) -> u32 { - todo!("Move this logic into the ProxyServer"); - let return_route_id = self.next_return_route_id; - self.next_return_route_id = return_route_id.wrapping_add(1); - return_route_id - } - pub fn find_exit_locations<'a>( &'a self, source: &'a PublicKey, @@ -3379,8 +3370,7 @@ mod tests { None, None, ) - .unwrap() - .set_return_route_id(cryptde, 0), + .unwrap(), expected_services: ExpectedServices::RoundTrip( vec![ ExpectedService::Nothing, @@ -3451,8 +3441,7 @@ mod tests { None, None, ) - .unwrap() - .set_return_route_id(cryptde, 0), + .unwrap(), expected_services: ExpectedServices::RoundTrip( vec![ExpectedService::Nothing, ExpectedService::Nothing], vec![ExpectedService::Nothing, ExpectedService::Nothing], @@ -3521,8 +3510,7 @@ mod tests { consuming_wallet_opt, Some(contract_address), ) - .unwrap() - .set_return_route_id(cryptde, 0), + .unwrap(), expected_services: ExpectedServices::RoundTrip( vec![ ExpectedService::Nothing, @@ -3552,7 +3540,7 @@ mod tests { ], ), }; - assert_eq!(expected_response, result); + assert_eq!(result, expected_response); } #[test] @@ -3571,18 +3559,6 @@ mod tests { ); } - #[test] - fn next_return_route_id_wraps_around() { - let mut subject = make_standard_subject(); - subject.next_return_route_id = 0xFFFFFFFF; - - let end = subject.advance_return_route_id(); - let beginning = subject.advance_return_route_id(); - - assert_eq!(end, 0xFFFFFFFF); - assert_eq!(beginning, 0x00000000); - } - /* Database: @@ -4474,7 +4450,6 @@ mod tests { error_expectation, "Cannot make multi_hop with unknown neighbor" ); - assert_eq!(subject.next_return_route_id, 0); } #[test] @@ -6958,7 +6933,7 @@ mod tests { let hops = result.clone().unwrap().route.hops; let actual_keys: Vec = match hops.as_slice() { - [hop, exit, hop_back, origin, empty, _accounting] => vec![ + [hop, exit, hop_back, origin, empty] => vec![ decodex::(main_cryptde(), hop) .expect("hop") .public_key, @@ -6975,7 +6950,11 @@ mod tests { .expect("empty") .public_key, ], - l => panic!("our match is wrong, real size is {}, {:?}", l.len(), l), + l => panic!( + "our match is wrong, real size is {} instead of 5, {:?}", + l.len(), + l + ), }; let expected_public_keys = vec![ next_door_neighbor.public_key().clone(), @@ -7013,24 +6992,23 @@ mod tests { } }; /* - This is how the route_hops vector looks like: [C1, C2, ..., C(nodes_count), ..., C2, C1, accounting] + This is how the route_hops vector looks like: [C1, C2, ..., C(nodes_count), ..., C2, C1] Let's consider for 3-hop route ==> Nodes Count --> 4 Route Length --> 8 - Route Hops --> [C1, C2, C3, C4, C3, C2, C1, accounting] + Route Hops --> [C1, C2, C3, C4, C3, C2, C1] Over Route --> [C1, C2, C3] Back Route --> [C4, C3, C2, C1] */ - let mut route_hops = result.unwrap().route.hops; + let route_hops = result.unwrap().route.hops; let route_length = route_hops.len(); - let _accounting = route_hops.pop(); let over_route = &route_hops[..hops]; let back_route = &route_hops[hops..]; let over_cryptdes = cryptdes_from_node_records(&nodes[..hops]); let mut back_cryptdes = cryptdes_from_node_records(&nodes); back_cryptdes.reverse(); - assert_eq!(route_length, 2 * nodes_count); + assert_eq!(route_length, 2 * nodes_count - 1); assert_hops(over_cryptdes, over_route); assert_hops(back_cryptdes, back_route); } @@ -7504,24 +7482,6 @@ mod tests { subject } - fn make_o_r_e_subject() -> (NodeRecord, NodeRecord, NodeRecord, Neighborhood) { - let mut subject = make_standard_subject(); - let o = &subject.neighborhood_database.root().clone(); - let r = &make_node_record(4567, false); - let e = &make_node_record(5678, false); - { - let db = &mut subject.neighborhood_database; - db.add_node(r.clone()).unwrap(); - db.add_node(e.clone()).unwrap(); - let mut dual_edge = |a: &NodeRecord, b: &NodeRecord| { - db.add_arbitrary_full_neighbor(a.public_key(), b.public_key()) - }; - dual_edge(o, r); - dual_edge(r, e); - } - (o.clone(), r.clone(), e.clone(), subject) - } - fn segment(nodes: &[&NodeRecord], component: &Component) -> RouteSegment { RouteSegment::new( nodes.into_iter().map(|n| n.public_key()).collect(), diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 95ad79f46..301ce98ec 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -7,7 +7,6 @@ pub mod server_impersonator_http; pub mod server_impersonator_tls; pub mod tls_protocol_pack; -use std::cell::Cell; use crate::proxy_server::client_request_payload_factory::{ ClientRequestPayloadFactory, ClientRequestPayloadFactoryReal, }; @@ -51,6 +50,7 @@ use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use masq_lib::utils::MutabilityConflictHelper; use regex::Regex; +use std::cell::Cell; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::rc::Rc; @@ -149,7 +149,10 @@ impl Handler for ProxyServer { let return_route_id = msg.return_route_id; self.route_ids_to_return_routes_first_chance .insert(msg.return_route_id, msg); - debug!(self.logger,"Added return route info RRI{} to first-chance cache", return_route_id); + debug!( + self.logger, + "Added return route info RRI{} to first-chance cache", return_route_id + ); } } @@ -755,28 +758,27 @@ impl ProxyServer { fn get_next_return_route_id(&self) -> u32 { let return_route_id = self.next_return_route_id.get(); - *(self.next_return_route_id.get_mut()) = return_route_id.wrapping_add(1); + self.next_return_route_id + .set(return_route_id.wrapping_add(1)); return_route_id } fn try_transmit_to_hopper( - &self, args: TransmitToHopperArgs, add_return_route_sub: Recipient, route_query_response: RouteQueryResponse, ) -> Result<(), String> { match route_query_response.expected_services { ExpectedServices::RoundTrip(over, back) => { - let return_route_id = self.get_next_return_route_id(); let return_route_info = AddReturnRouteMessage { - return_route_id, + return_route_id: args.return_route_id, expected_services: back, protocol: args.payload.protocol, hostname_opt: args.payload.target_hostname.clone(), }; debug!( args.logger, - "Adding expectant return route info: RRI{:?}", return_route_info + "Adding expectant return route info: RRI{}", args.return_route_id ); add_return_route_sub .try_send(return_route_info) @@ -882,9 +884,11 @@ impl ProxyServer { let payload = args.payload; let payload_size = payload.sequenced_packet.data.len(); let stream_key = payload.stream_key; + let route_with_return_route_id = + route.set_return_route_id(args.main_cryptde, args.return_route_id); let pkg = IncipientCoresPackage::new( args.main_cryptde, - route, + route_with_return_route_id, payload.into(), &payload_destination_key, ) @@ -1121,7 +1125,7 @@ impl RouteQueryResponseResolver for RouteQueryResponseResolverReal { let stream_key = args.payload.stream_key; let result = match route_result_opt { Ok(Some(route_query_response)) => { - match self.try_transmit_to_hopper( + match ProxyServer::try_transmit_to_hopper( args, add_return_route_sub, route_query_response.clone(), @@ -1167,6 +1171,7 @@ impl IBCDHelperReal { } } } + impl IBCDHelper for IBCDHelperReal { fn handle_normal_client_data( &self, @@ -1290,6 +1295,7 @@ impl IBCDHelper for IBCDHelperReal { pub struct TransmitToHopperArgs { pub main_cryptde: &'static dyn CryptDE, pub payload: ClientRequestPayload_0v1, + pub return_route_id: u32, pub client_addr: SocketAddr, pub timestamp: SystemTime, pub is_decentralized: bool, @@ -1318,9 +1324,11 @@ impl TransmitToHopperArgs { } else { None }; + let return_route_id = proxy_server.get_next_return_route_id(); Self { main_cryptde: proxy_server.main_cryptde, payload, + return_route_id, client_addr, timestamp, logger: proxy_server.logger.clone(), @@ -1419,6 +1427,7 @@ impl Hostname { #[cfg(test)] mod tests { use super::*; + use crate::blockchain::bip32::Bip32EncryptionKeyProvider; use crate::match_every_type_id; use crate::proxy_server::protocol_pack::ServerImpersonator; use crate::proxy_server::server_impersonator_http::ServerImpersonatorHttp; @@ -1811,7 +1820,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -1928,7 +1937,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -2391,7 +2400,7 @@ mod tests { hopper_recording.get_record::(0), &IncipientCoresPackage::new( main_cryptde, - expected_route.route, + expected_route.route.set_return_route_id(main_cryptde, 1), MessageType::ClientRequest(VersionedData::new( &crate::sub_lib::migrations::client_request_payload::MIGRATIONS, &ClientRequestPayload_0v1 { @@ -2471,7 +2480,7 @@ mod tests { hopper_recording.get_record::(0), &IncipientCoresPackage::new( main_cryptde, - expected_route.route, + expected_route.route.set_return_route_id(main_cryptde, 1), MessageType::ClientRequest(VersionedData::new( &crate::sub_lib::migrations::client_request_payload::MIGRATIONS, &ClientRequestPayload_0v1 { @@ -2534,7 +2543,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -2652,7 +2661,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &payload_destination_key, ) @@ -2871,8 +2880,10 @@ mod tests { let alias_cryptde = alias_cryptde(); let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let destination_key = PublicKey::from(&b"our destination"[..]); + let route = Route { hops: vec![] }; + let route_with_rrid = route.clone().set_return_route_id(main_cryptde, 4444); let route_query_response = RouteQueryResponse { - route: Route { hops: vec![] }, + route, expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], @@ -2905,7 +2916,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - Route { hops: vec![] }, + route_with_rrid, expected_payload.into(), &destination_key, ) @@ -2926,6 +2937,7 @@ mod tests { subject .stream_key_routes .insert(stream_key, route_query_response); + subject.next_return_route_id = Cell::new(4444); let subject_addr: Addr = subject.start(); let peer_actors = peer_actors_builder().hopper(hopper_mock).build(); subject_addr.try_send(BindMessage { peer_actors }).unwrap(); @@ -2945,52 +2957,84 @@ mod tests { fn proxy_server_sends_message_to_accountant_about_all_services_consumed_on_the_route_over() { let cryptde = main_cryptde(); let now = SystemTime::now(); - let exit_earning_wallet = make_wallet("exit earning wallet"); - let route_1_earning_wallet = make_wallet("route 1 earning wallet"); - let route_2_earning_wallet = make_wallet("route 2 earning wallet"); + let routing_node_1_public_key = PublicKey::new(&[1]); + let routing_node_2_public_key = PublicKey::new(&[2]); + let exit_node_public_key = PublicKey::new(&[3]); + let key_bytes = b"__originating consuming wallet__"; + let keypair = Bip32EncryptionKeyProvider::from_raw_secret(key_bytes).unwrap(); + let originating_consuming_wallet = Wallet::from(keypair); + let routing_node_1_earning_wallet = make_wallet("route 1 earning wallet"); + let routing_node_2_earning_wallet = make_wallet("route 2 earning wallet"); + let exit_node_earning_wallet = make_wallet("exit earning wallet"); + let routing_node_1_rate_pack = rate_pack(101); + let routing_node_2_rate_pack = rate_pack(102); + let exit_node_rate_pack = rate_pack(103); let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let (accountant_mock, _, accountant_recording_arc) = make_recorder(); let (hopper_mock, _, hopper_recording_arc) = make_recorder(); let (proxy_server_mock, _, proxy_server_recording_arc) = make_recorder(); - let routing_node_1_rate_pack = rate_pack(101); - let routing_node_2_rate_pack = rate_pack(102); - let exit_node_rate_pack = rate_pack(103); + let over_route_segment = RouteSegment::new( + vec![ + &cryptde.public_key(), + &routing_node_1_public_key, + &routing_node_2_public_key, + &exit_node_public_key, + ], + Component::ProxyClient, + ); + let back_route_segment = RouteSegment::new( + vec![ + &exit_node_public_key, + &routing_node_2_public_key, + &routing_node_1_public_key, + &cryptde.public_key(), + ], + Component::ProxyServer, + ); + let route = Route::round_trip( + over_route_segment, + back_route_segment, + cryptde, + Some(originating_consuming_wallet), + Some(TEST_DEFAULT_CHAIN.rec().contract), + ) + .unwrap(); let route_query_response = RouteQueryResponse { - route: make_meaningless_route(), + route, expected_services: ExpectedServices::RoundTrip( vec![ ExpectedService::Nothing, ExpectedService::Routing( - PublicKey::new(&[1]), - route_1_earning_wallet.clone(), - routing_node_1_rate_pack, + routing_node_1_public_key.clone(), + routing_node_1_earning_wallet.clone(), + routing_node_1_rate_pack.clone(), ), ExpectedService::Routing( - PublicKey::new(&[2]), - route_2_earning_wallet.clone(), - routing_node_2_rate_pack, + routing_node_2_public_key.clone(), + routing_node_2_earning_wallet.clone(), + routing_node_2_rate_pack.clone(), ), ExpectedService::Exit( - PublicKey::new(&[3]), - exit_earning_wallet.clone(), - exit_node_rate_pack, + exit_node_public_key.clone(), + exit_node_earning_wallet.clone(), + exit_node_rate_pack.clone(), ), ], vec![ ExpectedService::Exit( - PublicKey::new(&[3]), - make_wallet("some wallet 1"), - rate_pack(104), + exit_node_public_key.clone(), + exit_node_earning_wallet.clone(), + exit_node_rate_pack, ), ExpectedService::Routing( - PublicKey::new(&[2]), - make_wallet("some wallet 2"), - rate_pack(105), + routing_node_2_public_key.clone(), + routing_node_2_earning_wallet.clone(), + routing_node_2_rate_pack, ), ExpectedService::Routing( - PublicKey::new(&[1]), - make_wallet("some wallet 3"), - rate_pack(106), + routing_node_1_public_key.clone(), + routing_node_1_earning_wallet.clone(), + routing_node_1_rate_pack, ), ExpectedService::Nothing, ], @@ -3019,6 +3063,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 4444, client_addr: source_addr, timestamp: now, is_decentralized: true, @@ -3038,8 +3083,31 @@ mod tests { System::current().stop(); system.run(); let recording = hopper_recording_arc.lock().unwrap(); - let record = recording.get_record::(0); + let mut record = recording.get_record::(0).clone(); let payload_enc_length = record.payload.len(); + let _ = record.route.shift(cryptde); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_1_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_2_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &exit_node_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_2_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(&CryptDENull::from( + &routing_node_1_public_key, + TEST_DEFAULT_CHAIN, + )); + let _ = record.route.shift(cryptde); + assert_eq!(record.route.return_route_id(cryptde).unwrap(), 4444); let recording = accountant_recording_arc.lock().unwrap(); let record = recording.get_record::(0); assert_eq!(recording.len(), 1); @@ -3048,7 +3116,7 @@ mod tests { &ReportServicesConsumedMessage { timestamp: now, exit: ExitServiceConsumed { - earning_wallet: exit_earning_wallet, + earning_wallet: exit_node_earning_wallet, payload_size: exit_payload_size, service_rate: exit_node_rate_pack.exit_service_rate, byte_rate: exit_node_rate_pack.exit_byte_rate @@ -3056,12 +3124,12 @@ mod tests { routing_payload_size: payload_enc_length, routing: vec![ RoutingServiceConsumed { - earning_wallet: route_1_earning_wallet, + earning_wallet: routing_node_1_earning_wallet, service_rate: routing_node_1_rate_pack.routing_service_rate, byte_rate: routing_node_1_rate_pack.routing_byte_rate, }, RoutingServiceConsumed { - earning_wallet: route_2_earning_wallet, + earning_wallet: routing_node_2_earning_wallet, service_rate: routing_node_2_rate_pack.routing_service_rate, byte_rate: routing_node_2_rate_pack.routing_byte_rate, } @@ -3106,6 +3174,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 3333, client_addr: source_addr, timestamp: SystemTime::now(), is_decentralized: false, @@ -3129,7 +3198,7 @@ mod tests { assert_eq!( record, &AddReturnRouteMessage { - return_route_id: 0, + return_route_id: 3333, expected_services: vec![ExpectedService::Nothing], protocol: ProxyProtocol::HTTP, hostname_opt: Some("nowhere.com".to_string()) @@ -3394,6 +3463,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 2222, client_addr: source_addr, timestamp: SystemTime::now(), is_decentralized: true, @@ -3578,7 +3648,7 @@ mod tests { data: expected_data.clone(), }; let expected_tls_request = PlainData::new(tls_request); - let route = Route { hops: vec![] }; + let route = Route { hops: vec![] }.set_return_route_id(main_cryptde, 1); let expected_payload = ClientRequestPayload_0v1 { stream_key: stream_key.clone(), sequenced_packet: SequencedPacket { @@ -3678,7 +3748,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -3762,7 +3832,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route.clone(), + route.clone().set_return_route_id(main_cryptde, 1), expected_payload.into(), &destination_key, ) @@ -5725,6 +5795,7 @@ mod tests { ) { init_test_logging(); let cryptde = main_cryptde(); + let return_route_id = 272727; let (dispatcher, _, dispatcher_recording_arc) = make_recorder(); let (accountant, _, accountant_recording_arc) = make_recorder(); let system = System::new("report_response_services_consumed_complains_and_drops_package_if_return_route_id_is_unrecognized"); @@ -5756,7 +5827,7 @@ mod tests { let expired_cores_package = ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 1234), + return_route_with_id(cryptde, return_route_id), client_response_payload, 0, ); @@ -5766,7 +5837,7 @@ mod tests { System::current().stop(); system.run(); - TestLogHandler::new().exists_log_containing("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID 1234 for client response. Ignoring"); + TestLogHandler::new().exists_log_containing(format!("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID RRI{} for client response. Ignoring", return_route_id).as_str()); assert_eq!(dispatcher_recording_arc.lock().unwrap().len(), 0); assert_eq!(accountant_recording_arc.lock().unwrap().len(), 0); } @@ -5886,7 +5957,7 @@ mod tests { ); subject_addr.try_send(expired_cores_package).unwrap(); - TestLogHandler::new().await_log_containing("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID 1234 for client response. Ignoring", 1000); + TestLogHandler::new().await_log_containing("ERROR: ProxyServer: Can't report services consumed: received response with bogus return-route ID RRI1234 for client response. Ignoring", 1000); } #[test] @@ -5971,8 +6042,7 @@ mod tests { Some(make_paying_wallet(b"consuming")), Some(TEST_DEFAULT_CHAIN.rec().contract), ) - .unwrap() - .set_return_route_id(main_cryptde(), 1234); + .unwrap(); let affected_expected_services = vec![ExpectedService::Exit( affected_cryptde.public_key().clone(), make_paying_wallet(b"1234"), @@ -5982,10 +6052,7 @@ mod tests { affected_stream_key, RouteQueryResponse { route: affected_route.clone(), - expected_services: ExpectedServices::RoundTrip( - affected_expected_services, - vec![], - ), + expected_services: ExpectedServices::RoundTrip(affected_expected_services, vec![]), }, ); subject @@ -6018,7 +6085,10 @@ mod tests { system.run(); let recording = hopper_recording_arc.lock().unwrap(); let record = recording.get_record::(0); - assert_eq!(record.route, affected_route); + assert_eq!( + record.route, + affected_route.set_return_route_id(main_cryptde(), 1) + ); let payload = decodex::(&affected_cryptde, &record.payload).unwrap(); match payload { MessageType::ClientRequest(vd) => assert_eq!( @@ -6082,6 +6152,7 @@ mod tests { expected_services: ExpectedServices::RoundTrip(vec![], vec![]), }, ); + subject.next_return_route_id = Cell::new(1234); let affected_route = Route::round_trip( RouteSegment::new( vec![main_cryptde().public_key(), affected_cryptde.public_key()], @@ -6095,8 +6166,7 @@ mod tests { Some(make_paying_wallet(b"consuming")), Some(TEST_DEFAULT_CHAIN.rec().contract), ) - .unwrap() - .set_return_route_id(main_cryptde(), 1234); + .unwrap(); let affected_expected_services = vec![ExpectedService::Exit( affected_cryptde.public_key().clone(), make_paying_wallet(b"1234"), @@ -6106,10 +6176,7 @@ mod tests { affected_stream_key, RouteQueryResponse { route: affected_route.clone(), - expected_services: ExpectedServices::RoundTrip( - affected_expected_services, - vec![], - ), + expected_services: ExpectedServices::RoundTrip(affected_expected_services, vec![]), }, ); subject.logger = Logger::new(test_name); @@ -6137,7 +6204,10 @@ mod tests { system.run(); let recording = hopper_recording_arc.lock().unwrap(); let record = recording.get_record::(0); - assert_eq!(record.route, affected_route); + assert_eq!( + record.route, + affected_route.set_return_route_id(main_cryptde(), 1234) + ); let payload = decodex::(&affected_cryptde, &record.payload).unwrap(); match payload { MessageType::ClientRequest(vd) => assert_eq!( @@ -6286,6 +6356,7 @@ mod tests { let args = TransmitToHopperArgs { main_cryptde: cryptde, payload, + return_route_id: 8888, client_addr: SocketAddr::from_str("1.2.3.4:1234").unwrap(), timestamp: SystemTime::now(), is_decentralized: false, @@ -6698,6 +6769,24 @@ mod tests { ); } + #[test] + fn get_next_return_route_id_wraps_around() { + let mut mut_subject = + ProxyServer::new(main_cryptde(), alias_cryptde(), true, None, false, false); + mut_subject.next_return_route_id = Cell::new(0xFFFFFFFE); + let subject = mut_subject; + + let end_minus_one = subject.get_next_return_route_id(); + let end = subject.get_next_return_route_id(); + let beginning = subject.get_next_return_route_id(); + let beginning_plus_one = subject.get_next_return_route_id(); + + assert_eq!(end_minus_one, 0xFFFFFFFE); + assert_eq!(end, 0xFFFFFFFF); + assert_eq!(beginning, 0x00000000); + assert_eq!(beginning_plus_one, 0x00000001); + } + fn make_exit_service_from_key(public_key: PublicKey) -> ExpectedService { ExpectedService::Exit(public_key, make_wallet("exit wallet"), rate_pack(100)) } diff --git a/node/src/sub_lib/route.rs b/node/src/sub_lib/route.rs index 83ebc03d8..a9314d29e 100644 --- a/node/src/sub_lib/route.rs +++ b/node/src/sub_lib/route.rs @@ -53,7 +53,7 @@ impl Route { pub fn round_trip( route_segment_over: RouteSegment, route_segment_back: RouteSegment, - cryptde: &dyn CryptDE, // Must be the CryptDE of the originating Node: used to encrypt return_route_id. + cryptde: &dyn CryptDE, // Doesn't matter which CryptDE: only used for encoding. consuming_wallet: Option, contract_address: Option
, ) -> Result { @@ -174,11 +174,7 @@ impl Route { contract_address, ); - Route::hops_to_route( - hops[0..].to_vec(), - &over.keys[0], - cryptde, - ) + Route::hops_to_route(hops[0..].to_vec(), &over.keys[0], cryptde) } fn over_segment<'a>(