diff --git a/commands_test.go b/commands_test.go index 641757d23..edc956943 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2588,13 +2588,14 @@ var _ = Describe("Commands", func() { Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + expireAt := time.Now().Add(10 * time.Second) + res, err := client.HPExpireAt(ctx, "myhash", expireAt, "key1", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, -2})) res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(BeEquivalentTo([]int64{time.Now().Add(10 * time.Second).UnixMilli(), -1, -2})) + Expect(res).To(BeEquivalentTo([]int64{expireAt.UnixMilli(), -1, -2})) }) It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() { @@ -5888,6 +5889,78 @@ var _ = Describe("Commands", func() { Expect(err).To(Equal(redis.Nil)) }) + It("should XRead LastEntry", Label("NonRedisEnterprise"), func() { + res, err := client.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"stream"}, + Count: 2, // we expect 1 message + ID: "+", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + })) + }) + + It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() { + res, err := client.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"stream", "stream"}, + ID: "+", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + })) + }) + + It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() { + start := time.Now() + go func() { + defer GinkgoRecover() + + time.Sleep(100 * time.Millisecond) + id, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "empty", + ID: "4-0", + Values: map[string]interface{}{"quatro": "quatre"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("4-0")) + }() + + res, err := client.XRead(ctx, &redis.XReadArgs{ + Streams: []string{"empty"}, + Block: 500 * time.Millisecond, + ID: "+", + }).Result() + Expect(err).NotTo(HaveOccurred()) + // Ensure that the XRead call with LastEntry option blocked for at least 100ms. + Expect(time.Since(start)).To(BeNumerically(">=", 100*time.Millisecond)) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "empty", + Messages: []redis.XMessage{ + {ID: "4-0", Values: map[string]interface{}{"quatro": "quatre"}}, + }, + }, + })) + }) + Describe("group", func() { BeforeEach(func() { err := client.XGroupCreate(ctx, "stream", "group", "0").Err() diff --git a/stream_commands.go b/stream_commands.go index 1ad33740c..6d7b22922 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -137,10 +137,11 @@ type XReadArgs struct { Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2 Count int64 Block time.Duration + ID string } func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { - args := make([]interface{}, 0, 6+len(a.Streams)) + args := make([]interface{}, 0, 2*len(a.Streams)+6) args = append(args, "xread") keyPos := int8(1) @@ -159,6 +160,11 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { for _, s := range a.Streams { args = append(args, s) } + if a.ID != "" { + for range a.Streams { + args = append(args, a.ID) + } + } cmd := NewXStreamSliceCmd(ctx, args...) if a.Block >= 0 {