
Commit dd9a200

fix: improve pipelines retry logic (#2232)
* fix: improve pipelines retry logic
1 parent 6327c52 commit dd9a200
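
Reading the diffs as a whole: the per-node goroutines in the cluster pipeline no longer decide whether to retry. _processPipelineNode and _processTxPipelineNode now swallow their error, the reply-reading path stamps every command after a transport failure via setCmdsErr(cmds[i+1:], err) and re-queues retryable batches into the shared failedCmds map, and the outer attempt loop simply runs whatever landed in failedCmds again. A minimal sketch of that fan-out-and-collect shape (cmdsMap and processNode here are simplified stand-ins, not the go-redis internals):

package main

import (
	"fmt"
	"sync"
)

// cmdsMap is a simplified stand-in for go-redis's node -> commands map.
type cmdsMap struct {
	mu sync.Mutex
	m  map[string][]string
}

func newCmdsMap() *cmdsMap { return &cmdsMap{m: make(map[string][]string)} }

func (cm *cmdsMap) add(addr string, cmds ...string) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	cm.m[addr] = append(cm.m[addr], cmds...)
}

// processNode stands in for _processPipelineNode: it no longer returns an
// error; retryable failures are queued on failed instead.
func processNode(addr string, batch []string, attempt int, failed *cmdsMap) {
	if attempt == 0 && addr == "node-2" { // simulate one node failing once
		failed.add(addr, batch...)
	}
}

func main() {
	const maxRedirects = 3

	cmds := newCmdsMap()
	cmds.add("node-1", "GET a")
	cmds.add("node-2", "GET b", "GET c")

	for attempt := 0; attempt <= maxRedirects; attempt++ {
		failed := newCmdsMap()

		var wg sync.WaitGroup
		for addr, batch := range cmds.m {
			wg.Add(1)
			go func(addr string, batch []string) {
				defer wg.Done()
				processNode(addr, batch, attempt, failed)
			}(addr, batch)
		}
		wg.Wait()

		if len(failed.m) == 0 {
			fmt.Println("all commands done after attempt", attempt)
			return
		}
		cmds = failed // next round runs only what failed
	}
}

The benefit of this shape is that retry policy lives in one place (the attempt loop) while the per-node workers only report what failed.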

File tree

4 files changed (+96 -125 lines):

cluster.go (+51 -60)
cluster_test.go (+17 -4)
race_test.go (-21)
redis.go (+28 -40)


cluster.go
+51 -60

@@ -1180,8 +1180,8 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error
 
 func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
 	cmdsMap := newCmdsMap()
-	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
-	if err != nil {
+
+	if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
 		setCmdsErr(cmds, err)
 		return err
 	}
@@ -1201,18 +1201,7 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error
 		wg.Add(1)
 		go func(node *clusterNode, cmds []Cmder) {
 			defer wg.Done()
-
-			err := c._processPipelineNode(ctx, node, cmds, failedCmds)
-			if err == nil {
-				return
-			}
-			if attempt < c.opt.MaxRedirects {
-				if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
-					setCmdsErr(cmds, err)
-				}
-			} else {
-				setCmdsErr(cmds, err)
-			}
+			c._processPipelineNode(ctx, node, cmds, failedCmds)
 		}(node, cmds)
 	}
 
@@ -1267,13 +1256,13 @@ func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool
 
 func (c *ClusterClient) _processPipelineNode(
 	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
-) error {
-	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+) {
+	_ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
 		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
-			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+			if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
 				return writeCmds(wr, cmds)
-			})
-			if err != nil {
+			}); err != nil {
+				setCmdsErr(cmds, err)
 				return err
 			}
 
@@ -1291,7 +1280,7 @@ func (c *ClusterClient) pipelineReadCmds(
 	cmds []Cmder,
 	failedCmds *cmdsMap,
 ) error {
-	for _, cmd := range cmds {
+	for i, cmd := range cmds {
 		err := cmd.readReply(rd)
 		cmd.SetErr(err)
 
@@ -1303,15 +1292,24 @@ func (c *ClusterClient) pipelineReadCmds(
 			continue
 		}
 
-		if c.opt.ReadOnly && (isLoadingError(err) || !isRedisError(err)) {
+		if c.opt.ReadOnly {
 			node.MarkAsFailing()
-			return err
 		}
-		if isRedisError(err) {
-			continue
+
+		if !isRedisError(err) {
+			if shouldRetry(err, true) {
+				_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
+			}
+			setCmdsErr(cmds[i+1:], err)
+			return err
 		}
+	}
+
+	if err := cmds[0].Err(); err != nil && shouldRetry(err, true) {
+		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
 		return err
 	}
+
 	return nil
 }
 
@@ -1393,19 +1391,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error
 		wg.Add(1)
 		go func(node *clusterNode, cmds []Cmder) {
 			defer wg.Done()
-
-			err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
-			if err == nil {
-				return
-			}
-
-			if attempt < c.opt.MaxRedirects {
-				if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
-					setCmdsErr(cmds, err)
-				}
-			} else {
-				setCmdsErr(cmds, err)
-			}
+			c._processTxPipelineNode(ctx, node, cmds, failedCmds)
 		}(node, cmds)
 	}
 
@@ -1431,34 +1417,39 @@ func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int][]Cmder
 
 func (c *ClusterClient) _processTxPipelineNode(
 	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
-) error {
-	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
-		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
-			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
-				return writeCmds(wr, cmds)
-			})
-			if err != nil {
-				return err
-			}
-
-			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
-				statusCmd := cmds[0].(*StatusCmd)
-				// Trim multi and exec.
-				cmds = cmds[1 : len(cmds)-1]
-
-				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
-				if err != nil {
-					moved, ask, addr := isMovedError(err)
-					if moved || ask {
-						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
-					}
+) {
+	_ = node.Client.hooks.processTxPipeline(
+		ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+			return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
+				if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+					return writeCmds(wr, cmds)
+				}); err != nil {
+					setCmdsErr(cmds, err)
 					return err
 				}
 
-				return pipelineReadCmds(rd, cmds)
+				return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
+					statusCmd := cmds[0].(*StatusCmd)
+					// Trim multi and exec.
+					trimmedCmds := cmds[1 : len(cmds)-1]
+
+					if err := c.txPipelineReadQueued(
+						ctx, rd, statusCmd, trimmedCmds, failedCmds,
+					); err != nil {
+						setCmdsErr(cmds, err)
+
+						moved, ask, addr := isMovedError(err)
+						if moved || ask {
+							return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
+						}
+
+						return err
+					}
+
+					return pipelineReadCmds(rd, trimmedCmds)
+				})
 			})
 		})
-	})
 }
 
 func (c *ClusterClient) txPipelineReadQueued(
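
The pivotal hunk above is pipelineReadCmds: a non-Redis (transport) error now poisons every command after the failing one, and the first command's error is surfaced so the caller can consult shouldRetry (e.g. for LOADING responses) and re-map the batch. A self-contained sketch of that "mark the rest, surface the first error" pattern — readPipeline and this isRedisError are simplified stand-ins, not the library's code:

package main

import (
	"errors"
	"fmt"
	"strings"
)

type cmd struct {
	name string
	err  error
}

// isRedisError is a stand-in: a reply error comes from the server (the
// connection is still usable); anything else is a transport failure.
func isRedisError(err error) bool {
	return err != nil && strings.HasPrefix(err.Error(), "ERR ")
}

// readPipeline mirrors the new pipelineReadCmds shape: per-command errors
// are recorded, a transport error is copied onto every unread command, and
// the first command's error is surfaced for the retry loop to inspect.
func readPipeline(replies []error, cmds []*cmd) error {
	for i, c := range cmds {
		c.err = replies[i]
		if c.err != nil && !isRedisError(c.err) {
			// Nothing after this reply was read; fail the rest too.
			for _, rest := range cmds[i+1:] {
				rest.err = c.err
			}
			return c.err
		}
	}
	return cmds[0].err
}

func main() {
	cmds := []*cmd{{name: "GET a"}, {name: "GET b"}, {name: "GET c"}}
	replies := []error{nil, errors.New("write: broken pipe"), nil}

	err := readPipeline(replies, cmds)
	fmt.Println("pipeline error:", err)
	for _, c := range cmds {
		fmt.Printf("  %-5s -> %v\n", c.name, c.err)
	}
}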

cluster_test.go
+17 -4

@@ -1276,20 +1276,33 @@ var _ = Describe("ClusterClient timeout", func() {
 	Context("read/write timeout", func() {
 		BeforeEach(func() {
 			opt := redisClusterOptions()
-			opt.ReadTimeout = 250 * time.Millisecond
-			opt.WriteTimeout = 250 * time.Millisecond
-			opt.MaxRedirects = 1
 			client = cluster.newClusterClient(ctx, opt)
 
 			err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
-				return client.ClientPause(ctx, pause).Err()
+				err := client.ClientPause(ctx, pause).Err()
+
+				opt := client.Options()
+				opt.ReadTimeout = time.Nanosecond
+				opt.WriteTimeout = time.Nanosecond
+
+				return err
 			})
 			Expect(err).NotTo(HaveOccurred())
+
+			// Overwrite timeouts after the client is initialized.
+			opt.ReadTimeout = time.Nanosecond
+			opt.WriteTimeout = time.Nanosecond
+			opt.MaxRedirects = 0
 		})
 
 		AfterEach(func() {
 			_ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
 				defer GinkgoRecover()
+
+				opt := client.Options()
+				opt.ReadTimeout = time.Second
+				opt.WriteTimeout = time.Second
+
 				Eventually(func() error {
 					return client.Ping(ctx).Err()
 				}, 2*pause).ShouldNot(HaveOccurred())
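
Note how the test setup changed: instead of constructing the client with short timeouts and MaxRedirects = 1, it now tightens ReadTimeout/WriteTimeout through client.Options() after the shard clients exist, and restores workable values in AfterEach so later tests can ping. An illustration of that pattern, assuming a reachable test cluster at placeholder addresses and the v9 import path the repo used at the time of this commit:

package main

import (
	"context"
	"time"

	"github.com/go-redis/redis/v9"
)

func main() {
	ctx := context.Background()

	// Placeholder addresses; assumes a cluster is reachable there.
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":16600", ":16601", ":16602"},
	})

	// Tighten per-shard timeouts after construction, the way the test's
	// BeforeEach does, by mutating the shared Options() struct.
	_ = client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
		opt := shard.Options()
		opt.ReadTimeout = time.Nanosecond
		opt.WriteTimeout = time.Nanosecond
		return nil
	})
}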

race_test.go
-21

@@ -2,7 +2,6 @@ package redis_test
 
 import (
 	"bytes"
-	"context"
 	"fmt"
 	"net"
 	"strconv"
@@ -289,26 +288,6 @@ var _ = Describe("races", func() {
 		wg.Wait()
 		Expect(atomic.LoadUint32(&received)).To(Equal(uint32(C * N)))
 	})
-
-	It("should abort on context timeout", func() {
-		opt := redisClusterOptions()
-		client := cluster.newClusterClient(ctx, opt)
-
-		ctx, cancel := context.WithCancel(context.Background())
-
-		wg := performAsync(C, func(_ int) {
-			_, err := client.XRead(ctx, &redis.XReadArgs{
-				Streams: []string{"test", "$"},
-				Block:   1 * time.Second,
-			}).Result()
-			Expect(err).To(HaveOccurred())
-			Expect(err.Error()).To(Or(Equal(context.Canceled.Error()), ContainSubstring("operation was canceled")))
-		})
-
-		time.Sleep(10 * time.Millisecond)
-		cancel()
-		wg.Wait()
-	})
 })
 
 var _ = Describe("cluster races", func() {

redis.go
+28 -40

@@ -290,27 +290,7 @@ func (c *baseClient) withConn(
 		c.releaseConn(ctx, cn, err)
 	}()
 
-	done := ctx.Done() //nolint:ifshort
-
-	if done == nil {
-		err = fn(ctx, cn)
-		return err
-	}
-
-	errc := make(chan error, 1)
-	go func() { errc <- fn(ctx, cn) }()
-
-	select {
-	case <-done:
-		_ = cn.Close()
-		// Wait for the goroutine to finish and send something.
-		<-errc
-
-		err = ctx.Err()
-		return err
-	case err = <-errc:
-		return err
-	}
+	return fn(ctx, cn)
 }
 
 func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
@@ -416,7 +396,6 @@ func (c *baseClient) generalProcessPipeline(
 ) error {
 	err := c._generalProcessPipeline(ctx, cmds, p)
 	if err != nil {
-		setCmdsErr(cmds, err)
 		return err
 	}
 	return cmdsFirstErr(cmds)
@@ -429,6 +408,7 @@ func (c *baseClient) _generalProcessPipeline(
 	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
 		if attempt > 0 {
 			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
+				setCmdsErr(cmds, err)
 				return err
 			}
 		}
@@ -449,53 +429,61 @@ func (c *baseClient) pipelineProcessCmds(
 func (c *baseClient) pipelineProcessCmds(
 	ctx context.Context, cn *pool.Conn, cmds []Cmder,
 ) (bool, error) {
-	err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+	if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
 		return writeCmds(wr, cmds)
-	})
-	if err != nil {
+	}); err != nil {
+		setCmdsErr(cmds, err)
 		return true, err
 	}
 
-	err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
+	if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
 		return pipelineReadCmds(rd, cmds)
-	})
-	return true, err
+	}); err != nil {
+		return true, err
+	}
+
+	return false, nil
}
 
 func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
-	for _, cmd := range cmds {
+	for i, cmd := range cmds {
 		err := cmd.readReply(rd)
 		cmd.SetErr(err)
 		if err != nil && !isRedisError(err) {
+			setCmdsErr(cmds[i+1:], err)
 			return err
 		}
 	}
-	return nil
+	// Retry errors like "LOADING redis is loading the dataset in memory".
+	return cmds[0].Err()
 }
 
 func (c *baseClient) txPipelineProcessCmds(
 	ctx context.Context, cn *pool.Conn, cmds []Cmder,
 ) (bool, error) {
-	err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+	if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
 		return writeCmds(wr, cmds)
-	})
-	if err != nil {
+	}); err != nil {
+		setCmdsErr(cmds, err)
 		return true, err
 	}
 
-	err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
+	if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
 		statusCmd := cmds[0].(*StatusCmd)
 		// Trim multi and exec.
-		cmds = cmds[1 : len(cmds)-1]
+		trimmedCmds := cmds[1 : len(cmds)-1]
 
-		err := txPipelineReadQueued(rd, statusCmd, cmds)
-		if err != nil {
+		if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil {
+			setCmdsErr(cmds, err)
 			return err
 		}
 
-		return pipelineReadCmds(rd, cmds)
-	})
-	return false, err
+		return pipelineReadCmds(rd, trimmedCmds)
+	}); err != nil {
+		return false, err
+	}
+
+	return false, nil
}
 
 func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
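
The withConn hunk is the most consequential: the goroutine that raced fn against ctx.Done() and closed the connection on cancellation is gone, and fn now runs inline. That is presumably also why race_test.go drops the "should abort on context timeout" test: a blocked command is no longer interrupted mid-flight by context cancellation and instead fails via the connection's read/write deadlines. For contrast, here is a self-contained sketch of the removed watchdog pattern — withWatchdog mirrors the deleted code; the net.Pipe demo is illustrative:

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// withWatchdog mirrors the code deleted from withConn: fn runs in a
// goroutine, and a select races it against ctx.Done(); on cancellation the
// connection is closed to unblock fn.
func withWatchdog(ctx context.Context, cn net.Conn, fn func() error) error {
	if ctx.Done() == nil {
		return fn()
	}

	errc := make(chan error, 1)
	go func() { errc <- fn() }()

	select {
	case <-ctx.Done():
		_ = cn.Close()
		// Wait for the goroutine to finish and send something.
		<-errc
		return ctx.Err()
	case err := <-errc:
		return err
	}
}

func main() {
	client, server := net.Pipe()
	defer server.Close()

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // already cancelled: the watchdog fires immediately

	err := withWatchdog(ctx, client, func() error {
		buf := make([]byte, 1)
		_, err := client.Read(buf) // blocks until the watchdog closes cn
		return err
	})
	fmt.Println(errors.Is(err, context.Canceled)) // true
}

The trade-off: the watchdog gave prompt cancellation at the cost of one extra goroutine per command and a forcibly closed connection; the new inline call keeps the pool healthy and leaves timeliness to deadlines.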
