Merge pull request #2292 from AlexanderYastrebov/ring/reduce-set-addrs-shards-locking-2

fix: reduce `SetAddrs` shards lock contention
vmihailenco authored Nov 23, 2022
2 parents 5502cf6 + 7c6f677 commit 4c46468
Showing 3 changed files with 252 additions and 33 deletions.
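In outline, the merged change stops holding the ring's shared mutex across shard creation, which can be slow (each new shard dials a Redis client). SetAddrs now takes a dedicated setAddrsMu to serialize concurrent callers, snapshots the existing shards under a read lock, builds the new shard map with no lock held, and swaps it in under a brief write lock, re-checking closed in case the ring was shut down mid-build. The sketch below is a simplified illustration of that pattern only; ring, shard, and closeAll are invented names, and the real types and helpers appear in the ring.go diff further down.

package main

import (
	"fmt"
	"sync"
)

type shard struct{ addr string }

type ring struct {
	setAddrsMu sync.Mutex // serializes SetAddrs callers
	mu         sync.RWMutex
	closed     bool
	shards     map[string]*shard // keyed by shard name
}

func (r *ring) SetAddrs(addrs map[string]string) {
	r.setAddrsMu.Lock()
	defer r.setAddrsMu.Unlock()

	r.mu.RLock()
	if r.closed {
		r.mu.RUnlock()
		return
	}
	existing := r.shards // cheap snapshot under a read lock
	r.mu.RUnlock()

	// The slow part runs with no lock held: reuse shards by address,
	// create the rest (the real code dials a redis.Client here).
	next := make(map[string]*shard, len(addrs))
	created := make(map[string]*shard)
	unused := make(map[string]*shard)
	for _, s := range existing {
		unused[s.addr] = s
	}
	for name, addr := range addrs {
		if s, ok := unused[addr]; ok {
			next[name] = s
			delete(unused, addr)
		} else {
			s := &shard{addr: addr}
			next[name] = s
			created[addr] = s
		}
	}

	r.mu.Lock()
	if r.closed { // the ring was closed while we were building
		r.mu.Unlock()
		closeAll(created) // discard what we just made
		return
	}
	r.shards = next // readers are blocked only for this swap
	r.mu.Unlock()

	closeAll(unused) // retire replaced shards after unlocking
}

func closeAll(shards map[string]*shard) {
	for addr := range shards {
		fmt.Println("closing shard", addr) // stands in for Client.Close
	}
}

func main() {
	r := &ring{shards: map[string]*shard{}}
	r.SetAddrs(map[string]string{"one": ":6379"})
	r.SetAddrs(map[string]string{"one": ":6379", "two": ":6380"})
	r.SetAddrs(map[string]string{"one": ":6379"}) // retires and "closes" :6380
}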
118 changes: 118 additions & 0 deletions internal_test.go
@@ -3,6 +3,9 @@ package redis
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

@@ -107,6 +110,7 @@ func TestRingSetAddrsAndRebalanceRace(t *testing.T) {
}
},
})
defer ring.Close()

// Continuously update addresses by adding and removing one address
updatesDone := make(chan struct{})
@@ -156,13 +160,127 @@ func BenchmarkRingShardingRebalanceLocked(b *testing.B) {
}

ring := NewRing(opts)
defer ring.Close()

b.ResetTimer()
for i := 0; i < b.N; i++ {
ring.sharding.rebalanceLocked()
}
}

type testCounter struct {
mu sync.Mutex
t *testing.T
m map[string]int
}

func newTestCounter(t *testing.T) *testCounter {
return &testCounter{t: t, m: make(map[string]int)}
}

func (ct *testCounter) increment(key string) {
ct.mu.Lock()
defer ct.mu.Unlock()
ct.m[key]++
}

func (ct *testCounter) expect(values map[string]int) {
ct.mu.Lock()
defer ct.mu.Unlock()
ct.t.Helper()
if !reflect.DeepEqual(values, ct.m) {
ct.t.Errorf("expected %v != actual %v", values, ct.m)
}
}

func TestRingShardsCleanup(t *testing.T) {
const (
ringShard1Name = "ringShardOne"
ringShard2Name = "ringShardTwo"

ringShard1Addr = "shard1.test"
ringShard2Addr = "shard2.test"
)

t.Run("closes unused shards", func(t *testing.T) {
closeCounter := newTestCounter(t)

ring := NewRing(&RingOptions{
Addrs: map[string]string{
ringShard1Name: ringShard1Addr,
ringShard2Name: ringShard2Addr,
},
NewClient: func(opt *Options) *Client {
c := NewClient(opt)
c.baseClient.onClose = func() error {
closeCounter.increment(opt.Addr)
return nil
}
return c
},
})
closeCounter.expect(map[string]int{})

// no change due to the same addresses
ring.SetAddrs(map[string]string{
ringShard1Name: ringShard1Addr,
ringShard2Name: ringShard2Addr,
})
closeCounter.expect(map[string]int{})

ring.SetAddrs(map[string]string{
ringShard1Name: ringShard1Addr,
})
closeCounter.expect(map[string]int{ringShard2Addr: 1})

ring.SetAddrs(map[string]string{
ringShard2Name: ringShard2Addr,
})
closeCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 1})

ring.Close()
closeCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 2})
})

t.Run("closes created shards if ring was closed", func(t *testing.T) {
createCounter := newTestCounter(t)
closeCounter := newTestCounter(t)

var (
ring *Ring
shouldClose int32
)

ring = NewRing(&RingOptions{
Addrs: map[string]string{
ringShard1Name: ringShard1Addr,
},
NewClient: func(opt *Options) *Client {
if atomic.LoadInt32(&shouldClose) != 0 {
ring.Close()
}
createCounter.increment(opt.Addr)
c := NewClient(opt)
c.baseClient.onClose = func() error {
closeCounter.increment(opt.Addr)
return nil
}
return c
},
})
createCounter.expect(map[string]int{ringShard1Addr: 1})
closeCounter.expect(map[string]int{})

atomic.StoreInt32(&shouldClose, 1)

ring.SetAddrs(map[string]string{
ringShard2Name: ringShard2Addr,
})
createCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 1})
closeCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 1})
})
}
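The second subtest arms a shouldClose flag so that NewClient closes the ring while SetAddrs is still creating a shard, pinning down the cleanup path for freshly created shards. The flag is an int32 driven through sync/atomic, the long-standing idiom for a race-free boolean; as a side note (not part of this diff), on Go 1.19 or newer the same flag could be written with sync/atomic's Bool type:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var shouldClose atomic.Bool // zero value is false

	fmt.Println(shouldClose.Load()) // false
	shouldClose.Store(true)         // replaces atomic.StoreInt32(&flag, 1)
	fmt.Println(shouldClose.Load()) // true, replaces atomic.LoadInt32(&flag) != 0
}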

//------------------------------------------------------------------------------

type timeoutErr struct {
68 changes: 41 additions & 27 deletions ring.go
@@ -219,6 +219,10 @@ type ringSharding struct {
hash ConsistentHash
numShard int
onNewNode []func(rdb *Client)

// ensures exclusive access to SetAddrs so there is no need
// to hold mu for the duration of potentially long shard creation
setAddrsMu sync.Mutex
}

type ringShards struct {
@@ -245,46 +249,62 @@ func (c *ringSharding) OnNewNode(fn func(rdb *Client)) {
// decrease the number of shards that you use. It will reuse shards that
// existed before and close the ones that will not be used anymore.
func (c *ringSharding) SetAddrs(addrs map[string]string) {
-	c.mu.Lock()
+	c.setAddrsMu.Lock()
+	defer c.setAddrsMu.Unlock()

+	cleanup := func(shards map[string]*ringShard) {
+		for addr, shard := range shards {
+			if err := shard.Client.Close(); err != nil {
+				internal.Logger.Printf(context.Background(), "shard.Close %s failed: %s", addr, err)
+			}
+		}
+	}
+
+	c.mu.RLock()
	if c.closed {
-		c.mu.Unlock()
+		c.mu.RUnlock()
		return
	}
+	existing := c.shards
+	c.mu.RUnlock()

-	shards, cleanup := c.newRingShards(addrs, c.shards)
+	shards, created, unused := c.newRingShards(addrs, existing)
+
+	c.mu.Lock()
+	if c.closed {
+		cleanup(created)
+		c.mu.Unlock()
+		return
+	}
	c.shards = shards
	c.rebalanceLocked()
	c.mu.Unlock()

-	cleanup()
+	cleanup(unused)
}

func (c *ringSharding) newRingShards(
-	addrs map[string]string, existingShards *ringShards,
-) (*ringShards, func()) {
-	shardMap := make(map[string]*ringShard)     // indexed by addr
-	unusedShards := make(map[string]*ringShard) // indexed by addr
-
-	if existingShards != nil {
-		for _, shard := range existingShards.list {
-			addr := shard.Client.opt.Addr
-			shardMap[addr] = shard
-			unusedShards[addr] = shard
-		}
-	}
-
-	shards := &ringShards{
-		m: make(map[string]*ringShard),
-	}
+	addrs map[string]string, existing *ringShards,
+) (shards *ringShards, created, unused map[string]*ringShard) {
+	shards = &ringShards{m: make(map[string]*ringShard, len(addrs))}
+	created = make(map[string]*ringShard) // indexed by addr
+	unused = make(map[string]*ringShard)  // indexed by addr
+
+	if existing != nil {
+		for _, shard := range existing.list {
+			unused[shard.addr] = shard
+		}
+	}

	for name, addr := range addrs {
-		if shard, ok := shardMap[addr]; ok {
+		if shard, ok := unused[addr]; ok {
			shards.m[name] = shard
-			delete(unusedShards, addr)
+			delete(unused, addr)
		} else {
			shard := newRingShard(c.opt, addr)
			shards.m[name] = shard
+			created[addr] = shard

			for _, fn := range c.onNewNode {
				fn(shard.Client)
@@ -296,13 +316,7 @@ func (c *ringSharding) newRingShards(
shards.list = append(shards.list, shard)
}

-	return shards, func() {
-		for addr, shard := range unusedShards {
-			if err := shard.Client.Close(); err != nil {
-				internal.Logger.Printf(context.Background(), "shard.Close %s failed: %s", addr, err)
-			}
-		}
-	}
+	return
}

func (c *ringSharding) List() []*ringShard {
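For callers of the public API, the behavior described in the SetAddrs doc comment quoted in the hunk above is unchanged: shards are reused by address, and clients that drop out of the map are closed for you. A hypothetical caller-side sketch; the addresses are placeholders, and the import path is assumed to be the module path around this commit:

package main

import "github.com/go-redis/redis/v9"

func main() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{"shard1": ":6379"},
	})
	defer ring.Close()

	// Grow the ring: the ":6379" shard is reused, ":6380" is created.
	ring.SetAddrs(map[string]string{
		"shard1": ":6379",
		"shard2": ":6380",
	})

	// Shrink it again: the ":6380" client is closed by the ring.
	ring.SetAddrs(map[string]string{"shard1": ":6379"})
}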
99 changes: 93 additions & 6 deletions ring_test.go
@@ -7,6 +7,7 @@ import (
"net"
"strconv"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo"
@@ -123,15 +124,15 @@ var _ = Describe("Redis Ring", func() {
})
		Expect(ring.Len()).To(Equal(1))
gotShard := ring.ShardByName("ringShardOne")
-		Expect(gotShard).To(Equal(wantShard))
+		Expect(gotShard).To(BeIdenticalTo(wantShard))

ring.SetAddrs(map[string]string{
"ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port,
})
		Expect(ring.Len()).To(Equal(2))
gotShard = ring.ShardByName("ringShardOne")
-		Expect(gotShard).To(Equal(wantShard))
+		Expect(gotShard).To(BeIdenticalTo(wantShard))
})

It("uses 3 shards after setting it to 3 shards", func() {
@@ -155,8 +156,8 @@
gotShard1 := ring.ShardByName(shardName1)
gotShard2 := ring.ShardByName(shardName2)
gotShard3 := ring.ShardByName(shardName3)
-		Expect(gotShard1).To(Equal(wantShard1))
-		Expect(gotShard2).To(Equal(wantShard2))
+		Expect(gotShard1).To(BeIdenticalTo(wantShard1))
+		Expect(gotShard2).To(BeIdenticalTo(wantShard2))
Expect(gotShard3).ToNot(BeNil())

ring.SetAddrs(map[string]string{
@@ -167,8 +168,8 @@
gotShard1 = ring.ShardByName(shardName1)
gotShard2 = ring.ShardByName(shardName2)
gotShard3 = ring.ShardByName(shardName3)
-		Expect(gotShard1).To(Equal(wantShard1))
-		Expect(gotShard2).To(Equal(wantShard2))
+		Expect(gotShard1).To(BeIdenticalTo(wantShard1))
+		Expect(gotShard2).To(BeIdenticalTo(wantShard2))
Expect(gotShard3).To(BeNil())
})
})
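The assertions above switch from Gomega's Equal, which compares with reflect.DeepEqual and would pass for two distinct shards holding the same data, to BeIdenticalTo, which compares with == and therefore proves the very same shard object was reused across SetAddrs calls. The distinction, shown without Gomega (the shard type here is invented for illustration):

package main

import (
	"fmt"
	"reflect"
)

type shard struct{ addr string }

func main() {
	a := &shard{addr: ":6379"}
	b := &shard{addr: ":6379"} // same contents, different object

	fmt.Println(reflect.DeepEqual(a, b)) // true:  what Equal would accept
	fmt.Println(a == b)                  // false: BeIdenticalTo rejects it
	fmt.Println(a == a)                  // true:  the shard really was reused
}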
@@ -739,3 +740,89 @@ var _ = Describe("Ring Tx timeout", func() {
testTimeout()
})
})

func TestRingSetAddrsContention(t *testing.T) {
const (
ringShard1Name = "ringShardOne"
ringShard2Name = "ringShardTwo"
)

for _, port := range []string{ringShard1Port, ringShard2Port} {
if _, err := startRedis(port); err != nil {
t.Fatal(err)
}
}

t.Cleanup(func() {
for _, p := range processes {
if err := p.Close(); err != nil {
t.Errorf("Failed to stop redis process: %v", err)
}
}
processes = nil
})

ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"ringShardOne": ":" + ringShard1Port,
},
NewClient: func(opt *redis.Options) *redis.Client {
// Simulate slow shard creation
time.Sleep(100 * time.Millisecond)
return redis.NewClient(opt)
},
})
defer ring.Close()

if _, err := ring.Ping(context.Background()).Result(); err != nil {
t.Fatal(err)
}

// Continuously update addresses by adding and removing one address
updatesDone := make(chan struct{})
defer func() { close(updatesDone) }()
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ticker.C:
if i%2 == 0 {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
})
} else {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
ringShard2Name: ":" + ringShard2Port,
})
}
case <-updatesDone:
return
}
}
}()

var pings, errClosed int
timer := time.NewTimer(1 * time.Second)
for running := true; running; pings++ {
select {
case <-timer.C:
running = false
default:
if _, err := ring.Ping(context.Background()).Result(); err != nil {
if err == redis.ErrClosed {
				// The shard client could be closed while a ping command is in progress
errClosed++
} else {
t.Fatal(err)
}
}
}
}

t.Logf("Number of pings: %d, errClosed: %d", pings, errClosed)
if pings < 10_000 {
t.Errorf("Expected at least 10k pings, got: %d", pings)
}
}
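The test tolerates redis.ErrClosed because a command can hold a shard's old client just as a concurrent SetAddrs closes it. A caller that wants to paper over that window could retry once on that error; pingOnceRetrying below is a hypothetical helper sketching one possible policy, not something the library does for you:

package main

import (
	"context"
	"errors"

	"github.com/go-redis/redis/v9"
)

// pingOnceRetrying retries a single time if the shard client was
// closed underneath us by a concurrent SetAddrs.
func pingOnceRetrying(ctx context.Context, ring *redis.Ring) error {
	err := ring.Ping(ctx).Err()
	if errors.Is(err, redis.ErrClosed) {
		err = ring.Ping(ctx).Err() // the retry picks up the new shard client
	}
	return err
}

func main() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{"shard1": ":6379"},
	})
	defer ring.Close()

	_ = pingOnceRetrying(context.Background(), ring)
}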
