Skip to content

Commit b6b119c

Browse files
Optimize connection pool
1 parent da86ca4 commit b6b119c

File tree

6 files changed

+286
-60
lines changed

6 files changed

+286
-60
lines changed

httpcore/_async/connection_pool.py

+21-19
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
238238
those connections to be handled seperately.
239239
"""
240240
closing_connections = []
241+
idling_count = 0
241242

242243
# First we handle cleaning up any connections that are closed,
243244
# have expired their keep-alive, or surplus idle connections.
@@ -249,27 +250,25 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
249250
# log: "closing expired connection"
250251
self._connections.remove(connection)
251252
closing_connections.append(connection)
252-
elif (
253-
connection.is_idle()
254-
and len([connection.is_idle() for connection in self._connections])
255-
> self._max_keepalive_connections
256-
):
253+
elif connection.is_idle():
254+
if idling_count < self._max_keepalive_connections:
255+
idling_count += 1
256+
continue
257257
# log: "closing idle connection"
258258
self._connections.remove(connection)
259259
closing_connections.append(connection)
260260

261261
# Assign queued requests to connections.
262-
queued_requests = [request for request in self._requests if request.is_queued()]
263-
for pool_request in queued_requests:
262+
for pool_request in list(self._requests):
263+
if not pool_request.is_queued():
264+
continue
265+
264266
origin = pool_request.request.url.origin
265267
available_connections = [
266268
connection
267269
for connection in self._connections
268270
if connection.can_handle_request(origin) and connection.is_available()
269271
]
270-
idle_connections = [
271-
connection for connection in self._connections if connection.is_idle()
272-
]
273272

274273
# There are three cases for how we may be able to handle the request:
275274
#
@@ -286,15 +285,18 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
286285
connection = self.create_connection(origin)
287286
self._connections.append(connection)
288287
pool_request.assign_to_connection(connection)
289-
elif idle_connections:
290-
# log: "closing idle connection"
291-
connection = idle_connections[0]
292-
self._connections.remove(connection)
293-
closing_connections.append(connection)
294-
# log: "creating new connection"
295-
connection = self.create_connection(origin)
296-
self._connections.append(connection)
297-
pool_request.assign_to_connection(connection)
288+
else:
289+
idling_connection = next(
290+
(c for c in self._connections if c.is_idle()), None
291+
)
292+
if idling_connection is not None:
293+
# log: "closing idle connection"
294+
self._connections.remove(idling_connection)
295+
closing_connections.append(idling_connection)
296+
# log: "creating new connection"
297+
new_connection = self.create_connection(origin)
298+
self._connections.append(new_connection)
299+
pool_request.assign_to_connection(new_connection)
298300

299301
return closing_connections
300302

httpcore/_async/http11.py

+25-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import enum
22
import logging
3+
import random
34
import ssl
45
import time
56
from types import TracebackType
@@ -56,10 +57,12 @@ def __init__(
5657
origin: Origin,
5758
stream: AsyncNetworkStream,
5859
keepalive_expiry: Optional[float] = None,
60+
socket_poll_interval_between: Tuple[float, float] = (1, 3),
5961
) -> None:
6062
self._origin = origin
6163
self._network_stream = stream
62-
self._keepalive_expiry: Optional[float] = keepalive_expiry
64+
self._keepalive_expiry = keepalive_expiry
65+
self._socket_poll_interval_between = socket_poll_interval_between
6366
self._expire_at: Optional[float] = None
6467
self._state = HTTPConnectionState.NEW
6568
self._state_lock = AsyncLock()
@@ -68,6 +71,8 @@ def __init__(
6871
our_role=h11.CLIENT,
6972
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
7073
)
74+
# Assuming we were just connected
75+
self._network_stream_used_at = time.monotonic()
7176

7277
async def handle_async_request(self, request: Request) -> Response:
7378
if not self.can_handle_request(request.url.origin):
@@ -173,6 +178,7 @@ async def _send_event(
173178
bytes_to_send = self._h11_state.send(event)
174179
if bytes_to_send is not None:
175180
await self._network_stream.write(bytes_to_send, timeout=timeout)
181+
self._network_stream_used_at = time.monotonic()
176182

177183
# Receiving the response...
178184

@@ -224,6 +230,7 @@ async def _receive_event(
224230
data = await self._network_stream.read(
225231
self.READ_NUM_BYTES, timeout=timeout
226232
)
233+
self._network_stream_used_at = time.monotonic()
227234

228235
# If we feed this case through h11 we'll raise an exception like:
229236
#
@@ -281,16 +288,28 @@ def is_available(self) -> bool:
281288
def has_expired(self) -> bool:
282289
now = time.monotonic()
283290
keepalive_expired = self._expire_at is not None and now > self._expire_at
291+
if keepalive_expired:
292+
return True
284293

285294
# If the HTTP connection is idle but the socket is readable, then the
286295
# only valid state is that the socket is about to return b"", indicating
287296
# a server-initiated disconnect.
288-
server_disconnected = (
289-
self._state == HTTPConnectionState.IDLE
290-
and self._network_stream.get_extra_info("is_readable")
291-
)
297+
# Checking the readable status is relatively expensive so check it at a lower frequency.
298+
if (now - self._network_stream_used_at) > self._socket_poll_interval():
299+
self._network_stream_used_at = now
300+
server_disconnected = (
301+
self._state == HTTPConnectionState.IDLE
302+
and self._network_stream.get_extra_info("is_readable")
303+
)
304+
if server_disconnected:
305+
return True
306+
307+
return False
292308

293-
return keepalive_expired or server_disconnected
309+
def _socket_poll_interval(self) -> float:
310+
# Randomize to avoid polling for all the connections at once
311+
low, high = self._socket_poll_interval_between
312+
return random.uniform(low, high)
294313

295314
def is_idle(self) -> bool:
296315
return self._state == HTTPConnectionState.IDLE

httpcore/_sync/connection_pool.py

+21-19
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
238238
those connections to be handled seperately.
239239
"""
240240
closing_connections = []
241+
idling_count = 0
241242

242243
# First we handle cleaning up any connections that are closed,
243244
# have expired their keep-alive, or surplus idle connections.
@@ -249,27 +250,25 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
249250
# log: "closing expired connection"
250251
self._connections.remove(connection)
251252
closing_connections.append(connection)
252-
elif (
253-
connection.is_idle()
254-
and len([connection.is_idle() for connection in self._connections])
255-
> self._max_keepalive_connections
256-
):
253+
elif connection.is_idle():
254+
if idling_count < self._max_keepalive_connections:
255+
idling_count += 1
256+
continue
257257
# log: "closing idle connection"
258258
self._connections.remove(connection)
259259
closing_connections.append(connection)
260260

261261
# Assign queued requests to connections.
262-
queued_requests = [request for request in self._requests if request.is_queued()]
263-
for pool_request in queued_requests:
262+
for pool_request in list(self._requests):
263+
if not pool_request.is_queued():
264+
continue
265+
264266
origin = pool_request.request.url.origin
265267
available_connections = [
266268
connection
267269
for connection in self._connections
268270
if connection.can_handle_request(origin) and connection.is_available()
269271
]
270-
idle_connections = [
271-
connection for connection in self._connections if connection.is_idle()
272-
]
273272

274273
# There are three cases for how we may be able to handle the request:
275274
#
@@ -286,15 +285,18 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
286285
connection = self.create_connection(origin)
287286
self._connections.append(connection)
288287
pool_request.assign_to_connection(connection)
289-
elif idle_connections:
290-
# log: "closing idle connection"
291-
connection = idle_connections[0]
292-
self._connections.remove(connection)
293-
closing_connections.append(connection)
294-
# log: "creating new connection"
295-
connection = self.create_connection(origin)
296-
self._connections.append(connection)
297-
pool_request.assign_to_connection(connection)
288+
else:
289+
idling_connection = next(
290+
(c for c in self._connections if c.is_idle()), None
291+
)
292+
if idling_connection is not None:
293+
# log: "closing idle connection"
294+
self._connections.remove(idling_connection)
295+
closing_connections.append(idling_connection)
296+
# log: "creating new connection"
297+
new_connection = self.create_connection(origin)
298+
self._connections.append(new_connection)
299+
pool_request.assign_to_connection(new_connection)
298300

299301
return closing_connections
300302

httpcore/_sync/http11.py

+25-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import enum
22
import logging
3+
import random
34
import ssl
45
import time
56
from types import TracebackType
@@ -56,10 +57,12 @@ def __init__(
5657
origin: Origin,
5758
stream: NetworkStream,
5859
keepalive_expiry: Optional[float] = None,
60+
socket_poll_interval_between: Tuple[float, float] = (1, 3),
5961
) -> None:
6062
self._origin = origin
6163
self._network_stream = stream
62-
self._keepalive_expiry: Optional[float] = keepalive_expiry
64+
self._keepalive_expiry = keepalive_expiry
65+
self._socket_poll_interval_between = socket_poll_interval_between
6366
self._expire_at: Optional[float] = None
6467
self._state = HTTPConnectionState.NEW
6568
self._state_lock = Lock()
@@ -68,6 +71,8 @@ def __init__(
6871
our_role=h11.CLIENT,
6972
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
7073
)
74+
# Assuming we were just connected
75+
self._network_stream_used_at = time.monotonic()
7176

7277
def handle_request(self, request: Request) -> Response:
7378
if not self.can_handle_request(request.url.origin):
@@ -173,6 +178,7 @@ def _send_event(
173178
bytes_to_send = self._h11_state.send(event)
174179
if bytes_to_send is not None:
175180
self._network_stream.write(bytes_to_send, timeout=timeout)
181+
self._network_stream_used_at = time.monotonic()
176182

177183
# Receiving the response...
178184

@@ -224,6 +230,7 @@ def _receive_event(
224230
data = self._network_stream.read(
225231
self.READ_NUM_BYTES, timeout=timeout
226232
)
233+
self._network_stream_used_at = time.monotonic()
227234

228235
# If we feed this case through h11 we'll raise an exception like:
229236
#
@@ -281,16 +288,28 @@ def is_available(self) -> bool:
281288
def has_expired(self) -> bool:
282289
now = time.monotonic()
283290
keepalive_expired = self._expire_at is not None and now > self._expire_at
291+
if keepalive_expired:
292+
return True
284293

285294
# If the HTTP connection is idle but the socket is readable, then the
286295
# only valid state is that the socket is about to return b"", indicating
287296
# a server-initiated disconnect.
288-
server_disconnected = (
289-
self._state == HTTPConnectionState.IDLE
290-
and self._network_stream.get_extra_info("is_readable")
291-
)
297+
# Checking the readable status is relatively expensive so check it at a lower frequency.
298+
if (now - self._network_stream_used_at) > self._socket_poll_interval():
299+
self._network_stream_used_at = now
300+
server_disconnected = (
301+
self._state == HTTPConnectionState.IDLE
302+
and self._network_stream.get_extra_info("is_readable")
303+
)
304+
if server_disconnected:
305+
return True
306+
307+
return False
292308

293-
return keepalive_expired or server_disconnected
309+
def _socket_poll_interval(self) -> float:
310+
# Randomize to avoid polling for all the connections at once
311+
low, high = self._socket_poll_interval_between
312+
return random.uniform(low, high)
294313

295314
def is_idle(self) -> bool:
296315
return self._state == HTTPConnectionState.IDLE

0 commit comments

Comments
 (0)