Draft (changes from 2 commits)
6 changes: 6 additions & 0 deletions client_test.go
@@ -38,6 +38,12 @@ type mockConn struct {
ReceiveOverride map[string]func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
}

func (m *mockConn) SetServerUnHealthy() {}

func (m *mockConn) IsServerUnHealthy() bool {
return true
}

func (m *mockConn) Override(c conn) {
if m.OverrideFn != nil {
m.OverrideFn(c)
26 changes: 25 additions & 1 deletion cluster.go
@@ -206,15 +206,28 @@ func (c *clusterClient) _refresh() (err error) {
pending = nil

groups := result.parse(c.opt.TLSConfig != nil)

conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt)}
if c.rOpt != nil {
for _, nodeInfo := range g.nodes[1:] {
// do not include unhealthy connections in this refresh cycle
if cc, ok := c.conns[nodeInfo.Addr]; ok {
if cc.conn.IsServerUnHealthy() {
continue
}
}
conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.rOpt)}
}
} else {
for _, nodeInfo := range g.nodes[1:] {
// do not include unhealthy connections in this refresh cycle
if cc, ok := c.conns[nodeInfo.Addr]; ok {
if cc.conn.IsServerUnHealthy() {
continue
}
}
@rueian (Collaborator) commented on Jun 22, 2025:
Hi @srikar-jilugu, thanks for the PR. I think the idea is good. A few thoughts:

  1. Skipping the conns[nodeInfo.Addr] assignments is not the correct place to exclude unhealthy connections in this refresh cycle. The actual places to skip those connections are the assignments of pslots[i] and rslots[i].
  2. Furthermore, we can't skip unhealthy connections if they are connected to the masters. What we can only do is avoid picking unhealthy connections for replicas. That is, the only two places we can skip those connections are:
    pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn

    rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
  3. I think there should not be a SetServerUnHealthy method. It should be done privately in the conn.
  4. I think there should not be an UnHealthyNodeInterval option, and 15s is too long. I guess we need a periodic active check.
  5. IsLoading could be better than IsServerUnHealthy.
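
The two suggested skip points can be sketched as a small fallback helper. Everything below is illustrative: `pickReplica`, `fakeConn`, and the simplified `node`/`conn` shapes are assumptions, not valkey-go's actual types; only the idea from the review — skip loading replicas at slot assignment and fall back to the master — is preserved.

```go
package main

import "fmt"

// Minimal stand-ins for the internal types; shapes are simplified assumptions.
type conn interface{ IsLoading() bool }

type fakeConn struct{ loading bool }

func (f fakeConn) IsLoading() bool { return f.loading }

type node struct{ Addr string }

// pickReplica returns the index into nodes of a replica whose conn is not
// loading, falling back to the master (index 0) when none qualifies. This
// mirrors the suggestion: skip unhealthy conns only when assigning
// pslots/rslots, never when wiring up masters.
func pickReplica(nodes []node, conns map[string]conn, candidate int) int {
	if c, ok := conns[nodes[1+candidate].Addr]; ok && !c.IsLoading() {
		return 1 + candidate
	}
	// fall back: scan the remaining replicas, then settle on the master
	for i := 1; i < len(nodes); i++ {
		if c, ok := conns[nodes[i].Addr]; ok && !c.IsLoading() {
			return i
		}
	}
	return 0 // master
}

func main() {
	nodes := []node{{"master:6379"}, {"replica1:6379"}, {"replica2:6379"}}
	conns := map[string]conn{
		"master:6379":   fakeConn{},
		"replica1:6379": fakeConn{loading: true},
		"replica2:6379": fakeConn{},
	}
	fmt.Println(pickReplica(nodes, conns, 0)) // replica1 is loading, so prints 2
}
```

When every replica is loading, the helper degrades to the master rather than leaving a slot unassigned, which matches the fallback behavior discussed below.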

@srikar-jilugu (PR author) replied:

1, 2: I believe this change only checks the health status of the replica nodes, since we initiate the connrole for the master separately here:

conns[master] = connrole{conn: c.connFn(master, c.opt)}

Also, in the case where there is only one replica and it is unhealthy, we fall back to the master while assigning connections to pslots and rslots. So this shouldn't cause a situation where we run out of conn assignments or ignore a master node during assignment.

3: Any suggestions on how we can detect that a node is unhealthy? I think this method would allow us to extend it to other scenarios in the future (e.g. marking a node unhealthy when multiple retries fail).
4: Agreed, this need not be exposed.
5: As mentioned in point 3, I think we can use this status in other cases in the future, not just loading. WDYT?

@rueian (Collaborator) replied on Jun 23, 2025:

So this shouldn't cause a situation where we are out of conn assignments or ignore a master node for assignment.

Yes, but we still have a final mutation of the conn map at L236-L245. Please do the unhealthy check after that:

valkey-go/cluster.go

Lines 236 to 245 in ed9f107

c.mu.RLock()
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh.conn = cc.conn
conns[addr] = fresh
} else {
removes = append(removes, cc.conn)
}
}
c.mu.RUnlock()

Any suggestion on how we can detect that the node is unhealthy, I think, this method would allow us to extend it to other scenarios.

There are two ways to check if a node is loading: check the loading_eta_seconds in an INFO response, or check each response if a loading error occurs. I don't have a good idea for detecting other unhealthy scenarios. I guess whether a node is unhealthy or not even varies from user to user. Different users may want different behaviors. Could we just use IsLoading for now to avoid overloading the term Unhealthy until we come up with an API that allows users to mark a node as unhealthy and specify what to do with it?
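
The first detection route mentioned here — checking `loading_eta_seconds` in an INFO reply — could look roughly like the sketch below. `parseLoadingETA` is a hypothetical helper, not valkey-go code, though `loading` and `loading_eta_seconds` are real fields in the INFO Persistence section.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLoadingETA extracts the loading flag and loading_eta_seconds from an
// INFO reply. INFO lines are CRLF-separated "field:value" pairs.
func parseLoadingETA(info string) (loading bool, eta int64) {
	for _, line := range strings.Split(info, "\r\n") {
		if v, ok := strings.CutPrefix(line, "loading:"); ok {
			loading = v == "1"
		} else if v, ok := strings.CutPrefix(line, "loading_eta_seconds:"); ok {
			eta, _ = strconv.ParseInt(v, 10, 64)
		}
	}
	return loading, eta
}

func main() {
	info := "# Persistence\r\nloading:1\r\nloading_eta_seconds:42\r\n"
	fmt.Println(parseLoadingETA(info)) // prints: true 42
}
```

The other route — reacting to a LOADING error on a command response — needs no parsing at all, which is one reason the PR ended up taking it.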

@srikar-jilugu (PR author) replied:

@rueian updated the load-status handling based on your feedback; roughly:

  1. triggering the loading state from an initial LOADING error response to any command, and setting the timeout for this status equal to the ETA (AFAIK the node might not respond to any requests during this period)
  2. checking the node's loading status and considering only the active nodes just before pslots and rslots are assigned

@srikar-jilugu (PR author) added:

Will test this out on our workloads as well before I mark this PR ready for review.

conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.opt)}
}
}
@@ -524,6 +537,11 @@ process:
resp = results.s[1]
resultsp.Put(results)
goto process
case RedirectLoadingRetry:
// mark the associated node temporarily unhealthy
cc.SetServerUnHealthy()
c.refresh(ctx) // on-demand refresh
fallthrough
case RedirectRetry:
if c.retry && cmd.IsReadOnly() {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
@@ -1229,8 +1247,13 @@ func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr
mode = RedirectMove
} else if addr, ok = err.IsAsk(); ok {
mode = RedirectAsk
} else if err.IsClusterDown() || err.IsTryAgain() || err.IsLoading() {
} else if err.IsClusterDown() || err.IsTryAgain() {
mode = RedirectRetry
} else if err.IsLoading() {
mode = RedirectLoadingRetry
}
} else if ctx.Err() == nil {
mode = RedirectRetry
@@ -1447,6 +1470,7 @@ const (
RedirectMove
RedirectAsk
RedirectRetry
RedirectLoadingRetry

panicMsgCxSlot = "cross slot command in Dedicated is prohibited"
panicMixCxSlot = "Mixing no-slot and cross slot commands in DoMulti is prohibited"
26 changes: 24 additions & 2 deletions mux.go
@@ -43,6 +43,8 @@ type conn interface {
Addr() string
SetOnCloseHook(func(error))
OptInCmd() cmds.Completed
SetServerUnHealthy()
IsServerUnHealthy() bool
}

var _ conn = (*mux)(nil)
@@ -61,8 +63,10 @@ type mux struct {
maxp int
maxm int

usePool bool
optIn bool
usePool bool
optIn bool
serverUnhealthystatus atomic.Uint32
unhealthyServerTimeout time.Duration
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
@@ -107,6 +111,7 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn, wi

m.dpool = newPool(option.BlockingPoolSize, dead, option.BlockingPoolCleanup, option.BlockingPoolMinSize, wireFn)
m.spool = newPool(option.BlockingPoolSize, dead, option.BlockingPoolCleanup, option.BlockingPoolMinSize, wireNoBgFn)
m.unhealthyServerTimeout = option.UnHealthyNodeInterval
return m
}

@@ -411,6 +416,23 @@ func (m *mux) Addr() string {
return m.dst
}

func (m *mux) SetServerUnHealthy() {
m.serverUnhealthystatus.Store(uint32(time.Now().Unix()))
}

func (m *mux) IsServerUnHealthy() bool {
	unhealthy := m.serverUnhealthystatus.Load()
	if unhealthy == 0 {
		return false
	}
	// the stored value is a unix-seconds timestamp while
	// unhealthyServerTimeout is a time.Duration (nanoseconds), so convert
	// the stamp back to a time.Time before comparing elapsed time
	if time.Since(time.Unix(int64(unhealthy), 0)) < m.unhealthyServerTimeout {
		return true
	}
	m.serverUnhealthystatus.Store(0) // reset the status once the timeout has passed
	return false
}

func isBroken(err error, w wire) bool {
return err != nil && err != ErrClosing && w.Error() != nil
}
7 changes: 7 additions & 0 deletions valkey.go
@@ -239,6 +239,10 @@ type ClientOption struct {
// EnableReplicaAZInfo enables the client to load the replica node's availability zone.
// If true, the client will set the `AZ` field in `ReplicaInfo`.
EnableReplicaAZInfo bool

// UnHealthyNodeInterval is the interval for which a node stays marked unhealthy during a temporary degraded state; defaults to 15 seconds.
// Note: only used by the cluster client.
UnHealthyNodeInterval time.Duration
}

// SentinelOption contains MasterSet,
@@ -442,6 +446,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.RetryDelay == nil {
option.RetryDelay = defaultRetryDelayFn
}
if option.UnHealthyNodeInterval == 0 {
option.UnHealthyNodeInterval = 15 * time.Second // default to 15 seconds
}
if option.Sentinel.MasterSet != "" {
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
return newSentinelClient(&option, makeConn, newRetryer(option.RetryDelay))