Skip to content

Commit

Permalink
Optimize connection pool loop check
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkusSintonen committed Jun 15, 2024
1 parent da86ca4 commit a8d638a
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 56 deletions.
68 changes: 40 additions & 28 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import ssl
import sys
from types import TracebackType
from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type
from typing import (
AsyncIterable,
AsyncIterator,
Iterable,
List,
Optional,
Type,
)

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
Expand Down Expand Up @@ -238,6 +245,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
idling_count = 0

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
Expand All @@ -249,52 +257,56 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
elif connection.is_idle():
if idling_count < self._max_keepalive_connections:
idling_count += 1
continue
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in list(self._requests):
if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
available_connection = next(
(
connection
for connection in self._connections
if connection.can_handle_request(origin)
and connection.is_available()
),
None,
)

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
if available_connection is not None:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
pool_request.assign_to_connection(available_connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idling_connection = next(
(c for c in self._connections if c.is_idle()), None
)
if idling_connection is not None:
# log: "closing idle connection"
self._connections.remove(idling_connection)
closing_connections.append(idling_connection)
# log: "creating new connection"
new_connection = self.create_connection(origin)
self._connections.append(new_connection)
pool_request.assign_to_connection(new_connection)

return closing_connections

Expand Down
68 changes: 40 additions & 28 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import ssl
import sys
from types import TracebackType
from typing import Iterable, Iterator, Iterable, List, Optional, Type
from typing import (
Iterable,
Iterator,
Iterable,
List,
Optional,
Type,
)

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
Expand Down Expand Up @@ -238,6 +245,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
idling_count = 0

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
Expand All @@ -249,52 +257,56 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
elif connection.is_idle():
if idling_count < self._max_keepalive_connections:
idling_count += 1
continue
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in list(self._requests):
if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
available_connection = next(
(
connection
for connection in self._connections
if connection.can_handle_request(origin)
and connection.is_available()
),
None,
)

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
if available_connection is not None:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
pool_request.assign_to_connection(available_connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idling_connection = next(
(c for c in self._connections if c.is_idle()), None
)
if idling_connection is not None:
# log: "closing idle connection"
self._connections.remove(idling_connection)
closing_connections.append(idling_connection)
# log: "creating new connection"
new_connection = self.create_connection(origin)
self._connections.append(new_connection)
pool_request.assign_to_connection(new_connection)

return closing_connections

Expand Down

0 comments on commit a8d638a

Please sign in to comment.