Skip to content

Commit 5053db2

Browse files
committed
fix: wrap cmds in Conn.TxPipeline
1 parent 0884e48 commit 5053db2

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

redis.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -725,21 +725,13 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
725725
return err
726726
}
727727

728-
func (c *Conn) processPipeline(ctx context.Context, cmds []Cmder) error {
729-
return c.hooks.processPipeline(ctx, cmds)
730-
}
731-
732-
func (c *Conn) processTxPipeline(ctx context.Context, cmds []Cmder) error {
733-
return c.hooks.processTxPipeline(ctx, cmds)
734-
}
735-
736728
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
737729
return c.Pipeline().Pipelined(ctx, fn)
738730
}
739731

740732
func (c *Conn) Pipeline() Pipeliner {
741733
pipe := Pipeline{
742-
exec: c.processPipeline,
734+
exec: c.hooks.processPipeline,
743735
}
744736
pipe.init()
745737
return &pipe
@@ -752,7 +744,10 @@ func (c *Conn) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmd
752744
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
753745
func (c *Conn) TxPipeline() Pipeliner {
754746
pipe := Pipeline{
755-
exec: c.processTxPipeline,
747+
exec: func(ctx context.Context, cmds []Cmder) error {
748+
cmds = wrapMultiExec(ctx, cmds)
749+
return c.hooks.processTxPipeline(ctx, cmds)
750+
},
756751
}
757752
pipe.init()
758753
return &pipe

redis_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -446,3 +446,25 @@ var _ = Describe("Client context cancelation", func() {
446446
Expect(err).To(BeIdenticalTo(context.Canceled))
447447
})
448448
})
449+
450+
var _ = Describe("Conn", func() {
451+
var client *redis.Client
452+
453+
BeforeEach(func() {
454+
client = redis.NewClient(redisOptions())
455+
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
456+
})
457+
458+
AfterEach(func() {
459+
err := client.Close()
460+
Expect(err).NotTo(HaveOccurred())
461+
})
462+
463+
It("TxPipeline", func() {
464+
tx := client.Conn().TxPipeline()
465+
tx.SwapDB(ctx, 0, 2)
466+
tx.SwapDB(ctx, 1, 0)
467+
_, err := tx.Exec(ctx)
468+
Expect(err).NotTo(HaveOccurred())
469+
})
470+
})

0 commit comments

Comments
 (0)