From d66fc3a1efa1dfb33dfedf9760528f1ac2b923b6 Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:59:48 -0400 Subject: [PATCH] balancer/endpointsharding: Call ExitIdle() on child if child reports IDLE (#7782) --- balancer/endpointsharding/endpointsharding.go | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/balancer/endpointsharding/endpointsharding.go b/balancer/endpointsharding/endpointsharding.go index 9238d3278204..b5b92143194b 100644 --- a/balancer/endpointsharding/endpointsharding.go +++ b/balancer/endpointsharding/endpointsharding.go @@ -66,7 +66,9 @@ type endpointSharding struct { cc balancer.ClientConn bOpts balancer.BuildOptions + childMu sync.Mutex // syncs balancer.Balancer calls into children children atomic.Pointer[resolver.EndpointMap] + closed bool // inhibitChildUpdates is set during UpdateClientConnState/ResolverError // calls (calls to children will each produce an update, only want one @@ -83,6 +85,9 @@ type endpointSharding struct { // addresses it will ignore that endpoint. Otherwise, returns first error found // from a child, but fully processes the new update. func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error { + es.childMu.Lock() + defer es.childMu.Unlock() + es.inhibitChildUpdates.Store(true) defer func() { es.inhibitChildUpdates.Store(false) @@ -145,6 +150,8 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState // children and sends a single synchronous update of the childStates at the end // of the ResolverError operation. func (es *endpointSharding) ResolverError(err error) { + es.childMu.Lock() + defer es.childMu.Unlock() es.inhibitChildUpdates.Store(true) defer func() { es.inhibitChildUpdates.Store(false) @@ -162,11 +169,14 @@ func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubCon } func (es *endpointSharding) Close() { + es.childMu.Lock() + defer es.childMu.Unlock() children := es.children.Load() for _, child := range children.Values() { bal := child.(balancer.Balancer) bal.Close() } + es.closed = true } // updateState updates this component's state. It sends the aggregated state, @@ -274,6 +284,17 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) { bw.es.mu.Lock() bw.childState.State = state bw.es.mu.Unlock() + // When a child balancer says it's IDLE, ping it to exit idle and reconnect. + // TODO: In the future, perhaps make this a knob in configuration. + if ei, ok := bw.Balancer.(balancer.ExitIdler); state.ConnectivityState == connectivity.Idle && ok { + go func() { + bw.es.childMu.Lock() + if !bw.es.closed { + ei.ExitIdle() + } + bw.es.childMu.Unlock() + }() + } bw.es.updateState() }