Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection pool optimization: reduce connection maintenance loops complexity #929

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import ssl
import sys
from types import TracebackType
from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type
from typing import (
AsyncIterable,
AsyncIterator,
Iterable,
List,
Optional,
Type,
)

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
Expand Down Expand Up @@ -238,6 +245,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
idling_count = 0

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
Expand All @@ -249,52 +257,56 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
elif connection.is_idle():
if idling_count < self._max_keepalive_connections:
idling_count += 1
continue
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in list(self._requests):
if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
available_connection = next(
(
connection
for connection in self._connections
if connection.can_handle_request(origin)
and connection.is_available()
),
None,
)

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
if available_connection is not None:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
pool_request.assign_to_connection(available_connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idling_connection = next(
(c for c in self._connections if c.is_idle()), None
)
if idling_connection is not None:
# log: "closing idle connection"
self._connections.remove(idling_connection)
closing_connections.append(idling_connection)
# log: "creating new connection"
new_connection = self.create_connection(origin)
self._connections.append(new_connection)
pool_request.assign_to_connection(new_connection)

return closing_connections

Expand Down
68 changes: 40 additions & 28 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import ssl
import sys
from types import TracebackType
from typing import Iterable, Iterator, Iterable, List, Optional, Type
from typing import (
Iterable,
Iterator,
Iterable,
List,
Optional,
Type,
)

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
Expand Down Expand Up @@ -238,6 +245,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
idling_count = 0

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
Expand All @@ -249,52 +257,56 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
elif connection.is_idle():
if idling_count < self._max_keepalive_connections:
idling_count += 1
continue
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in list(self._requests):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a copy of the list here?

if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
available_connection = next(
(
connection
for connection in self._connections
if connection.can_handle_request(origin)
and connection.is_available()
),
None,
)

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
if available_connection is not None:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
pool_request.assign_to_connection(available_connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idling_connection = next(
(c for c in self._connections if c.is_idle()), None
)
if idling_connection is not None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we continue iterating over self._requests if there is no available or idling connection? Can we break in such cases?

# log: "closing idle connection"
self._connections.remove(idling_connection)
closing_connections.append(idling_connection)
# log: "creating new connection"
new_connection = self.create_connection(origin)
self._connections.append(new_connection)
pool_request.assign_to_connection(new_connection)

return closing_connections

Expand Down
Loading