@@ -33,19 +33,23 @@ extension HTTPConnectionPool {
33
33
/// The property was introduced to fail fast during testing.
34
34
/// Otherwise this should always be true and not turned off.
35
35
private let retryConnectionEstablishment : Bool
36
+ private let preWarmedConnectionCount : Int
36
37
37
38
init (
38
39
idGenerator: Connection . ID . Generator ,
39
40
maximumConcurrentConnections: Int ,
40
41
retryConnectionEstablishment: Bool ,
41
42
maximumConnectionUses: Int ? ,
43
+ preWarmedHTTP1ConnectionCount: Int ,
42
44
lifecycleState: StateMachine . LifecycleState
43
45
) {
44
46
self . connections = HTTP1Connections (
45
47
maximumConcurrentConnections: maximumConcurrentConnections,
46
48
generator: idGenerator,
47
- maximumConnectionUses: maximumConnectionUses
49
+ maximumConnectionUses: maximumConnectionUses,
50
+ preWarmedHTTP1ConnectionCount: preWarmedHTTP1ConnectionCount
48
51
)
52
+ self . preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
49
53
self . retryConnectionEstablishment = retryConnectionEstablishment
50
54
51
55
self . requests = RequestQueue ( )
@@ -145,9 +149,25 @@ extension HTTPConnectionPool {
145
149
146
150
private mutating func executeRequestOnPreferredEventLoop( _ request: Request , eventLoop: EventLoop ) -> Action {
147
151
if let connection = self . connections. leaseConnection ( onPreferred: eventLoop) {
152
+ // Cool, a connection is available. If using this would put us below our needed extra set, we
153
+ // should create another.
154
+ let stats = self . connections. generalPurposeStats
155
+ let needExtraConnection = stats. nonLeased < ( self . requests. count + self . preWarmedConnectionCount) && self . connections. canGrow
156
+ let action : StateMachine . ConnectionAction
157
+
158
+ if needExtraConnection {
159
+ action = . createConnectionAndCancelTimeoutTimer(
160
+ createdID: self . connections. createNewConnection ( on: eventLoop) ,
161
+ on: eventLoop,
162
+ cancelTimerID: connection. id
163
+ )
164
+ } else {
165
+ action = . cancelTimeoutTimer( connection. id)
166
+ }
167
+
148
168
return . init(
149
169
request: . executeRequest( request, connection, cancelTimeout: false ) ,
150
- connection: . cancelTimeoutTimer ( connection . id )
170
+ connection: action
151
171
)
152
172
}
153
173
@@ -294,7 +314,20 @@ extension HTTPConnectionPool {
294
314
}
295
315
}
296
316
297
- mutating func connectionIdleTimeout( _ connectionID: Connection . ID ) -> Action {
317
+ mutating func connectionIdleTimeout( _ connectionID: Connection . ID , on eventLoop: any EventLoop ) -> Action {
318
+ // Don't close idle connections if we need pre-warmed connections. Instead, re-arm the idle timer.
319
+ // We still want the idle timers to make sure we eventually fall below the pre-warmed limit.
320
+ if self . preWarmedConnectionCount > 0 {
321
+ let stats = self . connections. generalPurposeStats
322
+ if stats. idle <= self . preWarmedConnectionCount {
323
+ return . init(
324
+ request: . none,
325
+ connection: . scheduleTimeoutTimer( connectionID, on: eventLoop)
326
+ )
327
+ }
328
+ }
329
+
330
+ // Ok, we do actually want the connection count to go down.
298
331
guard let connection = self . connections. closeConnectionIfIdle ( connectionID) else {
299
332
// because of a race this connection (connection close runs against trigger of timeout)
300
333
// was already removed from the state machine.
@@ -410,11 +443,7 @@ extension HTTPConnectionPool {
410
443
case . running:
411
444
// Close the connection if it's expired.
412
445
if context. shouldBeClosed {
413
- let connection = self . connections. closeConnection ( at: index)
414
- return . init(
415
- request: . none,
416
- connection: . closeConnection( connection, isShutdown: . no)
417
- )
446
+ return self . nextActionForToBeClosedIdleConnection ( at: index, context: context)
418
447
} else {
419
448
switch context. use {
420
449
case . generalPurpose:
@@ -446,28 +475,53 @@ extension HTTPConnectionPool {
446
475
at index: Int ,
447
476
context: HTTP1Connections . IdleConnectionContext
448
477
) -> EstablishedAction {
478
+ var requestAction = HTTPConnectionPool . StateMachine. RequestAction. none
479
+ var parkedConnectionDetails : ( HTTPConnectionPool . Connection . ID , any EventLoop ) ? = nil
480
+
449
481
// 1. Check if there are waiting requests in the general purpose queue
450
482
if let request = self . requests. popFirst ( for: nil ) {
451
- return . init(
452
- request: . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true ) ,
453
- connection: . none
454
- )
483
+ requestAction = . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true )
455
484
}
456
485
457
486
// 2. Check if there are waiting requests in the matching eventLoop queue
458
- if let request = self . requests. popFirst ( for: context. eventLoop) {
459
- return . init(
460
- request: . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true ) ,
461
- connection: . none
462
- )
487
+ if case . none = requestAction, let request = self . requests. popFirst ( for: context. eventLoop) {
488
+ requestAction = . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true )
463
489
}
464
490
465
491
// 3. Create a timeout timer to ensure the connection is closed if it is idle for too
466
- // long.
467
- let ( connectionID, eventLoop) = self . connections. parkConnection ( at: index)
492
+ // long, assuming we don't already have a use for it.
493
+ if case . none = requestAction {
494
+ parkedConnectionDetails = self . connections. parkConnection ( at: index)
495
+ }
496
+
497
+ // 4. We may need to create another connection to make sure we have enough pre-warmed ones.
498
+ // We need to do that if we have fewer non-leased connections than we need pre-warmed ones _and_ the pool can grow.
499
+ // Note that in this case we don't need to account for the number of pending requests, as that is 0: step 1
500
+ // confirmed that.
501
+ let connectionAction : EstablishedConnectionAction
502
+
503
+ if self . connections. generalPurposeStats. nonLeased < self . preWarmedConnectionCount && self . connections. canGrow {
504
+ // Re-use the event loop of the connection that just got created.
505
+ if let parkedConnectionDetails {
506
+ let newConnectionID = self . connections. createNewConnection ( on: parkedConnectionDetails. 1 )
507
+ connectionAction = . scheduleTimeoutTimerAndCreateConnection(
508
+ timeoutID: parkedConnectionDetails. 0 ,
509
+ newConnectionID: newConnectionID,
510
+ on: parkedConnectionDetails. 1
511
+ )
512
+ } else {
513
+ let newConnectionID = self . connections. createNewConnection ( on: context. eventLoop)
514
+ connectionAction = . createConnection( connectionID: newConnectionID, on: context. eventLoop)
515
+ }
516
+ } else if let parkedConnectionDetails {
517
+ connectionAction = . scheduleTimeoutTimer( parkedConnectionDetails. 0 , on: parkedConnectionDetails. 1 )
518
+ } else {
519
+ connectionAction = . none
520
+ }
521
+
468
522
return . init(
469
- request: . none ,
470
- connection: . scheduleTimeoutTimer ( connectionID , on : eventLoop )
523
+ request: requestAction ,
524
+ connection: connectionAction
471
525
)
472
526
}
473
527
@@ -495,6 +549,38 @@ extension HTTPConnectionPool {
495
549
)
496
550
}
497
551
552
+ private mutating func nextActionForToBeClosedIdleConnection(
553
+ at index: Int ,
554
+ context: HTTP1Connections . IdleConnectionContext
555
+ ) -> EstablishedAction {
556
+ // Step 1: Tell the connection pool to drop what it knows about this object.
557
+ let connectionToClose = self . connections. closeConnection ( at: index)
558
+
559
+ // Step 2: Check whether we need a connection to replace this one. We do if we have fewer non-leased connections
560
+ // than we requests + minimumPrewarming count _and_ the pool can grow. Note that in many cases the above closure
561
+ // will have made some space, which is just fine.
562
+ let nonLeased = self . connections. generalPurposeStats. nonLeased
563
+ let neededNonLeased = self . requests. generalPurposeCount + self . preWarmedConnectionCount
564
+
565
+ let connectionAction : EstablishedConnectionAction
566
+ if nonLeased < neededNonLeased && self . connections. canGrow {
567
+ // We re-use the EL of the connection we just closed.
568
+ let newConnectionID = self . connections. createNewConnection ( on: connectionToClose. eventLoop)
569
+ connectionAction = . closeConnectionAndCreateConnection(
570
+ closeConnection: connectionToClose,
571
+ isShutdown: . no,
572
+ newConnectionID: newConnectionID,
573
+ on: connectionToClose. eventLoop
574
+ )
575
+ } else {
576
+ connectionAction = . closeConnection( connectionToClose, isShutdown: . no)
577
+ }
578
+ return . init(
579
+ request: . none,
580
+ connection: connectionAction
581
+ )
582
+ }
583
+
498
584
// MARK: Failed/Closed connection management
499
585
500
586
private mutating func nextActionForFailedConnection(
@@ -530,7 +616,8 @@ extension HTTPConnectionPool {
530
616
at index: Int ,
531
617
context: HTTP1Connections . FailedConnectionContext
532
618
) -> Action {
533
- if context. connectionsStartingForUseCase < self . requests. generalPurposeCount {
619
+ let needConnectionForRequest = context. connectionsStartingForUseCase < ( self . requests. generalPurposeCount + self . preWarmedConnectionCount)
620
+ if needConnectionForRequest {
534
621
// if we have more requests queued up, than we have starting connections, we should
535
622
// create a new connection
536
623
let ( newConnectionID, newEventLoop) = self . connections. replaceConnection ( at: index)
0 commit comments