diff --git a/client_test.go b/client_test.go
index bfa7b449..adccdcb2 100644
--- a/client_test.go
+++ b/client_test.go
@@ -38,6 +38,10 @@ type mockConn struct {
 	ReceiveOverride map[string]func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
 }
 
+func (m *mockConn) IsLoading() bool {
+	return true
+}
+
 func (m *mockConn) Override(c conn) {
 	if m.OverrideFn != nil {
 		m.OverrideFn(c)
diff --git a/cluster.go b/cluster.go
index 9d8f2f66..71b1fde9 100644
--- a/cluster.go
+++ b/cluster.go
@@ -206,6 +206,7 @@ func (c *clusterClient) _refresh() (err error) {
 	pending = nil
 
 	groups := result.parse(c.opt.TLSConfig != nil)
+
 	conns := make(map[string]connrole, len(groups))
 	for master, g := range groups {
 		conns[master] = connrole{conn: c.connFn(master, c.opt)}
@@ -245,30 +246,41 @@ func (c *clusterClient) _refresh() (err error) {
 	pslots := [16384]conn{}
 	var rslots []conn
 	for master, g := range groups {
+
+		var replicaNodesToConsider []ReplicaInfo
+		// consider only healthy (non-loading) replica nodes for conn assignment to slots
+		for i := 1; i < len(g.nodes); i++ {
+			if cc, ok := conns[g.nodes[i].Addr]; ok {
+				if !cc.conn.IsLoading() {
+					replicaNodesToConsider = append(replicaNodesToConsider, g.nodes[i])
+				}
+			}
+		}
+		replicaNodeCount := len(replicaNodesToConsider)
+
 		switch {
-		case c.opt.ReplicaOnly && len(g.nodes) > 1:
-			nodesCount := len(g.nodes)
+		case c.opt.ReplicaOnly && replicaNodeCount > 0:
 			for _, slot := range g.slots {
 				for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
-					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
+					pslots[i] = conns[replicaNodesToConsider[util.FastRand(replicaNodeCount)].Addr].conn
 				}
 			}
 		case c.rOpt != nil:
 			if len(rslots) == 0 { // lazy init
 				rslots = make([]conn, 16384)
 			}
-			if len(g.nodes) > 1 {
-				n := len(g.nodes) - 1
+			if len(replicaNodesToConsider) > 0 {
+				n := len(replicaNodesToConsider)
 				if c.opt.EnableReplicaAZInfo {
 					var wg sync.WaitGroup
-					for i := 1; i <= n; i += 4 { // batch AZ() for every 4 connections
-						for j := i; j <= i+4 && j <= n; j++ {
+					for i := 0; i < n; i += 4 { // batch AZ() for every 4 connections
+						for j := i; j < i+4 && j < n; j++ { // keep j below n, which is now a count rather than a last index
 							wg.Add(1)
 							go func(wg *sync.WaitGroup, conn conn, info *ReplicaInfo) {
 								info.AZ = conn.AZ()
 								wg.Done()
-							}(&wg, conns[g.nodes[j].Addr].conn, &g.nodes[j])
+							}(&wg, conns[replicaNodesToConsider[j].Addr].conn, &replicaNodesToConsider[j])
 						}
 						wg.Wait()
 					}
@@ -277,9 +289,9 @@ func (c *clusterClient) _refresh() (err error) {
 				for _, slot := range g.slots {
 					for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
 						pslots[i] = conns[master].conn
-						rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
+						rIndex := c.opt.ReplicaSelector(uint16(i), replicaNodesToConsider)
 						if rIndex >= 0 && rIndex < n {
-							rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
+							rslots[i] = conns[replicaNodesToConsider[rIndex].Addr].conn
 						} else {
 							rslots[i] = conns[master].conn
 						}
@@ -524,6 +536,10 @@ process:
 		resp = results.s[1]
 		resultsp.Put(results)
 		goto process
+	case RedirectLoadingRetry:
+
+		c.refresh(ctx) // on-demand refresh
+		fallthrough
 	case RedirectRetry:
 		if c.retry && cmd.IsReadOnly() {
 			shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
@@ -1229,8 +1245,10 @@ func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr
 			mode = RedirectMove
 		} else if addr, ok = err.IsAsk(); ok {
 			mode = RedirectAsk
-		} else if err.IsClusterDown() || err.IsTryAgain() || err.IsLoading() {
+		} else if err.IsClusterDown() || err.IsTryAgain() {
 			mode = RedirectRetry
+		} else if err.IsLoading() {
+			mode = RedirectLoadingRetry
 		}
 	} else if ctx.Err() == nil {
 		mode = RedirectRetry
@@ -1447,6 +1465,7 @@ const (
 	RedirectMove
 	RedirectAsk
 	RedirectRetry
+	RedirectLoadingRetry
 
 	panicMsgCxSlot = "cross slot command in Dedicated is prohibited"
 	panicMixCxSlot = "Mixing no-slot and cross slot commands in DoMulti is prohibited"
diff --git a/internal/cmds/cmds.go b/internal/cmds/cmds.go
index b36a5fd7..1fbbb3c0 100644
--- a/internal/cmds/cmds.go
+++ b/internal/cmds/cmds.go
@@ -61,6 +61,11 @@ var (
 	PingCmd = Completed{
 		cs: newCommandSlice([]string{"PING"}),
 	}
+	// InfoPersistenceCmd is predefined INFO Persistence
+	InfoPersistenceCmd = Completed{
+		cs: newCommandSlice([]string{"INFO", "Persistence"}),
+	}
+
 	// SlotCmd is predefined CLUSTER SLOTS
 	SlotCmd = Completed{
 		cs: newCommandSlice([]string{"CLUSTER", "SLOTS"}),
diff --git a/mux.go b/mux.go
index bcebaf94..c4dcefd3 100644
--- a/mux.go
+++ b/mux.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"net"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -13,6 +14,10 @@ import (
 	"github.com/valkey-io/valkey-go/internal/util"
 )
 
+const (
+	loadingEtaSecondsKey = "loading_eta_seconds:"
+)
+
 type connFn func(dst string, opt *ClientOption) conn
 type dialFn func(ctx context.Context, dst string, opt *ClientOption) (net.Conn, error)
 type wireFn func(ctx context.Context) wire
@@ -43,6 +48,7 @@ type conn interface {
 	Addr() string
 	SetOnCloseHook(func(error))
 	OptInCmd() cmds.Completed
+	IsLoading() bool
 }
 
 var _ conn = (*mux)(nil)
@@ -61,8 +67,9 @@ type mux struct {
 	maxp int
 	maxm int
 
-	usePool bool
-	optIn   bool
+	usePool           bool
+	optIn             bool
+	nodeLoadingStatus atomic.Uint32
 }
 
 func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
@@ -267,6 +274,10 @@ func (m *mux) blocking(pool *pool, ctx context.Context, cmd Completed) (resp Val
 	if resp.NonValkeyError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
 		wire.Close()
 	}
+	// check loading status of the node
+	if !m.IsLoading() && isLoadingError(resp.Error()) {
+		m.setLoadingStatus(ctx)
+	}
 	pool.Store(wire)
 	return resp
 }
@@ -279,6 +290,10 @@ func (m *mux) blockingMulti(pool *pool, ctx context.Context, cmd []Completed) (r
 			wire.Close()
 			break
 		}
+		// check loading status of the node
+		if !m.IsLoading() && isLoadingError(res.Error()) {
+			m.setLoadingStatus(ctx)
+		}
 	}
 	pool.Store(wire)
 	return resp
@@ -290,6 +305,10 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp ValkeyResult) {
 	if resp = wire.Do(ctx, cmd); isBroken(resp.NonValkeyError(), wire) {
 		m.wire[slot].CompareAndSwap(wire, m.init)
 	}
+	// check loading status of the node
+	if !m.IsLoading() && isLoadingError(resp.Error()) {
+		m.setLoadingStatus(ctx)
+	}
 	return resp
 }
 
@@ -302,6 +321,10 @@ func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *valkeyr
 		if isBroken(r.NonValkeyError(), wire) {
 			m.wire[slot].CompareAndSwap(wire, m.init)
 			return resp
 		}
+		// check loading status of the node
+		if !m.IsLoading() && isLoadingError(r.Error()) {
+			m.setLoadingStatus(ctx)
+		}
 	}
 	return resp
 }
@@ -313,6 +336,10 @@ func (m *mux) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) Val
 	if isBroken(resp.NonValkeyError(), wire) {
 		m.wire[slot].CompareAndSwap(wire, m.init)
 	}
+	// check loading status of the node
+	if !m.IsLoading() && isLoadingError(resp.Error()) {
+		m.setLoadingStatus(ctx)
+	}
 	return resp
 }
 
@@ -373,6 +400,10 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
 		if isBroken(r.NonValkeyError(), wire) {
 			m.wire[slot].CompareAndSwap(wire, m.init)
 			return resps
 		}
+		// check loading status of the node
+		if !m.IsLoading() && isLoadingError(r.Error()) {
+			m.setLoadingStatus(ctx)
+		}
 	}
 	return resps
 }
@@ -411,10 +442,53 @@ func (m *mux) Addr() string {
 	return m.dst
 }
 
+// setLoadingStatus queries INFO Persistence and, if the node reports a positive
+// loading_eta_seconds, records the time at which the loading state should expire.
+func (m *mux) setLoadingStatus(ctx context.Context) {
+	w := m.pipe(ctx, 0)
+	res := w.Do(ctx, cmds.InfoPersistenceCmd)
+	r := res.String()
+	loadingEtaIdx := strings.Index(r, loadingEtaSecondsKey)
+	if loadingEtaIdx < 0 {
+		return
+	}
+	etaStr := r[loadingEtaIdx+len(loadingEtaSecondsKey):]
+	if end := strings.IndexAny(etaStr, "\r\n"); end >= 0 {
+		etaStr = etaStr[:end] // keep only the value on the loading_eta_seconds line
+	}
+	if eta, _ := strconv.Atoi(strings.TrimSpace(etaStr)); eta > 0 {
+		// this sets when the loading status will expire
+		etaTime := time.Now().Add(time.Duration(eta) * time.Second).Unix()
+		m.nodeLoadingStatus.CompareAndSwap(0, uint32(etaTime))
+	}
+}
+
+func (m *mux) IsLoading() bool {
+	status := m.nodeLoadingStatus.Load()
+	if status == 0 {
+		return false
+	}
+	if time.Now().Unix() < int64(status) {
+		return true
+	}
+	m.nodeLoadingStatus.Store(0) // reset the status once it has expired
+	return false
+}
+
 func isBroken(err error, w wire) bool {
 	return err != nil && err != ErrClosing && w.Error() != nil
 }
 
+func isLoadingError(err error) bool {
+	if err == nil {
+		return false
+	}
+	if verr, ok := err.(*ValkeyError); ok {
+		return verr.IsLoading()
+	}
+	return false
+}
+
 func slotfn(n int, ks uint16, noreply bool) uint16 {
 	if n == 1 || ks == cmds.NoSlot || noreply {
 		return 0
diff --git a/valkey.go b/valkey.go
index 720a42bd..82f23f06 100644
--- a/valkey.go
+++ b/valkey.go
@@ -239,6 +239,10 @@ type ClientOption struct {
 	// EnableReplicaAZInfo enables the client to load the replica node's availability zone.
 	// If true, the client will set the `AZ` field in `ReplicaInfo`.
 	EnableReplicaAZInfo bool
+
+	// UnHealthyNodeInterval is the interval for which a node is marked as unhealthy during a temporary degraded state. Defaults to 15 seconds.
+	// Note: only used by the cluster client.
+	UnHealthyNodeInterval time.Duration
 }
 
 // SentinelOption contains MasterSet,
@@ -442,6 +446,9 @@ func NewClient(option ClientOption) (client Client, err error) {
 	if option.RetryDelay == nil {
 		option.RetryDelay = defaultRetryDelayFn
 	}
+	if option.UnHealthyNodeInterval == 0 {
+		option.UnHealthyNodeInterval = 15 * time.Second // default to 15 seconds
+	}
 	if option.Sentinel.MasterSet != "" {
 		option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
 		return newSentinelClient(&option, makeConn, newRetryer(option.RetryDelay))
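
Usage sketch (illustrative, not part of the patch): setting the UnHealthyNodeInterval option introduced above when building a cluster client; the seed address and the 30-second value are placeholders.

	package main

	import (
		"time"

		"github.com/valkey-io/valkey-go"
	)

	func main() {
		client, err := valkey.NewClient(valkey.ClientOption{
			InitAddress:           []string{"127.0.0.1:7001"}, // placeholder cluster seed node
			UnHealthyNodeInterval: 30 * time.Second,           // overrides the 15s default applied in NewClient
		})
		if err != nil {
			panic(err)
		}
		defer client.Close()
	}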