@@ -143,7 +143,8 @@ internal final class ConnectionPool {
143
143
}
144
144
145
145
/// Deactivates this connection pool. Once this is called, no further connections can be obtained
146
- /// from the pool. Leased connections are not deactivated and can continue to be used.
146
+ /// from the pool. Leased connections are not deactivated and can continue to be used. All waiters
147
+ /// are failed with a pool is closed error.
147
148
func close( promise: EventLoopPromise < Void > ? = nil , logger: Logger ) {
148
149
if self . loop. inEventLoop {
149
150
self . closePool ( promise: promise, logger: logger)
@@ -228,7 +229,7 @@ extension ConnectionPool {
228
229
switch self . state {
229
230
case . closing:
230
231
// We don't want this anymore, drop it.
231
- _ = connection . close ( )
232
+ self . closeConnectionForShutdown ( connection )
232
233
case . closed:
233
234
// This is programmer error, we shouldn't have entered this state.
234
235
logger. critical ( " new connection created on a closed pool " , metadata: [
@@ -250,10 +251,21 @@ extension ConnectionPool {
250
251
RedisLogging . MetadataKeys. error: " \( error) "
251
252
] )
252
253
253
- guard case . active = self . state else {
254
- // No point continuing connection creation if we're not active.
255
- logger. trace ( " not creating new connections due to inactivity " )
254
+ switch self . state {
255
+ case . active:
256
+ break // continue further down
257
+
258
+ case . closing( let remaining, let promise) :
259
+ if remaining == 1 {
260
+ self . state = . closed
261
+ promise? . succeed ( )
262
+ } else {
263
+ self . state = . closing( remaining: remaining - 1 , promise)
264
+ }
256
265
return
266
+
267
+ case . closed:
268
+ preconditionFailure ( " Invalid state: \( self . state) " )
257
269
}
258
270
259
271
// Ok, we're still active. Before we do anything, we want to check whether anyone is still waiting
@@ -315,39 +327,42 @@ extension ConnectionPool {
315
327
private func closePool( promise: EventLoopPromise < Void > ? , logger: Logger ) {
316
328
self . loop. preconditionInEventLoop ( )
317
329
318
- // Pool closure must be monotonic.
319
- guard case . active = self . state else {
320
- logger. info ( " received duplicate request to close connection pool " )
321
- promise? . succeed ( ( ) )
322
- return
323
- }
330
+ switch self . state {
331
+ case . active:
332
+ self . state = . closing( remaining: self . activeConnectionCount, promise)
324
333
325
- self . state = . closing
334
+ case . closing( let count, let existingPromise) :
335
+ if let existingPromise = existingPromise {
336
+ existingPromise. futureResult. cascade ( to: promise)
337
+ } else {
338
+ self . state = . closing( remaining: count, promise)
339
+ }
340
+ return
326
341
327
- // To close the pool we need to drop all active connections.
328
- let connections = self . availableConnections
329
- self . availableConnections = [ ]
330
- let closeFutures = connections . map { $0 . close ( ) }
342
+ case . closed :
343
+ promise ? . succeed ( )
344
+ return
345
+ }
331
346
332
347
// We also cancel all pending leases.
333
348
while let pendingLease = self . connectionWaiters. popFirst ( ) {
334
349
pendingLease. fail ( RedisConnectionPoolError . poolClosed)
335
350
}
336
351
337
- guard self . activeConnectionCount == 0 else {
338
- logger . debug ( " not closing pool, waiting for all connections to be returned " , metadata : [
339
- RedisLogging . MetadataKeys . poolConnectionCount : " \( self . activeConnectionCount ) "
340
- ] )
341
- promise? . fail ( RedisConnectionPoolError . poolHasActiveConnections )
352
+ if self . activeConnectionCount == 0 {
353
+ // That was all the connections, so this is now closed.
354
+ logger . trace ( " pool is now closed " )
355
+ self . state = . closed
356
+ promise? . succeed ( )
342
357
return
343
358
}
344
359
345
- // That was all the connections, so this is now closed .
346
- logger . trace ( " pool is now closed " )
347
- self . state = . closed
348
- EventLoopFuture < Void >
349
- . andAllSucceed ( closeFutures , on : self . loop )
350
- . cascade ( to : promise )
360
+ // To close the pool we need to drop all active connections .
361
+ let connections = self . availableConnections
362
+ self . availableConnections = [ ]
363
+ for connection in connections {
364
+ self . closeConnectionForShutdown ( connection )
365
+ }
351
366
}
352
367
353
368
/// This is the on-thread implementation for leasing connections out to users. Here we work out how to get a new
@@ -401,51 +416,66 @@ extension ConnectionPool {
401
416
private func _returnLeasedConnection( _ connection: RedisConnection , logger: Logger ) {
402
417
self . loop. assertInEventLoop ( )
403
418
self . leasedConnectionCount -= 1
404
- self . _returnConnection ( connection, logger: logger)
419
+
420
+ switch self . state {
421
+ case . active:
422
+ self . _returnConnection ( connection, logger: logger)
423
+
424
+ case . closing:
425
+ return self . closeConnectionForShutdown ( connection)
426
+
427
+ case . closed:
428
+ preconditionFailure ( " Invalid state: \( self . state) " )
429
+ }
405
430
}
406
431
407
432
/// This is the on-thread implementation for returning connections to the pool. Here we work out what to do with a newly-acquired
408
433
/// connection.
409
434
private func _returnConnection( _ connection: RedisConnection , logger: Logger ) {
410
435
self . loop. assertInEventLoop ( )
436
+ precondition ( self . state. isActive)
411
437
412
438
guard connection. isConnected else {
413
439
// This connection isn't active anymore. We'll dump it and potentially kick off a reconnection.
414
440
self . refillConnections ( logger: logger)
415
441
return
416
442
}
417
443
418
- switch self . state {
419
- case . active:
420
- // If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room
421
- // in the pool, we'll put it there. Otherwise, we'll close it.
422
- if let waiter = self . connectionWaiters. popFirst ( ) {
423
- self . leaseConnection ( connection, to: waiter)
424
- } else if self . canAddConnectionToPool {
425
- self . availableConnections. append ( connection)
426
- } else if let evictable = self . availableConnections. popFirst ( ) {
427
- // We have at least one pooled connection. The returned is more recently active, so kick out the pooled
428
- // connection in favour of this one and close the recently evicted one.
429
- self . availableConnections. append ( connection)
430
- _ = evictable. close ( )
431
- } else {
432
- // We don't need it, close it.
433
- _ = connection. close ( )
434
- }
435
- case . closed:
436
- // In general we shouldn't see leased connections return in .closed, as we should only be able to
437
- // transition to .closed when all the leases are back. We tolerate this in production builds by just closing the
438
- // connection, but in debug builds we assert to be sure.
439
- logger. warning ( " connection returned to closed pool " , metadata: [
440
- RedisLogging . MetadataKeys. connectionID: " \( connection. id) "
441
- ] )
442
- assertionFailure ( " Returned connection to closed pool " )
443
- fallthrough
444
- case . closing:
445
- // We don't need this connection, close it.
444
+ // If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room
445
+ // in the pool, we'll put it there. Otherwise, we'll close it.
446
+ if let waiter = self . connectionWaiters. popFirst ( ) {
447
+ self . leaseConnection ( connection, to: waiter)
448
+ } else if self . canAddConnectionToPool {
449
+ self . availableConnections. append ( connection)
450
+ } else if let evictable = self . availableConnections. popFirst ( ) {
451
+ // We have at least one pooled connection. The returned is more recently active, so kick out the pooled
452
+ // connection in favour of this one and close the recently evicted one.
453
+ self . availableConnections. append ( connection)
454
+ _ = evictable. close ( )
455
+ } else {
456
+ // We don't need it, close it.
446
457
_ = connection. close ( )
447
- guard self . leasedConnectionCount == 0 else { return }
448
- self . state = . closed
458
+ }
459
+ }
460
+
461
+ private func closeConnectionForShutdown( _ connection: RedisConnection ) {
462
+ connection. close ( ) . whenComplete { _ in
463
+ self . loop. preconditionInEventLoop ( )
464
+
465
+ switch self . state {
466
+ case . closing( let remaining, let promise) :
467
+ if remaining == 1 {
468
+ self . state = . closed
469
+ promise? . succeed ( )
470
+ } else {
471
+ self . state = . closing( remaining: remaining - 1 , promise)
472
+ }
473
+
474
+ case . closed, . active:
475
+ // The state must not change if we are closing a connection, while we are
476
+ // closing the pool.
477
+ preconditionFailure ( " Invalid state: \( self . state) " )
478
+ }
449
479
}
450
480
}
451
481
}
@@ -457,10 +487,19 @@ extension ConnectionPool {
457
487
458
488
/// The user has requested the connection pool to close, but there are still active connections leased to users
459
489
/// and in the pool.
460
- case closing
490
+ case closing( remaining : Int , EventLoopPromise < Void > ? )
461
491
462
492
/// The connection pool is closed: no connections are outstanding
463
493
case closed
494
+
495
+ var isActive : Bool {
496
+ switch self {
497
+ case . active:
498
+ return true
499
+ case . closing, . closed:
500
+ return false
501
+ }
502
+ }
464
503
}
465
504
}
466
505
0 commit comments