@@ -62,7 +62,8 @@ type Pipeline struct {
62
62
closeCh chan struct {}
63
63
closed int64
64
64
65
- err error
65
+ // protoErr is set to non-nil if there is an unrecoverable protocol error.
66
+ protoErr error
66
67
67
68
conn * conn
68
69
client * Client
@@ -77,10 +78,10 @@ type Pipeline struct {
77
78
func (r * Pipeline ) Error () string {
78
79
r .mu .Lock ()
79
80
defer r .mu .Unlock ()
80
- if r .err == nil {
81
+ if r .protoErr == nil {
81
82
return ""
82
83
}
83
- return r .err .Error ()
84
+ return r .protoErr .Error ()
84
85
}
85
86
86
87
// readUntilNewline reads until a newline, returning the bytes without the newline.
@@ -110,6 +111,10 @@ var (
110
111
_ grower = (* strings .Builder )(nil )
111
112
)
112
113
114
+ // ErrNil is a nil value. For example, it is returned for missing keys in
115
+ // GET and MGET.
116
+ var ErrNil = errors .New ("(nil)" )
117
+
113
118
func readBulkString (w io.Writer , rd * bufio.Reader , copyBuf []byte ) (int , error ) {
114
119
newlineBuf , err := readUntilNewline (rd , copyBuf )
115
120
if err != nil {
@@ -123,7 +128,7 @@ func readBulkString(w io.Writer, rd *bufio.Reader, copyBuf []byte) (int, error)
123
128
124
129
// n == -1 signals a nil value.
125
130
if stringSize <= 0 {
126
- return 0 , nil
131
+ return 0 , ErrNil
127
132
}
128
133
129
134
if g , ok := w .(grower ); ok {
@@ -231,7 +236,7 @@ func (r *Pipeline) Strings() ([]string, error) {
231
236
var ss []string
232
237
for i := 0 ; i < ln ; i ++ {
233
238
s , err := r .String ()
234
- if err != nil {
239
+ if err != nil && ! errors . Is ( err , ErrNil ) {
235
240
return ss , fmt .Errorf ("read string %d: %w" , i , err )
236
241
}
237
242
ss = append (ss , s )
@@ -258,25 +263,25 @@ func (r *Pipeline) writeTo(w io.Writer) (int64, replyType, error) {
258
263
return 0 , 0 , err
259
264
}
260
265
261
- if r .err != nil {
262
- return 0 , 0 , r .err
266
+ if r .protoErr != nil {
267
+ return 0 , 0 , r .protoErr
263
268
}
264
269
265
270
if r .pipeline .at == r .pipeline .end && len (r .arrayStack ) == 0 && ! r .subscribeMode {
266
271
return 0 , 0 , fmt .Errorf ("no more results" )
267
272
}
268
273
269
- r .err = r .conn .wr .Flush ()
270
- if r .err != nil {
271
- r .err = fmt .Errorf ("flush: %w" , r .err )
272
- return 0 , 0 , r .err
274
+ r .protoErr = r .conn .wr .Flush ()
275
+ if r .protoErr != nil {
276
+ r .protoErr = fmt .Errorf ("flush: %w" , r .protoErr )
277
+ return 0 , 0 , r .protoErr
273
278
}
274
279
275
280
var typByte byte
276
- typByte , r .err = r .conn .rd .ReadByte ()
277
- if r .err != nil {
278
- r .err = fmt .Errorf ("read type: %w" , r .err )
279
- return 0 , 0 , r .err
281
+ typByte , r .protoErr = r .conn .rd .ReadByte ()
282
+ if r .protoErr != nil {
283
+ r .protoErr = fmt .Errorf ("read type: %w" , r .protoErr )
284
+ return 0 , 0 , r .protoErr
280
285
}
281
286
typ := replyType (typByte )
282
287
@@ -306,15 +311,15 @@ func (r *Pipeline) writeTo(w io.Writer) (int64, replyType, error) {
306
311
switch typ {
307
312
case replyTypeSimpleString , replyTypeInteger , replyTypeArray :
308
313
// Simple string or integer
309
- s , r .err = readUntilNewline (r .conn .rd , r .conn .miscBuf )
310
- if r .err != nil {
311
- return 0 , typ , r .err
314
+ s , r .protoErr = readUntilNewline (r .conn .rd , r .conn .miscBuf )
315
+ if r .protoErr != nil {
316
+ return 0 , typ , r .protoErr
312
317
}
313
318
314
319
isNewArray := typ == '*'
315
320
316
321
var n int
317
- n , r .err = w .Write (s )
322
+ n , r .protoErr = w .Write (s )
318
323
incrRead (isNewArray )
319
324
var newArraySize int
320
325
if isNewArray {
@@ -328,26 +333,33 @@ func (r *Pipeline) writeTo(w io.Writer) (int64, replyType, error) {
328
333
r .arrayStack = append (r .arrayStack , newArraySize )
329
334
}
330
335
}
331
- return int64 (n ), typ , r .err
336
+ return int64 (n ), typ , r .protoErr
332
337
case replyTypeBulkString :
333
338
// Bulk string
334
- var n int
335
- n , r .err = readBulkString (w , r .conn .rd , r .conn .miscBuf )
339
+ var (
340
+ n int
341
+ err error
342
+ )
343
+ n , err = readBulkString (w , r .conn .rd , r .conn .miscBuf )
336
344
incrRead (false )
337
- return int64 (n ), typ , r .err
345
+ // A nil is highly recoverable.
346
+ if ! errors .Is (err , ErrNil ) {
347
+ r .protoErr = err
348
+ }
349
+ return int64 (n ), typ , err
338
350
case replyTypeError :
339
351
// Error
340
- s , r .err = readUntilNewline (r .conn .rd , r .conn .miscBuf )
341
- if r .err != nil {
342
- return 0 , typ , r .err
352
+ s , r .protoErr = readUntilNewline (r .conn .rd , r .conn .miscBuf )
353
+ if r .protoErr != nil {
354
+ return 0 , typ , r .protoErr
343
355
}
344
356
incrRead (false )
345
357
return 0 , typ , & Error {
346
358
raw : string (s ),
347
359
}
348
360
default :
349
- r .err = fmt .Errorf ("unknown type %q" , typ )
350
- return 0 , typ , r .err
361
+ r .protoErr = fmt .Errorf ("unknown type %q" , typ )
362
+ return 0 , typ , r .protoErr
351
363
}
352
364
}
353
365
@@ -456,7 +468,7 @@ func (r *Pipeline) Next() bool {
456
468
457
469
// HasMore returns true if there are more results to read.
458
470
func (r * Pipeline ) HasMore () bool {
459
- if r .err != nil {
471
+ if r .protoErr != nil {
460
472
return false
461
473
}
462
474
@@ -495,7 +507,7 @@ func (r *Pipeline) close() error {
495
507
// r.conn is set to nil to prevent accidental reuse.
496
508
r .conn = nil
497
509
// Only return conn when it is in a known good state.
498
- if r .err == nil && ! r .subscribeMode && ! r .HasMore () {
510
+ if r .protoErr == nil && ! r .subscribeMode && ! r .HasMore () {
499
511
r .client .putConn (conn )
500
512
return nil
501
513
}
0 commit comments