Skip to content

Commit a0c09be

Browse files
committed
Simplify connection: fetch routing table at the first usage
1 parent 58d0e0e commit a0c09be

File tree

5 files changed

+38
-113
lines changed

5 files changed

+38
-113
lines changed

lib/src/graph.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
22
use {
3-
crate::connection::{Connection, ConnectionInfo, Routing},
3+
crate::connection::{ConnectionInfo, Routing},
44
crate::graph::ConnectionPoolManager::Routed,
5-
crate::routing::{RoundRobinStrategy, RouteBuilder, RoutedConnectionManager},
6-
log::info,
5+
crate::routing::{RoundRobinStrategy, RoutedConnectionManager},
76
std::sync::Arc,
87
};
98

@@ -75,21 +74,10 @@ impl Graph {
7574
&config.tls_config,
7675
)?;
7776
if matches!(info.routing, Routing::Yes(_)) {
78-
let mut connection = Connection::new(&info).await?;
79-
let mut builder = RouteBuilder::new(info.routing, vec![]);
80-
if let Some(db) = config.db.clone() {
81-
builder = builder.with_db(db);
82-
}
83-
let rt = connection
84-
.route(builder.build(connection.version()))
85-
.await?;
86-
connection.reset().await?;
87-
info!("Connected to routing server, routing table: {:?}", rt);
8877
let pool = Routed(
8978
RoutedConnectionManager::new(
9079
&config,
91-
Arc::new(rt.clone()),
92-
Arc::new(RoundRobinStrategy::new(&rt.resolve())),
80+
Arc::new(RoundRobinStrategy::default()),
9381
)
9482
.await?,
9583
);

lib/src/routing/connection_registry.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use dashmap::DashMap;
66
use futures::lock::Mutex;
77
use log::debug;
88
use std::sync::Arc;
9+
use std::sync::atomic::{AtomicU64, Ordering};
910

1011
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1112
pub(crate) struct BoltServer {
@@ -40,19 +41,15 @@ pub type Registry = DashMap<BoltServer, ConnectionPool>;
4041
pub(crate) struct ConnectionRegistry {
4142
config: Config,
4243
creation_time: Arc<Mutex<u64>>,
43-
ttl: u64,
44+
ttl: Arc<AtomicU64>,
4445
pub(crate) connections: Registry,
4546
}
4647

4748
impl ConnectionRegistry {
4849
pub(crate) async fn new(
4950
config: &Config,
50-
routing_table: Arc<RoutingTable>,
5151
) -> Result<Self, Error> {
52-
let ttl = routing_table.ttl;
53-
let servers = routing_table.resolve();
54-
55-
let connections = Self::build_registry(config, &servers).await?;
52+
let connections = Self::build_registry(config, &[]).await?;
5653
Ok(ConnectionRegistry {
5754
config: config.clone(),
5855
creation_time: Arc::new(Mutex::new(
@@ -61,7 +58,7 @@ impl ConnectionRegistry {
6158
.unwrap()
6259
.as_secs(),
6360
)),
64-
ttl,
61+
ttl: Arc::new(AtomicU64::new(0)),
6562
connections,
6663
})
6764
}
@@ -89,8 +86,8 @@ impl ConnectionRegistry {
8986
.as_secs();
9087
debug!("Checking if routing table is expired...");
9188
if let Some(mut guard) = self.creation_time.try_lock() {
92-
if now - *guard > self.ttl {
93-
debug!("Routing table expired, refreshing...");
89+
if self.connections.is_empty() || now - *guard > self.ttl.load(Ordering::Relaxed) {
90+
debug!("Routing table expired or empty, refreshing...");
9491
let routing_table = f().await?;
9592
debug!("Routing table refreshed: {:?}", routing_table);
9693
let registry = &self.connections;
@@ -102,7 +99,12 @@ impl ConnectionRegistry {
10299
registry.insert(server.clone(), create_pool(&self.config).await?);
103100
}
104101
registry.retain(|k, _| servers.contains(k));
105-
debug!("Registry updated. New size is {}", registry.len());
102+
let _ = self.ttl.fetch_update(
103+
Ordering::Relaxed,
104+
Ordering::Relaxed,
105+
|_ttl| Some(routing_table.ttl),
106+
).unwrap();
107+
debug!("Registry updated. New size is {} with TTL {}s", registry.len(), routing_table.ttl);
106108
*guard = now;
107109
}
108110
} else {
@@ -177,16 +179,14 @@ mod tests {
177179
fetch_size: 0,
178180
tls_config: ConnectionTLSConfig::None,
179181
};
180-
let registry = ConnectionRegistry::new(&config, Arc::new(cluster_routing_table.clone()))
182+
let registry = ConnectionRegistry::new(&config)
183+
.await
184+
.unwrap();
185+
registry.update_if_expired(|| async { Ok(cluster_routing_table) })
181186
.await
182187
.unwrap();
183188
assert_eq!(registry.connections.len(), 5);
184-
let strategy = RoundRobinStrategy::new(&cluster_routing_table.resolve());
185-
let router = strategy.select_router(&registry.servers()).unwrap();
186-
assert_eq!(
187-
format!("{}:{}", router.address, router.port),
188-
routers[0].addresses[0]
189-
);
189+
let strategy = RoundRobinStrategy::default();
190190
registry.mark_unavailable(BoltServer::resolve(&writers[0]).first().unwrap());
191191
assert_eq!(registry.connections.len(), 4);
192192
let writer = strategy.select_writer(&registry.servers()).unwrap();

lib/src/routing/load_balancing/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,4 @@ use crate::routing::connection_registry::BoltServer;
55
pub trait LoadBalancingStrategy: Sync + Send {
66
fn select_reader(&self, servers: &[BoltServer]) -> Option<BoltServer>;
77
fn select_writer(&self, servers: &[BoltServer]) -> Option<BoltServer>;
8-
fn select_router(&self, servers: &[BoltServer]) -> Option<BoltServer>;
98
}

lib/src/routing/load_balancing/round_robin_strategy.rs

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,13 @@ use crate::routing::connection_registry::BoltServer;
22
use crate::routing::load_balancing::LoadBalancingStrategy;
33
use std::sync::atomic::AtomicUsize;
44

5+
#[derive(Default)]
56
pub struct RoundRobinStrategy {
67
reader_index: AtomicUsize,
78
writer_index: AtomicUsize,
8-
router_index: AtomicUsize,
99
}
1010

1111
impl RoundRobinStrategy {
12-
pub(crate) fn new(servers: &[BoltServer]) -> Self {
13-
let readers: Vec<BoltServer> = servers
14-
.iter()
15-
.filter(|s| s.role == "READ")
16-
.cloned()
17-
.collect();
18-
let writers: Vec<BoltServer> = servers
19-
.iter()
20-
.filter(|s| s.role == "WRITE")
21-
.cloned()
22-
.collect();
23-
let routers: Vec<BoltServer> = servers
24-
.iter()
25-
.filter(|s| s.role == "ROUTE")
26-
.cloned()
27-
.collect();
28-
let reader_index = AtomicUsize::new(readers.len());
29-
let writer_index = AtomicUsize::new(writers.len());
30-
let router_index = AtomicUsize::new(routers.len());
31-
RoundRobinStrategy {
32-
reader_index,
33-
writer_index,
34-
router_index,
35-
}
36-
}
37-
3812
fn select(servers: &[BoltServer], index: &AtomicUsize) -> Option<BoltServer> {
3913
if servers.is_empty() {
4014
return None;
@@ -77,15 +51,6 @@ impl LoadBalancingStrategy for RoundRobinStrategy {
7751
.collect::<Vec<BoltServer>>();
7852
Self::select(&writers, &self.writer_index)
7953
}
80-
81-
fn select_router(&self, servers: &[BoltServer]) -> Option<BoltServer> {
82-
let routers = servers
83-
.iter()
84-
.filter(|s| s.role == "ROUTE")
85-
.cloned()
86-
.collect::<Vec<BoltServer>>();
87-
Self::select(&routers, &self.router_index)
88-
}
8954
}
9055

9156
#[cfg(test)]
@@ -123,7 +88,7 @@ mod tests {
12388
};
12489
let all_servers = cluster_routing_table.resolve();
12590
assert_eq!(all_servers.len(), 4);
126-
let strategy = RoundRobinStrategy::new(&cluster_routing_table.resolve());
91+
let strategy = RoundRobinStrategy::default();
12792

12893
let reader = strategy.select_reader(&all_servers).unwrap();
12994
assert_eq!(

lib/src/routing/routed_connection_manager.rs

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ use std::sync::Arc;
99
use std::time::Duration;
1010
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
1111
use {
12-
crate::connection::Routing,
1312
crate::routing::{RouteBuilder, RoutingTable},
1413
};
14+
use crate::connection::{Connection, ConnectionInfo};
1515

1616
#[derive(Clone)]
1717
pub struct RoutedConnectionManager {
1818
load_balancing_strategy: Arc<dyn LoadBalancingStrategy>,
1919
registry: Arc<ConnectionRegistry>,
20+
#[allow(dead_code)]
2021
bookmarks: Arc<Mutex<Vec<String>>>,
2122
backoff: Arc<ExponentialBackoff>,
2223
config: Config,
@@ -25,10 +26,9 @@ pub struct RoutedConnectionManager {
2526
impl RoutedConnectionManager {
2627
pub async fn new(
2728
config: &Config,
28-
routing_table: Arc<RoutingTable>,
2929
load_balancing_strategy: Arc<dyn LoadBalancingStrategy>,
3030
) -> Result<Self, Error> {
31-
let registry = Arc::new(ConnectionRegistry::new(config, routing_table.clone()).await?);
31+
let registry = Arc::new(ConnectionRegistry::new(config).await?);
3232
let backoff = Arc::new(
3333
ExponentialBackoffBuilder::new()
3434
.with_initial_interval(Duration::from_millis(1))
@@ -48,43 +48,20 @@ impl RoutedConnectionManager {
4848
}
4949

5050
pub async fn refresh_routing_table(&self) -> Result<RoutingTable, Error> {
51-
while let Some(router) = self.select_router() {
52-
if let Some(pool) = self.registry.get_pool(&router) {
53-
if let Ok(mut connection) = pool.get().await {
54-
debug!("Refreshing routing table from router {}", router.address);
55-
let bookmarks = self.bookmarks.lock().await;
56-
let bookmarks = bookmarks.iter().map(|b| b.as_str()).collect();
57-
let route = RouteBuilder::new(Routing::Yes(vec![]), bookmarks)
58-
.with_db(self.config.db.clone().unwrap_or_default())
59-
.build(connection.version());
60-
match connection.route(route).await {
61-
Ok(rt) => {
62-
debug!("Routing table refreshed: {:?}", rt);
63-
return Ok(rt);
64-
}
65-
Err(e) => {
66-
self.registry.mark_unavailable(&router);
67-
error!(
68-
"Failed to refresh routing table from router {}: {}",
69-
router.address, e
70-
);
71-
}
72-
}
73-
} else {
74-
self.registry.mark_unavailable(&router);
75-
error!("Failed to create connection to router `{}`", router.address);
76-
}
77-
} else {
78-
error!(
79-
"No connection manager available for router `{}` in the registry. Maybe it was marked as unavailable",
80-
router.address
81-
);
82-
}
51+
let info = ConnectionInfo::new(
52+
&self.config.uri,
53+
&self.config.user,
54+
&self.config.password,
55+
&self.config.tls_config,
56+
)?;
57+
let mut connection = Connection::new(&info).await?;
58+
let mut builder = RouteBuilder::new(info.routing, vec![]);
59+
if let Some(db) = self.config.db.clone() {
60+
builder = builder.with_db(db);
8361
}
84-
// After trying all routers, we still couldn't refresh the routing table: return an error
85-
Err(Error::ServerUnavailableError(
86-
"No router available".to_string(),
87-
))
62+
let rt = connection.route(builder.build(connection.version())).await?;
63+
debug!("Fetched a new routing table: {:?}", rt);
64+
Ok(rt)
8865
}
8966

9067
pub(crate) async fn get(
@@ -143,8 +120,4 @@ impl RoutedConnectionManager {
143120
fn select_writer(&self) -> Option<BoltServer> {
144121
self.load_balancing_strategy.select_writer(&self.registry.servers())
145122
}
146-
147-
fn select_router(&self) -> Option<BoltServer> {
148-
self.load_balancing_strategy.select_router(&self.registry.servers())
149-
}
150123
}

0 commit comments

Comments
 (0)