Skip to content

Commit f57b061

Browse files
committed
support no block xreadgroup
1 parent 73c879d commit f57b061

File tree

5 files changed

+33
-2
lines changed

5 files changed

+33
-2
lines changed

extra/rediscensus/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
22
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
33
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
44
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
5+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
56
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
67
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
8+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
79
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
810
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
911
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=

extra/rediscmd/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.15
55
replace github.com/redis/go-redis/v9 => ../..
66

77
require (
8-
github.com/bsm/ginkgo/v2 v2.7.0
9-
github.com/bsm/gomega v1.26.0
8+
github.com/bsm/ginkgo/v2 v2.12.0
9+
github.com/bsm/gomega v1.27.10
1010
github.com/redis/go-redis/v9 v9.3.1
1111
)

extra/rediscmd/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
22
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
3+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
34
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
45
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
6+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
57
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
68
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
79
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=

extra/redisotel/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
22
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
3+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
34
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
45
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
6+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
57
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
68
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
79
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

stream_commands.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type StreamCmdable interface {
2222
XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
2323
XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
2424
XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
25+
XReadGroupNoBlock(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
2526
XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
2627
XPending(ctx context.Context, stream, group string) *XPendingCmd
2728
XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
@@ -221,6 +222,30 @@ type XReadGroupArgs struct {
221222
NoAck bool
222223
}
223224

225+
func (c cmdable) XReadGroupNoBlock(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
226+
args := make([]interface{}, 0, 10+len(a.Streams))
227+
args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
228+
229+
keyPos := int8(4)
230+
if a.Count > 0 {
231+
args = append(args, "count", a.Count)
232+
keyPos += 2
233+
}
234+
if a.NoAck {
235+
args = append(args, "noack")
236+
keyPos++
237+
}
238+
args = append(args, "streams")
239+
keyPos++
240+
for _, s := range a.Streams {
241+
args = append(args, s)
242+
}
243+
cmd := NewXStreamSliceCmd(ctx, args...)
244+
cmd.SetFirstKeyPos(keyPos)
245+
_ = c(ctx, cmd)
246+
return cmd
247+
}
248+
224249
func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
225250
args := make([]interface{}, 0, 10+len(a.Streams))
226251
args = append(args, "xreadgroup", "group", a.Group, a.Consumer)

0 commit comments

Comments
 (0)