Skip to content

Commit

Permalink
Increase dial timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitpaulk committed Jan 31, 2020
1 parent 732deb6 commit 0c481cd
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ next_version_number := $(shell echo $$(($(current_version_number)+1)))
release:
git tag v$(next_version_number)
git push origin master v$(next_version_number)

test:
go test -v
33 changes: 15 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,18 @@ func main() {

args := flag.Args()

opts, err := redis.ParseURL(redisUrl)
opts.DialTimeout = time.Second * 30
if err != nil {
fmt.Printf("Err: %v\n", err)
os.Exit(1)
}

redisClient := redis.NewClient(opts)

if args[0] == "follow" {
fmt.Printf("Streaming logs...")
consumer, err := NewConsumer(redisUrl, streamKey)
consumer, err := NewConsumer(redisClient, streamKey)
if err != nil {
fmt.Printf("Err: %v\n", err)
os.Exit(1)
Expand All @@ -43,7 +52,7 @@ func main() {
}
} else if args[0] == "run" {
fmt.Printf("Streaming logs..")
producer, err := NewProducer(redisUrl, streamKey)
producer, err := NewProducer(redisClient, streamKey)
if err != nil {
fmt.Printf("Err: %v\n", err)
os.Exit(1)
Expand Down Expand Up @@ -105,17 +114,10 @@ type Producer struct {
streamKey string
}

func NewProducer(redisUrl string, streamKey string) (*Producer, error) {
opts, err := redis.ParseURL(redisUrl)
if err != nil {
return nil, err
}

redisClient := redis.NewClient(opts)

func NewProducer(redisClient *redis.Client, streamKey string) (*Producer, error) {
// Delete the key first
cmd := redisClient.Del(streamKey)
_, err = cmd.Result()
_, err := cmd.Result()
if err != nil {
return nil, err
}
Expand All @@ -133,14 +135,9 @@ type Consumer struct {
bytesReadOfCurrentMessage int
}

func NewConsumer(redisUrl string, streamKey string) (*Consumer, error) {
opts, err := redis.ParseURL(redisUrl)
if err != nil {
return nil, err
}

func NewConsumer(redisClient *redis.Client, streamKey string) (*Consumer, error) {
return &Consumer{
redisClient: redis.NewClient(opts),
redisClient: redisClient,
streamKey: streamKey,
lastMessageID: "0",
}, nil
Expand Down
22 changes: 18 additions & 4 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"io/ioutil"
"strings"
"testing"
"time"

"github.com/go-redis/redis/v7"
)

func TestConsumerAndProducer(t *testing.T) {
p, err := NewProducer("redis://localhost:6379", "testKey")
p, err := NewProducer(redisClient(t), "testKey")
if err != nil {
t.Fatalf("Create Producer Error: %v", err)
}
Expand All @@ -24,7 +27,7 @@ func TestConsumerAndProducer(t *testing.T) {

p.Close()

c, err := NewConsumer("redis://localhost:6379", "testKey")
c, err := NewConsumer(redisClient(t), "testKey")
bytes, err := ioutil.ReadAll(c)
if err != nil {
t.Errorf("Read Error: %v", err)
Expand All @@ -37,7 +40,7 @@ func TestConsumerAndProducer(t *testing.T) {
}

func TestLargeMessage(t *testing.T) {
p, err := NewProducer("redis://localhost:6379", "testKey2")
p, err := NewProducer(redisClient(t), "testKey2")
if err != nil {
t.Fatalf("Create Producer Error: %v", err)
}
Expand All @@ -51,7 +54,7 @@ func TestLargeMessage(t *testing.T) {

p.Close()

c, err := NewConsumer("redis://localhost:6379", "testKey2")
c, err := NewConsumer(redisClient(t), "testKey2")
bytes, err := ioutil.ReadAll(c)
if err != nil {
t.Errorf("Read Error: %v", err)
Expand All @@ -61,3 +64,14 @@ func TestLargeMessage(t *testing.T) {
t.Errorf("Expected long string, got: %v", string(bytes))
}
}

func redisClient(t *testing.T) *redis.Client {
opts, err := redis.ParseURL("redis://localhost:6379/0")
opts.DialTimeout = time.Second * 30
if err != nil {
t.Errorf("Err: %v", err)
t.FailNow()
}

return redis.NewClient(opts)
}

0 comments on commit 0c481cd

Please sign in to comment.