This repository was archived by the owner on Feb 18, 2021. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +54
-1
lines changed Expand file tree Collapse file tree 1 file changed +54
-1
lines changed Original file line number Diff line number Diff line change @@ -494,11 +494,64 @@ function deletePeerIndex(serviceName, hostPort) {
494
494
495
495
ServiceDispatchHandler . prototype . ensurePeerConnected =
496
496
function ensurePeerConnected ( peer , reason ) {
497
+ var self = this ;
498
+
497
499
if ( peer . isConnected ( 'out' ) ) {
498
500
return ;
499
501
}
500
502
501
- peer . connectTo ( ) ;
503
+ peer . waitForIdentified ( peer . connectTo ( ) , onConnIded ) ;
504
+
505
+ function onConnIded ( err ) {
506
+ if ( err ) {
507
+ self . logger . warn (
508
+ 'failed to ensure outgoing connection to service peer' ,
509
+ self . extendLogInfo ( {
510
+ error : err ,
511
+ peerHostPort : peer . hostPort ,
512
+ refreshReason : reason
513
+ } ) ) ;
514
+ return ;
515
+ }
516
+ peer . drain ( {
517
+ reason : reason ,
518
+ direction : 'in' ,
519
+ timeout : self . drainTimeout
520
+ } , connectDrainDone ) ;
521
+ }
522
+
523
+ function connectDrainDone ( err ) {
524
+ if ( err &&
525
+ err . type === 'tchannel.drain.peer.timed-out' ) {
526
+ // TODO: stat?
527
+ self . logger . warn ( 'forcibly closing drained peer' , self . extendLogInfo ( {
528
+ error : err ,
529
+ drainReason : reason
530
+ } ) ) ;
531
+ err = null ;
532
+ }
533
+ if ( err ) {
534
+ self . logger . warn (
535
+ 'failed to drain incoming connections from service peer' , self . extendLogInfo ( {
536
+ error : err ,
537
+ peerHostPort : peer . hostPort
538
+ } ) ) ;
539
+ peer . clearDrain ( ) ;
540
+ return ;
541
+ }
542
+ peer . closeDrainedConnections ( connectDrainCloseDone ) ;
543
+ }
544
+
545
+ function connectDrainCloseDone ( err ) {
546
+ if ( err ) {
547
+ self . logger . warn (
548
+ 'failed to close drained incoming connections from service peer' , self . extendLogInfo ( {
549
+ error : err ,
550
+ peerHostPort : peer . hostPort
551
+ } ) ) ;
552
+ }
553
+ peer . clearDrain ( ) ;
554
+ }
502
555
} ;
503
556
504
557
ServiceDispatchHandler . prototype . computePartialRange =
You can’t perform that action at this time.
0 commit comments