diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index 251224ac0..676df915a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -79,7 +79,8 @@ final class HTTPConnectionPool: .concurrentHTTP1ConnectionsPerHostSoftLimit, retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment, preferHTTP1: clientConfiguration.httpVersion == .http1Only, - maximumConnectionUses: clientConfiguration.maximumUsesPerConnection + maximumConnectionUses: clientConfiguration.maximumUsesPerConnection, + preWarmedHTTP1ConnectionCount: clientConfiguration.connectionPool.preWarmedHTTP1ConnectionCount ) } @@ -104,6 +105,11 @@ final class HTTPConnectionPool: enum Unlocked { case createConnection(Connection.ID, on: EventLoop) case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown) + case closeConnectionAndCreateConnection( + close: Connection, + newConnectionID: Connection.ID, + on: EventLoop + ) case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown) case migration( createConnections: [(Connection.ID, EventLoop)], @@ -185,12 +191,27 @@ final class HTTPConnectionPool: self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop) case .scheduleTimeoutTimer(let connectionID, on: let eventLoop): self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop) + case .scheduleTimeoutTimerAndCreateConnection(let timeoutID, let newConnectionID, let eventLoop): + self.locked.connection = .scheduleTimeoutTimer(timeoutID, on: eventLoop) + self.unlocked.connection = .createConnection(newConnectionID, on: eventLoop) case .cancelTimeoutTimer(let connectionID): self.locked.connection = .cancelTimeoutTimer(connectionID) + case .createConnectionAndCancelTimeoutTimer(let createdID, on: let eventLoop, cancelTimerID: let cancelID): + self.unlocked.connection = .createConnection(createdID, on: eventLoop) + self.locked.connection = .cancelTimeoutTimer(cancelID) case .closeConnection(let connection, let isShutdown): self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown) + case .closeConnectionAndCreateConnection( + let closeConnection, + let newConnectionID, + let eventLoop + ): + self.unlocked.connection = .closeConnectionAndCreateConnection( + close: closeConnection, + newConnectionID: newConnectionID, + on: eventLoop + ) case .cleanupConnections(var cleanupContext, let isShutdown): - // self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff) cleanupContext.connectBackoff = [] self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown) @@ -287,6 +308,23 @@ final class HTTPConnectionPool: self.delegate.connectionPoolDidShutdown(self, unclean: unclean) } + case .closeConnectionAndCreateConnection( + let connectionToClose, + let newConnectionID, + let eventLoop + ): + self.logger.trace( + "closing and creating connection", + metadata: [ + "ahc-connection-id": "\(connectionToClose.id)" + ] + ) + + self.createConnection(newConnectionID, on: eventLoop) + + // we are not interested in the close promise... + connectionToClose.close(promise: nil) + case .cleanupConnections(let cleanupContext, let isShutdown): for connection in cleanupContext.close { connection.close(promise: nil) @@ -400,7 +438,7 @@ final class HTTPConnectionPool: self.modifyStateAndRunActions { stateMachine in if self._idleTimer.removeValue(forKey: connectionID) != nil { // The timer still exists. State Machines assumes it is alive - return stateMachine.connectionIdleTimeout(connectionID) + return stateMachine.connectionIdleTimeout(connectionID, on: eventLoop) } return .none } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index 15138a141..87dcc03b6 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -262,32 +262,30 @@ extension HTTPConnectionPool { private var overflowIndex: Array.Index /// The number of times each connection can be used before it is closed and replaced. private let maximumConnectionUses: Int? - - init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator, maximumConnectionUses: Int?) { + /// How many pre-warmed connections we should create. + private let preWarmedConnectionCount: Int + + init( + maximumConcurrentConnections: Int, + generator: Connection.ID.Generator, + maximumConnectionUses: Int?, + preWarmedHTTP1ConnectionCount: Int + ) { self.connections = [] self.connections.reserveCapacity(min(maximumConcurrentConnections, 1024)) self.overflowIndex = self.connections.endIndex self.maximumConcurrentConnections = maximumConcurrentConnections self.generator = generator self.maximumConnectionUses = maximumConnectionUses + self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount } var stats: Stats { - var stats = Stats() - // all additions here can be unchecked, since we will have at max self.connections.count - // which itself is an Int. For this reason we will never overflow. - for connectionState in self.connections { - if connectionState.isConnecting { - stats.connecting &+= 1 - } else if connectionState.isBackingOff { - stats.backingOff &+= 1 - } else if connectionState.isLeased { - stats.leased &+= 1 - } else if connectionState.isIdle { - stats.idle &+= 1 - } - } - return stats + self.connectionStats(in: self.connections.startIndex..) -> Stats { + var stats = Stats() + // all additions here can be unchecked, since we will have at max self.connections.count + // which itself is an Int. For this reason we will never overflow. + for connectionState in self.connections[range] { + if connectionState.isConnecting { + stats.connecting &+= 1 + } else if connectionState.isBackingOff { + stats.backingOff &+= 1 + } else if connectionState.isLeased { + stats.leased &+= 1 + } else if connectionState.isIdle { + stats.idle &+= 1 + } + } + return stats + } + // MARK: - Mutations - /// A connection's use. Did it serve in the pool or was it specialized for an `EventLoop`? @@ -836,6 +852,10 @@ extension HTTPConnectionPool { var leased: Int = 0 var connecting: Int = 0 var backingOff: Int = 0 + + var nonLeased: Int { + self.idle + self.connecting + self.backingOff + } } } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 09b1dc85e..34c8027e9 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -33,19 +33,23 @@ extension HTTPConnectionPool { /// The property was introduced to fail fast during testing. /// Otherwise this should always be true and not turned off. private let retryConnectionEstablishment: Bool + private let preWarmedConnectionCount: Int init( idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int, retryConnectionEstablishment: Bool, maximumConnectionUses: Int?, + preWarmedHTTP1ConnectionCount: Int, lifecycleState: StateMachine.LifecycleState ) { self.connections = HTTP1Connections( maximumConcurrentConnections: maximumConcurrentConnections, generator: idGenerator, - maximumConnectionUses: maximumConnectionUses + maximumConnectionUses: maximumConnectionUses, + preWarmedHTTP1ConnectionCount: preWarmedHTTP1ConnectionCount ) + self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount self.retryConnectionEstablishment = retryConnectionEstablishment self.requests = RequestQueue() @@ -145,9 +149,26 @@ extension HTTPConnectionPool { private mutating func executeRequestOnPreferredEventLoop(_ request: Request, eventLoop: EventLoop) -> Action { if let connection = self.connections.leaseConnection(onPreferred: eventLoop) { + // Cool, a connection is available. If using this would put us below our needed extra set, we + // should create another. + let stats = self.connections.generalPurposeStats + let needExtraConnection = + stats.nonLeased < (self.requests.count + self.preWarmedConnectionCount) && self.connections.canGrow + let action: StateMachine.ConnectionAction + + if needExtraConnection { + action = .createConnectionAndCancelTimeoutTimer( + createdID: self.connections.createNewConnection(on: eventLoop), + on: eventLoop, + cancelTimerID: connection.id + ) + } else { + action = .cancelTimeoutTimer(connection.id) + } + return .init( request: .executeRequest(request, connection, cancelTimeout: false), - connection: .cancelTimeoutTimer(connection.id) + connection: action ) } @@ -294,7 +315,20 @@ extension HTTPConnectionPool { } } - mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { + mutating func connectionIdleTimeout(_ connectionID: Connection.ID, on eventLoop: any EventLoop) -> Action { + // Don't close idle connections if we need pre-warmed connections. Instead, re-arm the idle timer. + // We still want the idle timers to make sure we eventually fall below the pre-warmed limit. + if self.preWarmedConnectionCount > 0 { + let stats = self.connections.generalPurposeStats + if stats.idle <= self.preWarmedConnectionCount { + return .init( + request: .none, + connection: .scheduleTimeoutTimer(connectionID, on: eventLoop) + ) + } + } + + // Ok, we do actually want the connection count to go down. guard let connection = self.connections.closeConnectionIfIdle(connectionID) else { // because of a race this connection (connection close runs against trigger of timeout) // was already removed from the state machine. @@ -410,11 +444,7 @@ extension HTTPConnectionPool { case .running: // Close the connection if it's expired. if context.shouldBeClosed { - let connection = self.connections.closeConnection(at: index) - return .init( - request: .none, - connection: .closeConnection(connection, isShutdown: .no) - ) + return self.nextActionForToBeClosedIdleConnection(at: index, context: context) } else { switch context.use { case .generalPurpose: @@ -446,28 +476,63 @@ extension HTTPConnectionPool { at index: Int, context: HTTP1Connections.IdleConnectionContext ) -> EstablishedAction { + var requestAction = HTTPConnectionPool.StateMachine.RequestAction.none + var parkedConnectionDetails: (HTTPConnectionPool.Connection.ID, any EventLoop)? = nil + // 1. Check if there are waiting requests in the general purpose queue if let request = self.requests.popFirst(for: nil) { - return .init( - request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true), - connection: .none + requestAction = .executeRequest( + request, + self.connections.leaseConnection(at: index), + cancelTimeout: true ) } // 2. Check if there are waiting requests in the matching eventLoop queue - if let request = self.requests.popFirst(for: context.eventLoop) { - return .init( - request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true), - connection: .none + if case .none = requestAction, let request = self.requests.popFirst(for: context.eventLoop) { + requestAction = .executeRequest( + request, + self.connections.leaseConnection(at: index), + cancelTimeout: true ) } // 3. Create a timeout timer to ensure the connection is closed if it is idle for too - // long. - let (connectionID, eventLoop) = self.connections.parkConnection(at: index) + // long, assuming we don't already have a use for it. + if case .none = requestAction { + parkedConnectionDetails = self.connections.parkConnection(at: index) + } + + // 4. We may need to create another connection to make sure we have enough pre-warmed ones. + // We need to do that if we have fewer non-leased connections than we need pre-warmed ones _and_ the pool can grow. + // Note that in this case we don't need to account for the number of pending requests, as that is 0: step 1 + // confirmed that. + let connectionAction: EstablishedConnectionAction + + if self.connections.generalPurposeStats.nonLeased < self.preWarmedConnectionCount + && self.connections.canGrow + { + // Re-use the event loop of the connection that just got created. + if let parkedConnectionDetails { + let newConnectionID = self.connections.createNewConnection(on: parkedConnectionDetails.1) + connectionAction = .scheduleTimeoutTimerAndCreateConnection( + timeoutID: parkedConnectionDetails.0, + newConnectionID: newConnectionID, + on: parkedConnectionDetails.1 + ) + } else { + let newConnectionID = self.connections.createNewConnection(on: context.eventLoop) + connectionAction = .createConnection(connectionID: newConnectionID, on: context.eventLoop) + } + } else if let parkedConnectionDetails { + connectionAction = .scheduleTimeoutTimer(parkedConnectionDetails.0, on: parkedConnectionDetails.1) + } else { + connectionAction = .none + } + return .init( - request: .none, - connection: .scheduleTimeoutTimer(connectionID, on: eventLoop) + request: requestAction, + connection: connectionAction ) } @@ -495,6 +560,37 @@ extension HTTPConnectionPool { ) } + private mutating func nextActionForToBeClosedIdleConnection( + at index: Int, + context: HTTP1Connections.IdleConnectionContext + ) -> EstablishedAction { + // Step 1: Tell the connection pool to drop what it knows about this object. + let connectionToClose = self.connections.closeConnection(at: index) + + // Step 2: Check whether we need a connection to replace this one. We do if we have fewer non-leased connections + // than we requests + minimumPrewarming count _and_ the pool can grow. Note that in many cases the above closure + // will have made some space, which is just fine. + let nonLeased = self.connections.generalPurposeStats.nonLeased + let neededNonLeased = self.requests.generalPurposeCount + self.preWarmedConnectionCount + + let connectionAction: EstablishedConnectionAction + if nonLeased < neededNonLeased && self.connections.canGrow { + // We re-use the EL of the connection we just closed. + let newConnectionID = self.connections.createNewConnection(on: connectionToClose.eventLoop) + connectionAction = .closeConnectionAndCreateConnection( + closeConnection: connectionToClose, + newConnectionID: newConnectionID, + on: connectionToClose.eventLoop + ) + } else { + connectionAction = .closeConnection(connectionToClose, isShutdown: .no) + } + return .init( + request: .none, + connection: connectionAction + ) + } + // MARK: Failed/Closed connection management private mutating func nextActionForFailedConnection( @@ -530,7 +626,10 @@ extension HTTPConnectionPool { at index: Int, context: HTTP1Connections.FailedConnectionContext ) -> Action { - if context.connectionsStartingForUseCase < self.requests.generalPurposeCount { + let needConnectionForRequest = + context.connectionsStartingForUseCase + < (self.requests.generalPurposeCount + self.preWarmedConnectionCount) + if needConnectionForRequest { // if we have more requests queued up, than we have starting connections, we should // create a new connection let (newConnectionID, newEventLoop) = self.connections.replaceConnection(at: index) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index 0cc02cf0f..b905e26bd 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -42,9 +42,24 @@ extension HTTPConnectionPool { case scheduleTimeoutTimer(Connection.ID, on: EventLoop) case cancelTimeoutTimer(Connection.ID) + case createConnectionAndCancelTimeoutTimer( + createdID: Connection.ID, + on: EventLoop, + cancelTimerID: Connection.ID + ) + case scheduleTimeoutTimerAndCreateConnection( + timeoutID: Connection.ID, + newConnectionID: Connection.ID, + on: EventLoop + ) case closeConnection(Connection, isShutdown: IsShutdown) case cleanupConnections(CleanupContext, isShutdown: IsShutdown) + case closeConnectionAndCreateConnection( + closeConnection: Connection, + newConnectionID: Connection.ID, + on: EventLoop + ) case migration( createConnections: [(Connection.ID, EventLoop)], @@ -102,18 +117,21 @@ extension HTTPConnectionPool { /// Otherwise this should always be true and not turned off. private let retryConnectionEstablishment: Bool let maximumConnectionUses: Int? + let preWarmedHTTP1ConnectionCount: Int init( idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int, retryConnectionEstablishment: Bool, preferHTTP1: Bool, - maximumConnectionUses: Int? + maximumConnectionUses: Int?, + preWarmedHTTP1ConnectionCount: Int ) { self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections self.retryConnectionEstablishment = retryConnectionEstablishment self.idGenerator = idGenerator self.maximumConnectionUses = maximumConnectionUses + self.preWarmedHTTP1ConnectionCount = preWarmedHTTP1ConnectionCount if preferHTTP1 { let http1State = HTTP1StateMachine( @@ -121,6 +139,7 @@ extension HTTPConnectionPool { maximumConcurrentConnections: maximumConcurrentHTTP1Connections, retryConnectionEstablishment: retryConnectionEstablishment, maximumConnectionUses: maximumConnectionUses, + preWarmedHTTP1ConnectionCount: preWarmedHTTP1ConnectionCount, lifecycleState: .running ) self.state = .http1(http1State) @@ -159,6 +178,7 @@ extension HTTPConnectionPool { maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections, retryConnectionEstablishment: self.retryConnectionEstablishment, maximumConnectionUses: self.maximumConnectionUses, + preWarmedHTTP1ConnectionCount: self.preWarmedHTTP1ConnectionCount, lifecycleState: http2StateMachine.lifecycleState ) @@ -314,10 +334,10 @@ extension HTTPConnectionPool { ) } - mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { + mutating func connectionIdleTimeout(_ connectionID: Connection.ID, on eventLoop: any EventLoop) -> Action { self.state.modify( http1: { http1 in - http1.connectionIdleTimeout(connectionID) + http1.connectionIdleTimeout(connectionID, on: eventLoop) }, http2: { http2 in http2.connectionIdleTimeout(connectionID) @@ -413,6 +433,20 @@ extension HTTPConnectionPool.StateMachine { HTTPConnectionPool.Connection, isShutdown: HTTPConnectionPool.StateMachine.ConnectionAction.IsShutdown ) + case scheduleTimeoutTimerAndCreateConnection( + timeoutID: HTTPConnectionPool.Connection.ID, + newConnectionID: HTTPConnectionPool.Connection.ID, + on: EventLoop + ) + case closeConnectionAndCreateConnection( + closeConnection: HTTPConnectionPool.Connection, + newConnectionID: HTTPConnectionPool.Connection.ID, + on: EventLoop + ) + case createConnection( + connectionID: HTTPConnectionPool.Connection.ID, + on: EventLoop + ) } } @@ -434,6 +468,24 @@ extension HTTPConnectionPool.StateMachine.ConnectionAction { self = .scheduleTimeoutTimer(connectionID, on: eventLoop) case .closeConnection(let connection, let isShutdown): self = .closeConnection(connection, isShutdown: isShutdown) + case .closeConnectionAndCreateConnection( + let closeConnection, + let newConnectionID, + let eventLoop + ): + self = .closeConnectionAndCreateConnection( + closeConnection: closeConnection, + newConnectionID: newConnectionID, + on: eventLoop + ) + case .scheduleTimeoutTimerAndCreateConnection(let timeoutID, let newConnectionID, let eventLoop): + self = .scheduleTimeoutTimerAndCreateConnection( + timeoutID: timeoutID, + newConnectionID: newConnectionID, + on: eventLoop + ) + case .createConnection(let connectionID, on: let eventLoop): + self = .createConnection(connectionID, on: eventLoop) } } } @@ -444,27 +496,32 @@ extension HTTPConnectionPool.StateMachine.ConnectionAction { _ establishedAction: HTTPConnectionPool.StateMachine.EstablishedConnectionAction ) -> Self { switch establishedAction { - case .none: + case .none, .createConnection: + // createConnection can only come from the HTTP/1 pool, so we only see this when + // migrating to HTTP/2. We can ignore it there: we already have a connection to use. return .migration( createConnections: migrationAction.createConnections, closeConnections: migrationAction.closeConnections, scheduleTimeout: nil ) + case .closeConnectionAndCreateConnection( + closeConnection: let connection, + newConnectionID: _, + on: _ + ): + // This event can only come _from_ the HTTP/1 pool, migrating to HTTP/2. We do not do prewarmed HTTP/2 connections, + // so we can ignore the request for a new connection. This is thus the same as the case below. + return Self.closeConnection(connection, isShutdown: .no, migrationAction: migrationAction) case .closeConnection(let connection, let isShutdown): - guard isShutdown == .no else { - precondition( - migrationAction.closeConnections.isEmpty && migrationAction.createConnections.isEmpty, - "migration actions are not supported during shutdown" - ) - return .closeConnection(connection, isShutdown: isShutdown) - } - var closeConnections = migrationAction.closeConnections - closeConnections.append(connection) - return .migration( - createConnections: migrationAction.createConnections, - closeConnections: closeConnections, - scheduleTimeout: nil - ) + return Self.closeConnection(connection, isShutdown: isShutdown, migrationAction: migrationAction) + case .scheduleTimeoutTimerAndCreateConnection( + timeoutID: let connectionID, + newConnectionID: _, + on: let eventLoop + ): + // This event can only come _from_ the HTTP/1 pool, migrating to HTTP/2. We do not do prewarmed HTTP/2 connections, + // so we can ignore the request for a new connection. This is thus the same as the case below. + fallthrough case .scheduleTimeoutTimer(let connectionID, let eventLoop): return .migration( createConnections: migrationAction.createConnections, @@ -473,4 +530,25 @@ extension HTTPConnectionPool.StateMachine.ConnectionAction { ) } } + + private static func closeConnection( + _ connection: HTTPConnectionPool.Connection, + isShutdown: HTTPConnectionPool.StateMachine.ConnectionAction.IsShutdown, + migrationAction: HTTPConnectionPool.StateMachine.ConnectionMigrationAction + ) -> Self { + guard isShutdown == .no else { + precondition( + migrationAction.closeConnections.isEmpty && migrationAction.createConnections.isEmpty, + "migration actions are not supported during shutdown" + ) + return .closeConnection(connection, isShutdown: isShutdown) + } + var closeConnections = migrationAction.closeConnections + closeConnections.append(connection) + return .migration( + createConnections: migrationAction.createConnections, + closeConnections: closeConnections, + scheduleTimeout: nil + ) + } } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index f516e9083..f22810378 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -1211,6 +1211,17 @@ extension HTTPClient.Configuration { /// ``HTTPClient`` will automatically mitigate these kind of issues if this flag is turned on. public var retryConnectionEstablishment: Bool = true + /// The number of pre-warmed HTTP/1 connections to maintain. + /// + /// When set to a number greater than zero, any HTTP/1 connection pool created will attempt to maintain + /// at least this number of "extra" idle connections, above the connections currently in use, up to the + /// limit of ``concurrentHTTP1ConnectionsPerHostSoftLimit``. + /// + /// These connections will not be made while the pool is idle: only once the first connection is made + /// to a host will the others be opened. In addition, to manage the connection creation rate and + /// avoid flooding servers, prewarmed connection creation will be done one-at-a-time. + public var preWarmedHTTP1ConnectionCount: Int = 0 + public init() {} public init(idleTimeout: TimeAmount) { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift index 914990048..f225307ea 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift @@ -24,7 +24,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -60,7 +61,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -115,7 +117,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) for el in [el1, el2, el3, el4] { @@ -146,7 +149,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) for el in [el1, el2, el3, el4] { @@ -177,7 +181,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) for el in [el1, el2, el3, el4] { @@ -205,7 +210,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) for el in [el1, el1, el1, el1, el2] { @@ -256,7 +262,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -280,7 +287,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -303,7 +311,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -328,7 +337,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: .init(), - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) for el in [el1, el2, el3, el4] { @@ -387,7 +397,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -420,7 +431,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -463,7 +475,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -495,7 +508,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -538,7 +552,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() let el2 = elg.next() @@ -586,7 +601,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 2, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -625,7 +641,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 1, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let el1 = elg.next() @@ -671,7 +688,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { var connections = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: generator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let connID1 = connections.createNewConnection(on: el1) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift index 2be6cfa26..8dd59baaf 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift @@ -31,7 +31,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) var connections = MockConnectionPool() @@ -116,7 +117,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: false, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) var connections = MockConnectionPool() @@ -185,7 +187,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 2, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) @@ -253,7 +256,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 2, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) @@ -294,7 +298,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 2, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) @@ -663,7 +668,7 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { return XCTFail("Expected to have one parked connection") } - let action = state.connectionIdleTimeout(connection.id) + let action = state.connectionIdleTimeout(connection.id, on: connection.eventLoop) XCTAssertEqual(action.connection, .closeConnection(connection, isShutdown: .no)) XCTAssertEqual(action.request, .none) XCTAssertNoThrow(try connections.closeConnection(connection)) @@ -711,7 +716,7 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { XCTAssertEqual(state.http1ConnectionClosed(connection.id), .none) // triggered by timer - XCTAssertEqual(state.connectionIdleTimeout(connection.id), .none) + XCTAssertEqual(state.connectionIdleTimeout(connection.id, on: connection.eventLoop), .none) } func testConnectionBackoffVsShutdownRace() { @@ -723,7 +728,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 6, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next(), requiresEventLoopForChannel: false) @@ -764,7 +770,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 6, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next(), requiresEventLoopForChannel: false) @@ -804,7 +811,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 6, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest = MockHTTPScheduableRequest(eventLoop: eventLoop.next(), requiresEventLoopForChannel: false) @@ -833,7 +841,8 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 6, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let mockRequest1 = MockHTTPScheduableRequest(eventLoop: elg.next(), requiresEventLoopForChannel: false) @@ -881,4 +890,607 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { ) XCTAssertEqual(timeoutAction.connection, .none) } + + func testPrewarmingSimpleFlow() throws { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8, + retryConnectionEstablishment: true, + preferHTTP1: true, + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 4 + ) + + var connectionIDs = [HTTPConnectionPool.Connection.ID]() + var connections = MockConnectionPool() + + // attempt to send one request. + let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + var action = state.executeRequest(request) + guard case .createConnection(var connectionID, var connectionEL) = action.connection else { + return XCTFail("Unexpected connection action") + } + connectionIDs.append(connectionID) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + + // We're going to end up creating 5 connections immediately, even though only one is leased: the other 4 are pre-warmed. + for connectionIndex in 0..<5 { + let conn = try connections.succeedConnectionCreationHTTP1(connectionID) + let createdAction = state.newHTTP1ConnectionCreated(conn) + + switch createdAction.request { + case .executeRequest(_, let connection, _): + try connections.execute(mockRequest, on: connection) + case .none: + try connections.parkConnection(connectionID) + default: + return XCTFail( + "Unexpected request action \(createdAction.request), connection index: \(connectionIndex)" + ) + } + + if connectionIndex == 0, + case .createConnection(let newConnectionID, let newConnectionEL) = createdAction.connection + { + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + connectionIDs.append(connectionID) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + } else if connectionIndex < 4, + case .scheduleTimeoutTimerAndCreateConnection(let timeoutID, let newConnectionID, let newConnectionEL) = + createdAction.connection + { + XCTAssertEqual(connectionID, timeoutID) + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + connectionIDs.append(connectionID) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + } else if connectionIndex == 4, case .scheduleTimeoutTimer = createdAction.connection { + // Expected, the loop will terminate now. + () + } else { + return XCTFail( + "Unexpected connection action: \(createdAction.connection) with index \(connectionIndex)" + ) + } + } + + XCTAssertEqual(connections.count, 5) + XCTAssertEqual(connections.parked, 4) + XCTAssertEqual(connectionIDs.count, 5) + + // Now we complete the first request. + try connections.finishExecution(connectionIDs[0]) + action = state.http1ConnectionReleased(connectionIDs[0]) + guard case .scheduleTimeoutTimer = action.connection else { + return XCTFail("Unexpected action: \(action.connection)") + } + try connections.parkConnection(connectionIDs[0]) + + XCTAssertEqual(connections.count, 5) + XCTAssertEqual(connections.parked, 5) + XCTAssertEqual(connectionIDs.count, 5) + } + + func testPrewarmingCreatesUpToTheMax() throws { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8, + retryConnectionEstablishment: true, + preferHTTP1: true, + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 4 + ) + + var connections = MockConnectionPool() + + // Attempt to send one request. Complete the connection creation immediately, deferring the next connection creation, and then complete the + // request. + let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + var action = state.executeRequest(request) + guard case .createConnection(var connectionID, var connectionEL) = action.connection else { + return XCTFail("Unexpected connection action") + } + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + var conn = try connections.succeedConnectionCreationHTTP1(connectionID) + var createdAction = state.newHTTP1ConnectionCreated(conn) + guard case .createConnection(var newConnectionID, var newConnectionEL) = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + try connections.execute(mockRequest, on: conn) + try connections.finishExecution(connectionID) + action = state.http1ConnectionReleased(connectionID) + + // Here the state machine has _again_ asked us to create a connection. This is because the pre-warming + // phase takes any opportunity to do that. + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let veryDelayedConnectionID, let veryDelayedLoop) = action + .connection + else { + return XCTFail("Unexpected action: \(action.connection)") + } + try connections.parkConnection(connectionID) + + // At this stage we're gonna end up creating 3 connections. No outstanding requests are present, so + // we only need the pre-warmed set, which includes the one we already made. + // + // The first will ask for another connection + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let nextConnectionID, let nextConnectionEL) = createdAction + .connection + else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + (newConnectionID, newConnectionEL) = (nextConnectionID, nextConnectionEL) + + // The second one only asks for a timeout. + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + + // Now we should complete the delayed connection request. This will also only ask for a timer. + (connectionID, connectionEL) = (veryDelayedConnectionID, veryDelayedLoop) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + + XCTAssertEqual(connections.count, 4) + XCTAssertEqual(connections.parked, 4) + + // Now we start sending requests. The first 4 requests will be accompanied by requests to create new connections, + // because as each connection goes out, the pre-warming creates another. We'll let them succeed. + for _ in 0..<4 { + let eventLoop = elg.next() + + let mockRequest = MockHTTPScheduableRequest(eventLoop: eventLoop) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + guard + case .createConnectionAndCancelTimeoutTimer( + let newConnectionID, + let newConnectionLoop, + let activatedConnectionID + ) = action.connection + else { + return XCTFail("Unexpected connection action: \(action)") + } + + guard case .executeRequest(_, let connection, _) = action.request else { + return XCTFail("Expected to execute a request next, but got: \(action.request)") + } + + try connections.activateConnection(activatedConnectionID) + try connections.execute(mockRequest, on: connection) + + // Now create the new connection. + XCTAssertNoThrow(try connections.createConnection(newConnectionID, on: newConnectionLoop)) + conn = try connections.succeedConnectionCreationHTTP1(newConnectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(newConnectionID) + } + + XCTAssertEqual(connections.count, 8) + XCTAssertEqual(connections.parked, 4) + + // The next 4 should _not_ ask to create new connections. We're at the cap, and prewarming can't exceed it. + for _ in 0..<4 { + let eventLoop = elg.next() + + let mockRequest = MockHTTPScheduableRequest(eventLoop: eventLoop) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + guard case .cancelTimeoutTimer(let activatedConnectionID) = action.connection else { + return XCTFail("Unexpected connection action: \(action)") + } + + guard case .executeRequest(_, let connection, _) = action.request else { + return XCTFail("Expected to execute a request next, but got: \(action.request)") + } + + try connections.activateConnection(activatedConnectionID) + try connections.execute(mockRequest, on: connection) + } + + XCTAssertEqual(connections.count, 8) + XCTAssertEqual(connections.parked, 0) + + while let connectionID = connections.randomActiveConnection() { + try connections.finishExecution(connectionID) + action = state.http1ConnectionReleased(connectionID) + + guard case .scheduleTimeoutTimer = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + } + + XCTAssertEqual(connections.count, 8) + XCTAssertEqual(connections.parked, 0) + } + + func testPrewarmingAffectsConnectionFailure() throws { + struct SomeError: Error {} + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8, + retryConnectionEstablishment: true, + preferHTTP1: true, + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 4 + ) + + var connections = MockConnectionPool() + + // Attempt to send one request. Complete the connection creation immediately, deferring the next connection creation, and then complete the + // request. + let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + var action = state.executeRequest(request) + guard case .createConnection(var connectionID, var connectionEL) = action.connection else { + return XCTFail("Unexpected connection action") + } + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + var conn = try connections.succeedConnectionCreationHTTP1(connectionID) + var createdAction = state.newHTTP1ConnectionCreated(conn) + guard case .createConnection(var newConnectionID, var newConnectionEL) = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + try connections.execute(mockRequest, on: conn) + try connections.finishExecution(connectionID) + action = state.http1ConnectionReleased(connectionID) + + // Here the state machine has _again_ asked us to create a connection. This is because the pre-warming + // phase takes any opportunity to do that. + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let veryDelayedConnectionID, let veryDelayedLoop) = action + .connection + else { + return XCTFail("Unexpected action: \(action.connection)") + } + try connections.parkConnection(connectionID) + + // At this stage we're gonna end up creating 3 connections. No outstanding requests are present, so + // we only need the pre-warmed set, which includes the one we already made. + // + // The first will ask for another connection + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let nextConnectionID, let nextConnectionEL) = createdAction + .connection + else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + (newConnectionID, newConnectionEL) = (nextConnectionID, nextConnectionEL) + + // The second one only asks for a timeout. + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + + // Now we should complete the delayed connection request. This will also only ask for a timer. + (connectionID, connectionEL) = (veryDelayedConnectionID, veryDelayedLoop) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + + XCTAssertEqual(connections.count, 4) + XCTAssertEqual(connections.parked, 4) + + // Now, one of these connections idle-fails. + let parked = connections.randomParkedConnection()! + try connections.closeConnection(parked) + action = state.http1ConnectionClosed(parked.id) + + guard case .createConnection(var id, on: let loop) = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + + // A reasonable request. But it fails! + // + // Let's do this next bit a few times to convince ourselves it's a real problem. + for _ in 0..<8 { + // We're asked to schedule a backoff timer. + action = state.failedToCreateNewConnection(SomeError(), connectionID: id) + guard case .scheduleBackoffTimer(let backoffID, _, _) = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + XCTAssertEqual(backoffID, id) + + // Once it passes, ask what to do. We'll be asked, again, to create a connection. + action = state.connectionCreationBackoffDone(backoffID) + guard case .createConnection(let backedOffID, on: let backedOffLoop) = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + XCTAssertNotEqual(backedOffID, id) + XCTAssertIdentical(backedOffLoop, loop) + id = backedOffID + } + + // Finally it works. + XCTAssertNoThrow(try connections.createConnection(id, on: loop)) + conn = try connections.succeedConnectionCreationHTTP1(id) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(id) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + } + + func testIdleConnectionTimeoutHandlingWithPrewarming() throws { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8, + retryConnectionEstablishment: true, + preferHTTP1: true, + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 4 + ) + + var connections = MockConnectionPool() + + // Attempt to send one request. Complete the connection creation immediately, deferring the next connection creation, and then complete the + // request. + var mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + var request = HTTPConnectionPool.Request(mockRequest) + var action = state.executeRequest(request) + guard case .createConnection(var connectionID, var connectionEL) = action.connection else { + return XCTFail("Unexpected connection action") + } + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + var conn = try connections.succeedConnectionCreationHTTP1(connectionID) + var createdAction = state.newHTTP1ConnectionCreated(conn) + guard case .createConnection(var newConnectionID, var newConnectionEL) = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + try connections.execute(mockRequest, on: conn) + try connections.finishExecution(connectionID) + action = state.http1ConnectionReleased(connectionID) + + // Here the state machine has _again_ asked us to create a connection. This is because the pre-warming + // phase takes any opportunity to do that. + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let veryDelayedConnectionID, let veryDelayedLoop) = action + .connection + else { + return XCTFail("Unexpected action: \(action.connection)") + } + try connections.parkConnection(connectionID) + + // At this stage we're gonna end up creating 3 connections. No outstanding requests are present, so + // we only need the pre-warmed set, which includes the one we already made. + // + // The first will ask for another connection + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let nextConnectionID, let nextConnectionEL) = createdAction + .connection + else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + (newConnectionID, newConnectionEL) = (nextConnectionID, nextConnectionEL) + + // The second one only asks for a timeout. + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + + // Now we should complete the delayed connection request. This will also only ask for a timer. + (connectionID, connectionEL) = (veryDelayedConnectionID, veryDelayedLoop) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + + XCTAssertEqual(connections.count, 4) + XCTAssertEqual(connections.parked, 4) + + // Now, the idle timeout timer fires. We can do this a few times, it'll keep + // re-arming. + for _ in 0..<8 { + action = state.connectionIdleTimeout(connectionID, on: connectionEL) + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + } + + // Let's force another connection to be created for a request. + mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + request = HTTPConnectionPool.Request(mockRequest) + action = state.executeRequest(request) + guard + case .createConnectionAndCancelTimeoutTimer(let extraConnectionID, let extraConnectionEL, _) = action + .connection + else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + guard case .executeRequest(_, let requestConnection, _) = action.request else { + return XCTFail("Unexpected request action") + } + + XCTAssertNoThrow(try connections.createConnection(extraConnectionID, on: extraConnectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(extraConnectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + try connections.activateConnection(requestConnection.id) + try connections.execute(mockRequest, on: requestConnection) + try connections.finishExecution(requestConnection.id) + try connections.parkConnection(requestConnection.id) + action = state.http1ConnectionReleased(requestConnection.id) + + // Back to idle. + guard case .scheduleTimeoutTimer = action.connection else { + return XCTFail("Unexpected action: \(action.connection)") + } + try connections.parkConnection(extraConnectionID) + + XCTAssertEqual(connections.count, 5) + XCTAssertEqual(connections.parked, 5) + + // This time when the idle timeout fires, we're actually asked to close the connection. + action = state.connectionIdleTimeout(connectionID, on: connectionEL) + guard case .closeConnection = action.connection else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + } + + func testPrewarmingForcesReCreationOfConnectionsWhenTheyHitMaxUses() throws { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + // The scenario we want to hit can only happen when there is never a spare pre-warmed connection + // in the pool _and_ we can't create more. The easiest way to test this is to just + // create pre-warmed connections up to the pool limit, which they won't pass. + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8, + retryConnectionEstablishment: true, + preferHTTP1: true, + maximumConnectionUses: 1, + preWarmedHTTP1ConnectionCount: 8 + ) + + var connections = MockConnectionPool() + + // Attempt to send one request. Complete the connection creation immediately, deferring the next connection creation, but don't + // complete the request. + let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + var action = state.executeRequest(request) + guard case .createConnection(var connectionID, var connectionEL) = action.connection else { + return XCTFail("Unexpected connection action") + } + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + var conn = try connections.succeedConnectionCreationHTTP1(connectionID) + var createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + guard case .createConnection(var newConnectionID, var newConnectionEL) = createdAction.connection else { + return XCTFail("Unexpected connection action") + } + guard case .executeRequest(_, let requestConn, _) = createdAction.request else { + return XCTFail("Unexpected request action: \(action.request)") + } + + // At this stage we're gonna end up creating 7 more connections. No outstanding requests are present, so + // we only need the pre-warmed set, which includes the one we already made. + // + // The first six will ask for another connection. + for _ in 0..<6 { + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard + case .scheduleTimeoutTimerAndCreateConnection(_, let nextConnectionID, let nextConnectionEL) = + createdAction.connection + else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + (newConnectionID, newConnectionEL) = (nextConnectionID, nextConnectionEL) + } + + // The seventh one only asks for a timeout. + (connectionID, connectionEL) = (newConnectionID, newConnectionEL) + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + conn = try connections.succeedConnectionCreationHTTP1(connectionID) + createdAction = state.newHTTP1ConnectionCreated(conn) + try connections.parkConnection(connectionID) + + guard case .scheduleTimeoutTimer = createdAction.connection else { + return XCTFail("Unexpected connection action: \(createdAction.connection)") + } + + XCTAssertEqual(connections.count, 8) + XCTAssertEqual(connections.parked, 8) + + // Now we're gonna actually complete that request from earlier. + try connections.activateConnection(requestConn.id) + try connections.execute(mockRequest, on: requestConn) + try connections.finishExecution(requestConn.id) + action = state.http1ConnectionReleased(requestConn.id) + + // Here the state machine has asked us to close the connection and create a new one. That's because we've hit the + // max usages limit. + guard case .closeConnectionAndCreateConnection(let toClose, _, _) = action.connection else { + return XCTFail("Unexpected action: \(action.connection)") + } + try connections.closeConnection(toClose) + + // We won't bother doing it though, it's enough that it asked. + } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index e64fd5e71..d59dae796 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -393,6 +393,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentConnections: 8, retryConnectionEstablishment: true, maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0, lifecycleState: .running ) @@ -475,7 +476,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: idGenerator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine( @@ -537,7 +539,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: idGenerator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine( @@ -582,7 +585,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: idGenerator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine( @@ -634,7 +638,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: idGenerator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine( @@ -676,7 +681,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: idGenerator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine( @@ -729,7 +735,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var http1Conns = HTTPConnectionPool.HTTP1Connections( maximumConcurrentConnections: 8, generator: idGenerator, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) let conn1ID = http1Conns.createNewConnection(on: el1) var state = HTTPConnectionPool.HTTP2StateMachine( @@ -805,7 +812,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) /// first 8 request should create a new connection @@ -900,7 +908,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: false, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) /// create a new connection @@ -948,7 +957,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) /// first 8 request should create a new connection @@ -1092,7 +1102,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: false, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) // create http2 connection @@ -1173,7 +1184,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: false, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) // create http2 connection @@ -1260,7 +1272,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: false, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] @@ -1505,7 +1518,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connection = connections.randomParkedConnection()! XCTAssertNoThrow(try connections.closeConnection(connection)) - let idleTimeoutAction = state.connectionIdleTimeout(connection.id) + let idleTimeoutAction = state.connectionIdleTimeout(connection.id, on: connection.eventLoop) XCTAssertEqual(idleTimeoutAction.connection, .closeConnection(connection, isShutdown: .no)) XCTAssertEqual(idleTimeoutAction.request, .none) diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index e49c67f19..a6b48fb9a 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -545,7 +545,8 @@ extension MockConnectionPool { maximumConcurrentHTTP1Connections: maxNumberOfConnections, retryConnectionEstablishment: true, preferHTTP1: true, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) var connections = MockConnectionPool() var queuer = MockRequestQueuer() @@ -613,7 +614,8 @@ extension MockConnectionPool { maximumConcurrentHTTP1Connections: 8, retryConnectionEstablishment: true, preferHTTP1: false, - maximumConnectionUses: nil + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 ) var connections = MockConnectionPool() var queuer = MockRequestQueuer()