@@ -67,7 +67,7 @@ impl RaftNode {
6767 & mut self ,
6868 sender : Sender < RaftEvent > ,
6969 cancel : CancellationToken ,
70- ) -> Result < ( ) , Status > {
70+ ) -> Result < ( ) , RaftConnectionError > {
7171 // Start the gRPC server
7272 let id = self . config . candidate_id . clone ( ) ;
7373 tokio:: spawn ( async move {
@@ -81,10 +81,15 @@ impl RaftNode {
8181
8282 // Connect to all other nodes
8383 let config = self . config . clone ( ) ;
84- let raft_clients = self . raft_clients . clone ( ) ;
85- connect_to_raft_servers ( config, raft_clients) ;
84+ let clients = connect_to_raft_servers ( config) . await ?;
85+ {
86+ let mut lk = self . raft_clients . clone ( ) ;
87+ let mut raft_clients = lk. lock ( ) . unwrap ( ) ;
88+ raft_clients. extend ( clients) ;
89+ }
8690
8791 // Main loop
92+ println ! ( "Starting raft main loop" ) ;
8893 loop {
8994 if cancel. is_cancelled ( ) {
9095 break ;
@@ -178,41 +183,58 @@ impl State {
178183 }
179184}
180185
181- fn connect_to_raft_servers ( config : RaftConfig , raft_clients : RaftClients ) {
182- tokio:: spawn ( async move {
183- let Some ( roster) = config. roster else {
184- return ;
185- } ;
186+ #[ derive( Debug ) ]
187+ pub enum RaftConnectionError {
188+ EmptyRoster ,
189+ MaxAttemptsReached ( String ) ,
190+ JoinError ,
191+ }
186192
187- for node in roster {
188- // skip the current node
189- if node == config. candidate_id {
190- continue ;
191- }
193+ async fn connect_to_raft_servers (
194+ config : RaftConfig ,
195+ ) -> Result <
196+ HashMap < String , raft:: raft_client:: RaftClient < tonic:: transport:: channel:: Channel > > ,
197+ RaftConnectionError ,
198+ > {
199+ let Some ( roster) = config. roster else {
200+ return Err ( RaftConnectionError :: EmptyRoster ) ;
201+ } ;
202+
203+ let mut tasks = tokio:: task:: JoinSet :: new ( ) ;
204+ for node in roster {
205+ if node == config. candidate_id {
206+ continue ;
207+ }
208+ tasks. spawn ( async move { ( node. clone ( ) , connect_to_raft_server ( node) . await ) } ) ;
209+ }
192210
193- let nameport = node. split ( ':' ) . collect :: < Vec < & str > > ( ) ;
194- let ip = format ! ( "http://{}:{}" , nameport[ 0 ] , nameport[ 1 ] ) ;
195- info ! ( "Connecting to {ip}" ) ;
196-
197- // try to connect to the node
198- // if it fails, the node is not up yet
199- // so we will try again in the next iteration
200- let raft_clients_clone = raft_clients. clone ( ) ;
201- tokio:: spawn ( async move {
202- loop {
203- let raft_client = raft:: raft_client:: RaftClient :: connect ( ip. clone ( ) ) . await ;
204- if let Ok ( raft_client) = raft_client {
205- {
206- let mut raft_clients = raft_clients_clone. lock ( ) . unwrap ( ) ;
207- raft_clients. insert ( node, raft_client) ;
208- }
209- break ;
210- }
211- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
212- }
213- } ) ;
211+ let mut clients = HashMap :: new ( ) ;
212+ for ( node, client) in tasks. join_all ( ) . await {
213+ clients. insert ( node, client?) ;
214+ }
215+ Ok ( clients)
216+ }
217+
218+ pub async fn connect_to_raft_server (
219+ node : String ,
220+ ) -> Result < raft:: raft_client:: RaftClient < tonic:: transport:: Channel > , RaftConnectionError > {
221+ // try to connect to the node
222+ // if it fails, the node is not up yet
223+ // so we will try again in the next iteration after a backoff period
224+ let mut attempt = 0 ;
225+ let max_attempts = 6 ;
226+ while attempt < max_attempts {
227+ println ! ( "Connecting to {node}" ) ;
228+ match raft:: raft_client:: RaftClient :: connect ( format ! ( "http://{node}" ) ) . await {
229+ Ok ( raft_client) => {
230+ return Ok ( raft_client) ;
231+ }
232+ Err ( e) => println ! ( "Error connecting to {node}: {e}" ) ,
214233 }
215- } ) ;
234+ tokio:: time:: sleep ( Duration :: from_secs ( 2 ^ attempt) ) . await ;
235+ attempt += 1 ;
236+ }
237+ Err ( RaftConnectionError :: MaxAttemptsReached ( node) )
216238}
217239
218240/// Start raft gRPC server
0 commit comments