4 changes: 4 additions & 0 deletions client_test.go
@@ -38,6 +38,10 @@ type mockConn struct {
ReceiveOverride map[string]func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
}

// IsLoading implements the conn interface; the mock always reports loading.
func (m *mockConn) IsLoading() bool {
return true
}

func (m *mockConn) Override(c conn) {
if m.OverrideFn != nil {
m.OverrideFn(c)
39 changes: 29 additions & 10 deletions cluster.go
@@ -206,6 +206,7 @@ func (c *clusterClient) _refresh() (err error) {
pending = nil

groups := result.parse(c.opt.TLSConfig != nil)

conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt)}
@@ -245,30 +246,41 @@ func (c *clusterClient) _refresh() (err error) {
pslots := [16384]conn{}
var rslots []conn
for master, g := range groups {

var replicaNodesToConsider []ReplicaInfo
// consider only healthy replica nodes for conn assignment to slots
for i := 1; i < len(g.nodes); i++ {
if cc, ok := conns[g.nodes[i].Addr]; ok {
if !cc.conn.IsLoading() {
replicaNodesToConsider = append(replicaNodesToConsider, g.nodes[i])
}
}
}
replicaNodeCount := len(replicaNodesToConsider)

switch {
- case c.opt.ReplicaOnly && len(g.nodes) > 1:
- nodesCount := len(g.nodes)
+ case c.opt.ReplicaOnly && len(replicaNodesToConsider) > 0:
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
- pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
+ pslots[i] = conns[replicaNodesToConsider[util.FastRand(replicaNodeCount)].Addr].conn
}
}
case c.rOpt != nil:
if len(rslots) == 0 { // lazy init
rslots = make([]conn, 16384)
}
- if len(g.nodes) > 1 {
- n := len(g.nodes) - 1
+ if len(replicaNodesToConsider) > 0 {
+ n := len(replicaNodesToConsider)

if c.opt.EnableReplicaAZInfo {
var wg sync.WaitGroup
- for i := 1; i <= n; i += 4 { // batch AZ() for every 4 connections
- for j := i; j <= i+4 && j <= n; j++ {
+ for i := 0; i < n; i += 4 { // batch AZ() for every 4 connections
+ for j := i; j < i+4 && j < n; j++ { // j < n: replicaNodesToConsider has exactly n elements
wg.Add(1)
go func(wg *sync.WaitGroup, conn conn, info *ReplicaInfo) {
info.AZ = conn.AZ()
wg.Done()
- }(&wg, conns[g.nodes[j].Addr].conn, &g.nodes[j])
+ }(&wg, conns[replicaNodesToConsider[j].Addr].conn, &replicaNodesToConsider[j])
}
wg.Wait()
}
@@ -277,9 +289,9 @@ func (c *clusterClient) _refresh() (err error) {
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
pslots[i] = conns[master].conn
- rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
+ rIndex := c.opt.ReplicaSelector(uint16(i), replicaNodesToConsider)
if rIndex >= 0 && rIndex < n {
- rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
+ rslots[i] = conns[replicaNodesToConsider[rIndex].Addr].conn
} else {
rslots[i] = conns[master].conn
}
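For review context, the replica-routing rule this hunk implements can be read as a standalone sketch (the pickConn helper is illustrative only, not part of the change; the types are those already in cluster.go):

	// pickConn routes a slot to a random healthy (non-loading) replica and
	// falls back to the master when no healthy replica exists.
	func pickConn(master string, nodes []ReplicaInfo, conns map[string]connrole) conn {
		var healthy []ReplicaInfo
		for _, n := range nodes[1:] { // nodes[0] is the master itself
			if cc, ok := conns[n.Addr]; ok && !cc.conn.IsLoading() {
				healthy = append(healthy, n)
			}
		}
		if len(healthy) == 0 {
			return conns[master].conn // degrade to the master rather than a loading replica
		}
		return conns[healthy[util.FastRand(len(healthy))].Addr].conn
	}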
@@ -524,6 +536,10 @@ process:
resp = results.s[1]
resultsp.Put(results)
goto process
case RedirectLoadingRetry:
c.refresh(ctx) // on-demand refresh so slots can move off the loading node
fallthrough
case RedirectRetry:
if c.retry && cmd.IsReadOnly() {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
@@ -1229,8 +1245,10 @@ func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr
mode = RedirectMove
} else if addr, ok = err.IsAsk(); ok {
mode = RedirectAsk
- } else if err.IsClusterDown() || err.IsTryAgain() || err.IsLoading() {
+ } else if err.IsClusterDown() || err.IsTryAgain() {
mode = RedirectRetry
} else if err.IsLoading() {
mode = RedirectLoadingRetry
}
} else if ctx.Err() == nil {
mode = RedirectRetry
@@ -1447,6 +1465,7 @@ const (
RedirectMove
RedirectAsk
RedirectRetry
RedirectLoadingRetry // like RedirectRetry, but triggers an on-demand cluster refresh first

panicMsgCxSlot = "cross slot command in Dedicated is prohibited"
panicMixCxSlot = "Mixing no-slot and cross slot commands in DoMulti is prohibited"
5 changes: 5 additions & 0 deletions internal/cmds/cmds.go
@@ -61,6 +61,11 @@ var (
PingCmd = Completed{
cs: newCommandSlice([]string{"PING"}),
}
// InfoPersistenceCmd is predefined INFO Persistence
InfoPersistenceCmd = Completed{
cs: newCommandSlice([]string{"INFO", "Persistence"}),
}

// SlotCmd is predefined CLUSTER SLOTS
SlotCmd = Completed{
cs: newCommandSlice([]string{"CLUSTER", "SLOTS"}),
78 changes: 76 additions & 2 deletions mux.go
@@ -4,6 +4,7 @@ import (
"context"
"net"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -13,6 +14,10 @@ import (
"github.com/valkey-io/valkey-go/internal/util"
)

const (
loadingEtaSecondsKey = "loading_eta_seconds:"
)

type connFn func(dst string, opt *ClientOption) conn
type dialFn func(ctx context.Context, dst string, opt *ClientOption) (net.Conn, error)
type wireFn func(ctx context.Context) wire
@@ -43,6 +48,7 @@ type conn interface {
Addr() string
SetOnCloseHook(func(error))
OptInCmd() cmds.Completed
IsLoading() bool
}

var _ conn = (*mux)(nil)
@@ -61,8 +67,9 @@ type mux struct {
maxp int
maxm int

- usePool bool
- optIn bool
+ usePool           bool
+ optIn             bool
+ nodeLoadingStatus atomic.Uint32
}

func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
@@ -267,6 +274,10 @@ func (m *mux) blocking(pool *pool, ctx context.Context, cmd Completed) (resp Val
if resp.NonValkeyError() != nil { // abort the wire if blocking command return early (ex. context.DeadlineExceeded)
wire.Close()
}
// check loading status of the node
if !m.IsLoading() && isLoadingError(resp.Error()) {
m.setLoadingStatus(ctx)
}
pool.Store(wire)
return resp
}
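The same detection hook recurs after every reply path in mux.go (blockingMulti, pipeline, pipelineMulti, DoCache, and doMultiCache below); in isolation the pattern is:

	// Record the node's loading deadline at most once per LOADING episode;
	// while IsLoading() reports true, the INFO round trip is skipped.
	if !m.IsLoading() && isLoadingError(resp.Error()) {
		m.setLoadingStatus(ctx)
	}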
@@ -279,6 +290,10 @@ func (m *mux) blockingMulti(pool *pool, ctx context.Context, cmd []Completed) (r
wire.Close()
break
}
// check loading status of the node
if !m.IsLoading() && isLoadingError(res.Error()) {
m.setLoadingStatus(ctx)
}
}
pool.Store(wire)
return resp
@@ -290,6 +305,10 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp ValkeyResult) {
if resp = wire.Do(ctx, cmd); isBroken(resp.NonValkeyError(), wire) {
m.wire[slot].CompareAndSwap(wire, m.init)
}
// check loading status of the node
if !m.IsLoading() && isLoadingError(resp.Error()) {
m.setLoadingStatus(ctx)
}
return resp
}

@@ -302,6 +321,10 @@ func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *valkeyr
m.wire[slot].CompareAndSwap(wire, m.init)
return resp
}
// check loading status of the node
if !m.IsLoading() && isLoadingError(r.Error()) {
m.setLoadingStatus(ctx)
}
}
return resp
}
@@ -313,6 +336,10 @@ func (m *mux) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) Val
if isBroken(resp.NonValkeyError(), wire) {
m.wire[slot].CompareAndSwap(wire, m.init)
}
// check loading status of the node
if !m.IsLoading() && isLoadingError(resp.Error()) {
m.setLoadingStatus(ctx)
}
return resp
}

Expand Down Expand Up @@ -373,6 +400,10 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
m.wire[slot].CompareAndSwap(wire, m.init)
return resps
}
// check loading status of the node
if !m.IsLoading() && isLoadingError(r.Error()) {
m.setLoadingStatus(ctx)
}
}
return resps
}
@@ -411,10 +442,53 @@ func (m *mux) Addr() string {
return m.dst
}

func (m *mux) setLoadingStatus(ctx context.Context) {
w := m.pipe(ctx, 0)
res := w.Do(ctx, cmds.InfoPersistenceCmd)
r := res.String()
loadingEtaIdx := strings.Index(r, loadingEtaSecondsKey)
if loadingEtaIdx >= 0 {
etaStr := r[loadingEtaIdx+len(loadingEtaSecondsKey):]
if end := strings.IndexAny(etaStr, "\r\n"); end >= 0 {
etaStr = etaStr[:end] // keep only this INFO line's value, not just its first digit
}
eta, _ := strconv.Atoi(etaStr)
if eta > 0 {
// record the Unix time at which the loading status expires
etaTime := time.Now().Add(time.Duration(eta) * time.Second).Unix()
m.nodeLoadingStatus.CompareAndSwap(0, uint32(etaTime))
}
}
}
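// For reference, while a node is loading its INFO Persistence reply contains a
// fragment like the following (values hypothetical):
//
//	# Persistence
//	loading:1
//	loading_eta_seconds:42
//
// setLoadingStatus above extracts the integer after "loading_eta_seconds:" and
// stores time.Now() plus that many seconds as the Unix time at which the
// loading state expires.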

func (m *mux) IsLoading() bool {
status := m.nodeLoadingStatus.Load()
if status == 0 {
return false
}
if time.Now().Unix() < int64(status) {
return true
}
m.nodeLoadingStatus.Store(0) // reset the status once it has expired
return false
}

func isBroken(err error, w wire) bool {
return err != nil && err != ErrClosing && w.Error() != nil
}

func isLoadingError(err error) bool {
if err == nil {
return false
}
if verr, ok := err.(*ValkeyError); ok {
return verr.IsLoading()
}
return false
}

func slotfn(n int, ks uint16, noreply bool) uint16 {
if n == 1 || ks == cmds.NoSlot || noreply {
return 0
7 changes: 7 additions & 0 deletions valkey.go
@@ -239,6 +239,10 @@ type ClientOption struct {
// EnableReplicaAZInfo enables the client to load the replica node's availability zone.
// If true, the client will set the `AZ` field in `ReplicaInfo`.
EnableReplicaAZInfo bool

// UnHealthyNodeInterval is the interval for which a node is marked unhealthy
// during a temporary degraded state (e.g. LOADING), defaults to 15 seconds.
// Note: only used by the cluster client.
UnHealthyNodeInterval time.Duration
}

// SentinelOption contains MasterSet,
@@ -442,6 +446,9 @@ func NewClient(option ClientOption) (client Client, err error) {
if option.RetryDelay == nil {
option.RetryDelay = defaultRetryDelayFn
}
if option.UnHealthyNodeInterval == 0 {
option.UnHealthyNodeInterval = 15 * time.Second // default to 15 seconds
}
if option.Sentinel.MasterSet != "" {
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
return newSentinelClient(&option, makeConn, newRetryer(option.RetryDelay))
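For callers, overriding the unhealthy-node window is a one-line option (a usage sketch; the address is a placeholder):

	client, err := valkey.NewClient(valkey.ClientOption{
		InitAddress:           []string{"127.0.0.1:7000"},
		UnHealthyNodeInterval: 30 * time.Second, // override the 15-second default
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()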