@@ -747,28 +747,50 @@ fn spawn_channel_adapter(
747
747
// for previous clients.
748
748
let mut epoch = 0 ;
749
749
750
+ // It's possible that we receive responses with epochs from the future: Worker 0 might
751
+ // have increased its epoch before us and broadcasted it to our Timely cluster. When we
752
+ // receive a response with a future epoch, we need to wait with forwarding it until we
753
+ // have increased our own epoch sufficiently (by observing new client connections). We
754
+ // need to stash the response in the meantime.
755
+ let mut stashed_response = None ;
756
+
750
757
while let Ok ( ( command_rx, response_tx) ) = client_rx. recv ( ) {
751
758
epoch += 1 ;
752
759
753
- // Serve this connection until we see any of the channels disconnect .
754
- loop {
760
+ // Wait for a new response while forwarding received commands .
761
+ let serve_rx_channels = || loop {
755
762
crossbeam_channel:: select! {
756
763
recv( command_rx) -> msg => match msg {
757
764
Ok ( cmd) => command_tx. send( ( cmd, epoch) ) ,
758
- Err ( _) => break ,
765
+ Err ( _) => return Err ( ( ) ) ,
759
766
} ,
760
767
recv( response_rx) -> msg => {
761
- let ( resp, resp_epoch) = msg. expect( "worker connected" ) ;
762
-
763
- if resp_epoch < epoch {
764
- continue ; // response for a previous connection
765
- } else if resp_epoch > epoch {
766
- panic!( "epoch from the future: {resp_epoch} > {epoch}" ) ;
767
- }
768
+ return Ok ( msg. expect( "worker connected" ) ) ;
769
+ }
770
+ }
771
+ } ;
768
772
769
- if response_tx. send( resp) . is_err( ) {
770
- break ;
771
- }
773
+ // Serve this connection until we see any of the channels disconnect.
774
+ loop {
775
+ let ( resp, resp_epoch) = match stashed_response. take ( ) {
776
+ Some ( stashed) => stashed,
777
+ None => match serve_rx_channels ( ) {
778
+ Ok ( response) => response,
779
+ Err ( ( ) ) => break ,
780
+ } ,
781
+ } ;
782
+
783
+ if resp_epoch < epoch {
784
+ // Response for a previous connection; discard it.
785
+ continue ;
786
+ } else if resp_epoch > epoch {
787
+ // Response for a future connection; stash it and reconnect.
788
+ stashed_response = Some ( ( resp, resp_epoch) ) ;
789
+ break ;
790
+ } else {
791
+ // Response for the current connection; forward it.
792
+ if response_tx. send ( resp) . is_err ( ) {
793
+ break ;
772
794
}
773
795
}
774
796
}
0 commit comments