Skip to content

Commit 0f7b002

Browse files
Add RESP3 support ( --resp parameter ) (#9)
* Add RESP3 support ( --resp parameter ) * Using context.WithCancel after the move to radix v4 * Remove resp 2 from default --------- Co-authored-by: ofekshenawa <[email protected]>
1 parent 2f691f2 commit 0f7b002

File tree

3 files changed

+67
-36
lines changed

3 files changed

+67
-36
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ go 1.13
55
require (
66
github.com/golangci/golangci-lint v1.50.1 // indirect
77
github.com/mediocregopher/radix/v3 v3.5.2
8+
github.com/mediocregopher/radix/v4 v4.1.2
89
)

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,8 @@ github.com/mbilski/exhaustivestruct v1.2.0 h1:wCBmUnSYufAHO6J4AVWY6ff+oxWxsVFrwg
582582
github.com/mbilski/exhaustivestruct v1.2.0/go.mod h1:OeTBVxQWoEmB2J2JCHmXWPJ0aksxSUOUy+nvtVEfzXc=
583583
github.com/mediocregopher/radix/v3 v3.5.2 h1:A9u3G7n4+fWmDZ2ZDHtlK+cZl4q55T+7RjKjR0/MAdk=
584584
github.com/mediocregopher/radix/v3 v3.5.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
585+
github.com/mediocregopher/radix/v4 v4.1.2 h1:Pj7XnNK5WuzzFy63g98pnccainAePK+aZNQRvxSvj2I=
586+
github.com/mediocregopher/radix/v4 v4.1.2/go.mod h1:ajchozX/6ELmydxWeWM6xCFHVpZ4+67LXHOTOVR0nCE=
585587
github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg=
586588
github.com/mgechev/revive v1.2.4 h1:+2Hd/S8oO2H0Ikq2+egtNwQsVhAeELHjxjIUFX5ajLI=
587589
github.com/mgechev/revive v1.2.4/go.mod h1:iAWlQishqCuj4yhV24FTnKSXGpbAA+0SckXB8GQMX/Q=
@@ -833,6 +835,8 @@ github.com/tenntenn/modver v1.0.1/go.mod h1:bePIyQPb7UeioSRkw3Q0XeMhYZSMx9B8ePqg
833835
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
834836
github.com/tetafro/godot v1.4.11 h1:BVoBIqAf/2QdbFmSwAWnaIqDivZdOV0ZRwEm6jivLKw=
835837
github.com/tetafro/godot v1.4.11/go.mod h1:LR3CJpxDVGlYOWn3ZZg1PgNZdTUvzsZWu8xaEohUpn8=
838+
github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI=
839+
github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
836840
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
837841
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
838842
github.com/timonwong/loggercheck v0.9.3 h1:ecACo9fNiHxX4/Bc02rW2+kaJIAMAes7qJ7JKxt0EZI=

subscriber.go

Lines changed: 62 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
6+
"errors"
57
"flag"
68
"fmt"
7-
radix "github.com/mediocregopher/radix/v3"
9+
radix "github.com/mediocregopher/radix/v4"
810
"io/ioutil"
911
"log"
1012
"os"
@@ -33,49 +35,50 @@ type testResult struct {
3335
Addresses []string `json:"Addresses"`
3436
}
3537

36-
func subscriberRoutine(addr string, subscriberName string, channel string, printMessages bool, stop chan struct{}, wg *sync.WaitGroup, opts []radix.DialOpt) {
38+
func subscriberRoutine(addr string, subscriberName string, channel string, printMessages bool, ctx context.Context, wg *sync.WaitGroup, opts radix.Dialer) {
3739
// tell the caller we've stopped
3840
defer wg.Done()
3941

40-
conn, _, _, msgCh, _ := bootstrapPubSub(addr, subscriberName, channel, opts)
41-
defer conn.Close()
42+
_, _, ps, _ := bootstrapPubSub(addr, subscriberName, channel, opts)
43+
defer ps.Close()
4244

4345
for {
44-
select {
45-
case msg := <-msgCh:
46-
if printMessages {
47-
fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Message))
48-
}
49-
atomic.AddUint64(&totalMessages, 1)
46+
msg, err := ps.Next(ctx)
47+
if errors.Is(err, context.Canceled) {
5048
break
51-
case <-stop:
52-
return
49+
} else if err != nil {
50+
panic(err)
5351
}
52+
if printMessages {
53+
fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Message))
54+
}
55+
atomic.AddUint64(&totalMessages, 1)
5456
}
5557
}
5658

57-
func bootstrapPubSub(addr string, subscriberName string, channel string, opts []radix.DialOpt) (radix.Conn, error, radix.PubSubConn, chan radix.PubSubMessage, *time.Ticker) {
59+
func bootstrapPubSub(addr string, subscriberName string, channel string, opts radix.Dialer) (radix.Conn, error, radix.PubSubConn, *time.Ticker) {
5860
// Create a normal redis connection
59-
conn, err := radix.Dial("tcp", addr, opts...)
61+
ctx := context.Background()
62+
conn, err := opts.Dial(ctx, "tcp", addr)
63+
6064
if err != nil {
6165
log.Fatal(err)
6266
}
6367

64-
err = conn.Do(radix.FlatCmd(nil, "CLIENT", "SETNAME", subscriberName))
68+
err = conn.Do(ctx, radix.FlatCmd(nil, "CLIENT", "SETNAME", subscriberName))
6569
if err != nil {
6670
log.Fatal(err)
6771
}
6872

6973
// Pass that connection into PubSub, conn should never get used after this
70-
ps := radix.PubSub(conn)
74+
ps := radix.PubSubConfig{}.New(conn)
7175

72-
msgCh := make(chan radix.PubSubMessage)
73-
err = ps.Subscribe(msgCh, channel)
76+
err = ps.Subscribe(ctx, channel)
7477
if err != nil {
7578
log.Fatal(err)
7679
}
7780

78-
return conn, err, ps, msgCh, nil
81+
return conn, err, ps, nil
7982
}
8083

8184
func main() {
@@ -95,23 +98,27 @@ func main() {
9598
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
9699
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
97100
printMessages := flag.Bool("print-messages", false, "print messages.")
98-
dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.")
101+
//TODO FIX ME
102+
//dialTimeout := flag.Duration("redis-timeout", time.Second*300, "determines the timeout to pass to redis connection setup. It adjust the connection, read, and write timeouts.")
103+
resp := flag.String("resp", "", "redis command response protocol (2 - RESP 2, 3 - RESP 3)")
99104
flag.Parse()
100105

101106
totalMessages = 0
102107
var nodes []radix.ClusterNode
103108
var nodesAddresses []string
104109
var node_subscriptions_count []int
105-
opts := make([]radix.DialOpt, 0)
110+
opts := radix.Dialer{}
106111
if *password != "" {
112+
opts.AuthPass = *password
107113
if *username != "" {
108-
opts = append(opts, radix.DialAuthUser(*username, *password))
109-
} else {
110-
opts = append(opts, radix.DialAuthPass(*password))
114+
opts.AuthUser = *username
111115
}
112116
}
113-
opts = append(opts, radix.DialTimeout(*dialTimeout))
114-
fmt.Printf("Using a redis connection, read, and write timeout of %v\n", *dialTimeout)
117+
if *resp == "2" {
118+
opts.Protocol = "2"
119+
} else if *resp == "3" {
120+
opts.Protocol = "3"
121+
}
115122
if *test_time != 0 && *messages_per_channel_subscriber != 0 {
116123
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
117124
}
@@ -126,7 +133,23 @@ func main() {
126133
checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub, opts)
127134
}
128135

129-
stopChan := make(chan struct{})
136+
ctx := context.Background()
137+
// trap Ctrl+C and call cancel on the context
138+
// We Use this instead of the previous stopChannel + chan radix.PubSubMessage
139+
ctx, cancel := context.WithCancel(ctx)
140+
cS := make(chan os.Signal, 1)
141+
signal.Notify(cS, os.Interrupt)
142+
defer func() {
143+
signal.Stop(cS)
144+
cancel()
145+
}()
146+
go func() {
147+
select {
148+
case <-cS:
149+
cancel()
150+
case <-ctx.Done():
151+
}
152+
}()
130153

131154
// a WaitGroup for the goroutines to tell us they've stopped
132155
wg := sync.WaitGroup{}
@@ -145,7 +168,7 @@ func main() {
145168
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
146169
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
147170
wg.Add(1)
148-
go subscriberRoutine(addr.Addr, subscriberName, channel, *printMessages, stopChan, &wg, opts)
171+
go subscriberRoutine(addr.Addr, subscriberName, channel, *printMessages, ctx, &wg, opts)
149172
}
150173
}
151174
}
@@ -195,7 +218,7 @@ func main() {
195218
}
196219

197220
// tell the goroutine to stop
198-
close(stopChan)
221+
close(c)
199222
// and wait for them both to reply back
200223
wg.Wait()
201224
}
@@ -218,14 +241,15 @@ func getClusterNodesFromArgs(nodes []radix.ClusterNode, port *string, host *stri
218241
return nodes, nodesAddresses, node_subscriptions_count
219242
}
220243

221-
func getClusterNodesFromTopology(host *string, port *string, nodes []radix.ClusterNode, nodesAddresses []string, node_subscriptions_count []int, opts []radix.DialOpt) ([]radix.ClusterNode, []string, []int) {
244+
func getClusterNodesFromTopology(host *string, port *string, nodes []radix.ClusterNode, nodesAddresses []string, node_subscriptions_count []int, opts radix.Dialer) ([]radix.ClusterNode, []string, []int) {
222245
// Create a normal redis connection
223-
conn, err := radix.Dial("tcp", fmt.Sprintf("%s:%s", *host, *port), opts...)
246+
ctx := context.Background()
247+
conn, err := opts.Dial(ctx, "tcp", fmt.Sprintf("%s:%s", *host, *port))
224248
if err != nil {
225249
panic(err)
226250
}
227251
var topology radix.ClusterTopo
228-
err = conn.Do(radix.FlatCmd(&topology, "CLUSTER", "SLOTS"))
252+
err = conn.Do(ctx, radix.FlatCmd(&topology, "CLUSTER", "SLOTS"))
229253
if err != nil {
230254
log.Fatal(err)
231255
}
@@ -292,17 +316,18 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
292316
return false, start, time.Since(start), totalMessages, messageRateTs
293317
}
294318

295-
func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts []radix.DialOpt) {
319+
func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string, opts radix.Dialer) {
296320
for _, slot := range nodes {
297-
conn, err := radix.Dial("tcp", slot.Addr, opts...)
321+
ctx := context.Background()
322+
conn, err := opts.Dial(ctx, "tcp", slot.Addr)
298323
if err != nil {
299324
panic(err)
300325
}
301326
_, err, pubsubTopology := getPubSubBufferLimit(err, conn)
302327
if strings.Compare(*client_output_buffer_limit_pubsub, pubsubTopology) != 0 {
303328
fmt.Println(fmt.Sprintf("\tCHANGING DB pubsub topology for address %s from %s to %s", slot.Addr, pubsubTopology, *client_output_buffer_limit_pubsub))
304329

305-
err = conn.Do(radix.FlatCmd(nil, "CONFIG", "SET", "client-output-buffer-limit", fmt.Sprintf("pubsub %s", *client_output_buffer_limit_pubsub)))
330+
err = conn.Do(ctx, radix.FlatCmd(nil, "CONFIG", "SET", "client-output-buffer-limit", fmt.Sprintf("pubsub %s", *client_output_buffer_limit_pubsub)))
306331
if err != nil {
307332
log.Fatal(err)
308333
}
@@ -320,7 +345,8 @@ func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output
320345

321346
func getPubSubBufferLimit(err error, conn radix.Conn) ([]string, error, string) {
322347
var topologyResponse []string
323-
err = conn.Do(radix.FlatCmd(&topologyResponse, "CONFIG", "GET", "client-output-buffer-limit"))
348+
ctx := context.Background()
349+
err = conn.Do(ctx, radix.FlatCmd(&topologyResponse, "CONFIG", "GET", "client-output-buffer-limit"))
324350
if err != nil {
325351
log.Fatal(err)
326352
}

0 commit comments

Comments
 (0)