Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ final class HTTPConnectionPool:
.concurrentHTTP1ConnectionsPerHostSoftLimit,
retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment,
preferHTTP1: clientConfiguration.httpVersion == .http1Only,
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection,
preWarmedHTTP1ConnectionCount: clientConfiguration.connectionPool.preWarmedHTTP1ConnectionCount
)
}

Expand All @@ -104,6 +105,11 @@ final class HTTPConnectionPool:
enum Unlocked {
case createConnection(Connection.ID, on: EventLoop)
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case closeConnectionAndCreateConnection(
close: Connection,
newConnectionID: Connection.ID,
on: EventLoop
)
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case migration(
createConnections: [(Connection.ID, EventLoop)],
Expand Down Expand Up @@ -185,12 +191,27 @@ final class HTTPConnectionPool:
self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
case .scheduleTimeoutTimerAndCreateConnection(let timeoutID, let newConnectionID, let eventLoop):
self.locked.connection = .scheduleTimeoutTimer(timeoutID, on: eventLoop)
self.unlocked.connection = .createConnection(newConnectionID, on: eventLoop)
case .cancelTimeoutTimer(let connectionID):
self.locked.connection = .cancelTimeoutTimer(connectionID)
case .createConnectionAndCancelTimeoutTimer(let createdID, on: let eventLoop, cancelTimerID: let cancelID):
self.unlocked.connection = .createConnection(createdID, on: eventLoop)
self.locked.connection = .cancelTimeoutTimer(cancelID)
case .closeConnection(let connection, let isShutdown):
self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown)
case .closeConnectionAndCreateConnection(
let closeConnection,
let newConnectionID,
let eventLoop
):
self.unlocked.connection = .closeConnectionAndCreateConnection(
close: closeConnection,
newConnectionID: newConnectionID,
on: eventLoop
)
case .cleanupConnections(var cleanupContext, let isShutdown):
//
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
cleanupContext.connectBackoff = []
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
Expand Down Expand Up @@ -287,6 +308,23 @@ final class HTTPConnectionPool:
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
}

case .closeConnectionAndCreateConnection(
let connectionToClose,
let newConnectionID,
let eventLoop
):
self.logger.trace(
"closing and creating connection",
metadata: [
"ahc-connection-id": "\(connectionToClose.id)"
]
)

self.createConnection(newConnectionID, on: eventLoop)

// we are not interested in the close promise...
connectionToClose.close(promise: nil)

case .cleanupConnections(let cleanupContext, let isShutdown):
for connection in cleanupContext.close {
connection.close(promise: nil)
Expand Down Expand Up @@ -400,7 +438,7 @@ final class HTTPConnectionPool:
self.modifyStateAndRunActions { stateMachine in
if self._idleTimer.removeValue(forKey: connectionID) != nil {
// The timer still exists. State Machines assumes it is alive
return stateMachine.connectionIdleTimeout(connectionID)
return stateMachine.connectionIdleTimeout(connectionID, on: eventLoop)
}
return .none
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,32 +262,30 @@ extension HTTPConnectionPool {
private var overflowIndex: Array<HTTP1ConnectionState>.Index
/// The number of times each connection can be used before it is closed and replaced.
private let maximumConnectionUses: Int?

init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator, maximumConnectionUses: Int?) {
/// How many pre-warmed connections we should create.
private let preWarmedConnectionCount: Int

init(
maximumConcurrentConnections: Int,
generator: Connection.ID.Generator,
maximumConnectionUses: Int?,
preWarmedHTTP1ConnectionCount: Int
) {
self.connections = []
self.connections.reserveCapacity(min(maximumConcurrentConnections, 1024))
self.overflowIndex = self.connections.endIndex
self.maximumConcurrentConnections = maximumConcurrentConnections
self.generator = generator
self.maximumConnectionUses = maximumConnectionUses
self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
}

var stats: Stats {
var stats = Stats()
// all additions here can be unchecked, since we will have at max self.connections.count
// which itself is an Int. For this reason we will never overflow.
for connectionState in self.connections {
if connectionState.isConnecting {
stats.connecting &+= 1
} else if connectionState.isBackingOff {
stats.backingOff &+= 1
} else if connectionState.isLeased {
stats.leased &+= 1
} else if connectionState.isIdle {
stats.idle &+= 1
}
}
return stats
self.connectionStats(in: self.connections.startIndex..<self.connections.endIndex)
}

var generalPurposeStats: Stats {
self.connectionStats(in: self.connections.startIndex..<self.overflowIndex)
}

var isEmpty: Bool {
Expand Down Expand Up @@ -328,6 +326,24 @@ extension HTTPConnectionPool {
}
}

private func connectionStats(in range: Range<Int>) -> Stats {
var stats = Stats()
// all additions here can be unchecked, since we will have at max self.connections.count
// which itself is an Int. For this reason we will never overflow.
for connectionState in self.connections[range] {
if connectionState.isConnecting {
stats.connecting &+= 1
} else if connectionState.isBackingOff {
stats.backingOff &+= 1
} else if connectionState.isLeased {
stats.leased &+= 1
} else if connectionState.isIdle {
stats.idle &+= 1
}
}
return stats
}

// MARK: - Mutations -

/// A connection's use. Did it serve in the pool or was it specialized for an `EventLoop`?
Expand Down Expand Up @@ -836,6 +852,10 @@ extension HTTPConnectionPool {
var leased: Int = 0
var connecting: Int = 0
var backingOff: Int = 0

var nonLeased: Int {
self.idle + self.connecting + self.backingOff
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,23 @@ extension HTTPConnectionPool {
/// The property was introduced to fail fast during testing.
/// Otherwise this should always be true and not turned off.
private let retryConnectionEstablishment: Bool
private let preWarmedConnectionCount: Int

init(
idGenerator: Connection.ID.Generator,
maximumConcurrentConnections: Int,
retryConnectionEstablishment: Bool,
maximumConnectionUses: Int?,
preWarmedHTTP1ConnectionCount: Int,
lifecycleState: StateMachine.LifecycleState
) {
self.connections = HTTP1Connections(
maximumConcurrentConnections: maximumConcurrentConnections,
generator: idGenerator,
maximumConnectionUses: maximumConnectionUses
maximumConnectionUses: maximumConnectionUses,
preWarmedHTTP1ConnectionCount: preWarmedHTTP1ConnectionCount
)
self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
self.retryConnectionEstablishment = retryConnectionEstablishment

self.requests = RequestQueue()
Expand Down Expand Up @@ -145,9 +149,26 @@ extension HTTPConnectionPool {

private mutating func executeRequestOnPreferredEventLoop(_ request: Request, eventLoop: EventLoop) -> Action {
if let connection = self.connections.leaseConnection(onPreferred: eventLoop) {
// Cool, a connection is available. If using this would put us below our needed extra set, we
// should create another.
let stats = self.connections.generalPurposeStats
let needExtraConnection =
stats.nonLeased < (self.requests.count + self.preWarmedConnectionCount) && self.connections.canGrow
let action: StateMachine.ConnectionAction

if needExtraConnection {
action = .createConnectionAndCancelTimeoutTimer(
createdID: self.connections.createNewConnection(on: eventLoop),
on: eventLoop,
cancelTimerID: connection.id
)
} else {
action = .cancelTimeoutTimer(connection.id)
}

return .init(
request: .executeRequest(request, connection, cancelTimeout: false),
connection: .cancelTimeoutTimer(connection.id)
connection: action
)
}

Expand Down Expand Up @@ -294,7 +315,20 @@ extension HTTPConnectionPool {
}
}

mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
mutating func connectionIdleTimeout(_ connectionID: Connection.ID, on eventLoop: any EventLoop) -> Action {
// Don't close idle connections if we need pre-warmed connections. Instead, re-arm the idle timer.
// We still want the idle timers to make sure we eventually fall below the pre-warmed limit.
if self.preWarmedConnectionCount > 0 {
let stats = self.connections.generalPurposeStats
if stats.idle <= self.preWarmedConnectionCount {
return .init(
request: .none,
connection: .scheduleTimeoutTimer(connectionID, on: eventLoop)
)
}
}

// Ok, we do actually want the connection count to go down.
guard let connection = self.connections.closeConnectionIfIdle(connectionID) else {
// because of a race this connection (connection close runs against trigger of timeout)
// was already removed from the state machine.
Expand Down Expand Up @@ -410,11 +444,7 @@ extension HTTPConnectionPool {
case .running:
// Close the connection if it's expired.
if context.shouldBeClosed {
let connection = self.connections.closeConnection(at: index)
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)
return self.nextActionForToBeClosedIdleConnection(at: index, context: context)
} else {
switch context.use {
case .generalPurpose:
Expand Down Expand Up @@ -446,28 +476,63 @@ extension HTTPConnectionPool {
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> EstablishedAction {
var requestAction = HTTPConnectionPool.StateMachine.RequestAction.none
var parkedConnectionDetails: (HTTPConnectionPool.Connection.ID, any EventLoop)? = nil

// 1. Check if there are waiting requests in the general purpose queue
if let request = self.requests.popFirst(for: nil) {
return .init(
request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true),
connection: .none
requestAction = .executeRequest(
request,
self.connections.leaseConnection(at: index),
cancelTimeout: true
)
}

// 2. Check if there are waiting requests in the matching eventLoop queue
if let request = self.requests.popFirst(for: context.eventLoop) {
return .init(
request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true),
connection: .none
if case .none = requestAction, let request = self.requests.popFirst(for: context.eventLoop) {
requestAction = .executeRequest(
request,
self.connections.leaseConnection(at: index),
cancelTimeout: true
)
}

// 3. Create a timeout timer to ensure the connection is closed if it is idle for too
// long.
let (connectionID, eventLoop) = self.connections.parkConnection(at: index)
// long, assuming we don't already have a use for it.
if case .none = requestAction {
parkedConnectionDetails = self.connections.parkConnection(at: index)
}

// 4. We may need to create another connection to make sure we have enough pre-warmed ones.
// We need to do that if we have fewer non-leased connections than we need pre-warmed ones _and_ the pool can grow.
// Note that in this case we don't need to account for the number of pending requests, as that is 0: step 1
// confirmed that.
let connectionAction: EstablishedConnectionAction

if self.connections.generalPurposeStats.nonLeased < self.preWarmedConnectionCount
&& self.connections.canGrow
{
// Re-use the event loop of the connection that just got created.
if let parkedConnectionDetails {
let newConnectionID = self.connections.createNewConnection(on: parkedConnectionDetails.1)
connectionAction = .scheduleTimeoutTimerAndCreateConnection(
timeoutID: parkedConnectionDetails.0,
newConnectionID: newConnectionID,
on: parkedConnectionDetails.1
)
} else {
let newConnectionID = self.connections.createNewConnection(on: context.eventLoop)
connectionAction = .createConnection(connectionID: newConnectionID, on: context.eventLoop)
}
} else if let parkedConnectionDetails {
connectionAction = .scheduleTimeoutTimer(parkedConnectionDetails.0, on: parkedConnectionDetails.1)
} else {
connectionAction = .none
}

return .init(
request: .none,
connection: .scheduleTimeoutTimer(connectionID, on: eventLoop)
request: requestAction,
connection: connectionAction
)
}

Expand Down Expand Up @@ -495,6 +560,37 @@ extension HTTPConnectionPool {
)
}

private mutating func nextActionForToBeClosedIdleConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> EstablishedAction {
// Step 1: Tell the connection pool to drop what it knows about this object.
let connectionToClose = self.connections.closeConnection(at: index)

// Step 2: Check whether we need a connection to replace this one. We do if we have fewer non-leased connections
// than we requests + minimumPrewarming count _and_ the pool can grow. Note that in many cases the above closure
// will have made some space, which is just fine.
let nonLeased = self.connections.generalPurposeStats.nonLeased
let neededNonLeased = self.requests.generalPurposeCount + self.preWarmedConnectionCount

let connectionAction: EstablishedConnectionAction
if nonLeased < neededNonLeased && self.connections.canGrow {
// We re-use the EL of the connection we just closed.
let newConnectionID = self.connections.createNewConnection(on: connectionToClose.eventLoop)
connectionAction = .closeConnectionAndCreateConnection(
closeConnection: connectionToClose,
newConnectionID: newConnectionID,
on: connectionToClose.eventLoop
)
} else {
connectionAction = .closeConnection(connectionToClose, isShutdown: .no)
}
return .init(
request: .none,
connection: connectionAction
)
}

// MARK: Failed/Closed connection management

private mutating func nextActionForFailedConnection(
Expand Down Expand Up @@ -530,7 +626,10 @@ extension HTTPConnectionPool {
at index: Int,
context: HTTP1Connections.FailedConnectionContext
) -> Action {
if context.connectionsStartingForUseCase < self.requests.generalPurposeCount {
let needConnectionForRequest =
context.connectionsStartingForUseCase
< (self.requests.generalPurposeCount + self.preWarmedConnectionCount)
if needConnectionForRequest {
// if we have more requests queued up, than we have starting connections, we should
// create a new connection
let (newConnectionID, newEventLoop) = self.connections.replaceConnection(at: index)
Expand Down
Loading