77 "fmt"
88 "io/ioutil"
99 "log"
10+ "math"
1011 "math/rand"
1112 "os"
1213 "os/signal"
@@ -20,6 +21,7 @@ import (
2021
2122 hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
2223 redis "github.com/redis/go-redis/v9"
24+ "golang.org/x/time/rate"
2325)
2426
2527const (
@@ -31,6 +33,8 @@ const (
3133 redisTLSInsecureSkipVerify = "tls_insecure_skip_verify"
3234)
3335
36+ const Inf = rate .Limit (math .MaxFloat64 )
37+
3438var totalMessages uint64
3539var totalSubscribedChannels int64
3640var totalPublishers int64
@@ -53,7 +57,7 @@ type testResult struct {
5357 Addresses []string `json:"Addresses"`
5458}
5559
56- func publisherRoutine (clientName string , channels []string , mode string , measureRTT bool , verbose bool , dataSize int , ctx context.Context , wg * sync.WaitGroup , client * redis.Client ) {
60+ func publisherRoutine (clientName string , channels []string , mode string , measureRTT bool , verbose bool , dataSize int , ctx context.Context , wg * sync.WaitGroup , client * redis.Client , useLimiter bool , rateLimiter * rate. Limiter ) {
5761 defer wg .Done ()
5862
5963 if verbose {
@@ -81,12 +85,16 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
8185
8286 default :
8387 msg := payload
84- if measureRTT {
85- now := time .Now ().UnixMicro ()
86- msg = strconv .FormatInt (now , 10 )
87- }
8888
8989 for _ , ch := range channels {
90+ if useLimiter {
91+ r := rateLimiter .ReserveN (time .Now (), int (1 ))
92+ time .Sleep (r .Delay ())
93+ }
94+ if measureRTT {
95+ now := time .Now ().UnixMicro ()
96+ msg = strconv .FormatInt (now , 10 )
97+ }
9098 var err error
9199 switch mode {
92100 case "spublish" :
@@ -210,6 +218,8 @@ func main() {
210218 host := flag .String ("host" , "127.0.0.1" , "redis host." )
211219 port := flag .String ("port" , "6379" , "redis port." )
212220 cpuprofile := flag .String ("cpuprofile" , "" , "write cpu profile to file" )
221+ rps := flag .Int64 ("rps" , 0 , "Max rps. If 0 no limit is applied and the DB is stressed up to maximum." )
222+ rpsburst := flag .Int64 ("rps-burst" , 0 , "Max rps burst. If 0 the allowed burst will be the ammount of clients." )
213223 password := flag .String ("a" , "" , "Password for Redis Auth." )
214224 dataSize := flag .Int ("data-size" , 128 , "Payload size in bytes for publisher messages when RTT mode is disabled" )
215225 mode := flag .String ("mode" , "subscribe" , "Subscribe mode. Either 'subscribe' or 'ssubscribe'." )
@@ -381,6 +391,21 @@ func main() {
381391 rttLatencyChannel := make (chan int64 , 100000 ) // Channel for RTT measurements. buffer of 100K messages to process
382392 totalCreatedClients := 0
383393 if strings .Contains (* mode , "publish" ) {
394+ var requestRate = Inf
395+ var requestBurst = int (* rps )
396+ useRateLimiter := false
397+ if * rps != 0 {
398+ requestRate = rate .Limit (* rps )
399+ log .Println (fmt .Sprintf ("running publisher mode with rate-limit enabled. Max published %d messages/sec.\n " , * rps ))
400+ useRateLimiter = true
401+ if * rpsburst != 0 {
402+ requestBurst = int (* rpsburst )
403+ }
404+ } else {
405+ log .Println (fmt .Sprintf ("running publisher mode with maximum rate enabled." ))
406+ }
407+
408+ var rateLimiter = rate .NewLimiter (requestRate , requestBurst )
384409 // Only run publishers
385410 for client_id := 1 ; client_id <= * clients ; client_id ++ {
386411 channels := []string {}
@@ -421,7 +446,7 @@ func main() {
421446 }
422447
423448 wg .Add (1 )
424- go publisherRoutine (publisherName , channels , * mode , * measureRTT , * verbose , * dataSize , ctx , & wg , client )
449+ go publisherRoutine (publisherName , channels , * mode , * measureRTT , * verbose , * dataSize , ctx , & wg , client , useRateLimiter , rateLimiter )
425450 atomic .AddInt64 (& totalPublishers , 1 )
426451 atomic .AddUint64 (& totalConnects , 1 )
427452 }
0 commit comments