Skip to content
5 changes: 5 additions & 0 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ def is_closed(self) -> bool:
return self._connect_failed
return self._connection.is_closed()

def get_available_stream_capacity(self) -> int:
if self._connection is None:
return 1
return self._connection.get_available_stream_capacity()

def info(self) -> str:
if self._connection is None:
return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
Expand Down
194 changes: 139 additions & 55 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,66 +277,150 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
Any closing connections are returned, allowing the I/O for closing
those connections to be handled seperately.
"""
closing_connections = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
# Initialize connection buckets
closing_conns: list[AsyncConnectionInterface] = []
available_conns: list[AsyncConnectionInterface] = []
occupied_conns: list[AsyncConnectionInterface] = []

# Track HTTP/2 connection capacity
http2_conn_stream_capacity: dict[AsyncConnectionInterface, int] = {}

# Phase 1: Categorize all connections in a single pass
for conn in self._connections:
if conn.is_closed():
# Closed connections are simply skipped (not added to any bucket)
continue
elif conn.has_expired():
# Expired connections need to be closed
closing_conns.append(conn)
elif conn.is_available():
# Available connections
available_conns.append(conn)
# Track HTTP/2 connection capacity
if self._http2:
# Get the actual available stream count from the connection
http2_conn_stream_capacity[conn] = (
conn.get_available_stream_capacity()
)
elif conn.is_idle():
# Idle but not available (this shouldn't happen, but handle it by closing the connection)
closing_conns.append(conn)
else:
# Occupied connections
occupied_conns.append(conn)

# Calculate how many new connections we can create
total_existing_connections = (
len(available_conns) + len(occupied_conns) + len(closing_conns)
)
new_conns_remaining_count = self._max_connections - total_existing_connections

# Phase 2: Assign queued requests to connections
for pool_request in self._requests:
if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# Try to find an available connection that can handle this request
# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 1. There is an existing available connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
connection = available_connections[0]
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)

return closing_connections
# 3. We can close an idle connection and then create a new connection to handle the request.

assigned = False

# Case 1: try to use an available connection
for i in range(len(available_conns) - 1, -1, -1):
# Loop in reverse order since popping an element from the end of the list is O(1),
# whereas popping from the beginning of the list is O(n)

conn = available_conns[i]
if conn.can_handle_request(origin):
# Assign the request to this connection
pool_request.assign_to_connection(conn)

# Handle HTTP/1.1 vs HTTP/2 differently
if self._http2 and conn in http2_conn_stream_capacity:
# HTTP/2: Decrement available capacity
http2_conn_stream_capacity[conn] -= 1
if http2_conn_stream_capacity[conn] <= 0:
# Move to occupied if no more capacity
available_conns.pop(i)
occupied_conns.append(conn)
del http2_conn_stream_capacity[conn]
else:
# HTTP/1.1: Move to occupied immediately
available_conns.pop(i)
occupied_conns.append(conn)

assigned = True
break

if assigned:
continue

# Case 2: Try to create a new connection
if new_conns_remaining_count > 0:
conn = self.create_connection(origin)
pool_request.assign_to_connection(conn)
# New connections go to occupied (we don't know if HTTP/1.1 or HTTP/2 yet, so assume no multiplexing)
occupied_conns.append(conn)
new_conns_remaining_count -= 1
continue

# Case 3, last resort: evict an idle connection and create a new connection
assigned = False
for i in range(len(available_conns) - 1, -1, -1):
# Loop in reverse order since popping an element from the end of the list is O(1),
# whereas popping from the beginning of the list is O(n)
conn = available_conns[i]
if conn.is_idle():
evicted_conn = available_conns.pop(i)
closing_conns.append(evicted_conn)
# Create new connection for the required origin
conn = self.create_connection(origin)
pool_request.assign_to_connection(conn)
occupied_conns.append(conn)
assigned = True
break

# All attempts failed: all connections are occupied and we can't create a new one
if not assigned:
# Break out of the loop since no more queued requests can be serviced at this time
break

# Phase 3: Enforce self._max_keepalive_connections by closing excess idle connections
#
# Only run keepalive enforcement if len(available_conns) > max_keepalive.
# Since idle connections are a subset of available connections, if there are
# fewer available connections than the limit, we cannot possibly violate it.
if len(available_conns) > self._max_keepalive_connections:
keepalive_available_conns: list[AsyncConnectionInterface] = []
n_idle_conns_kept = 0

for conn in available_conns:
if conn.is_idle():
if n_idle_conns_kept >= self._max_keepalive_connections:
# We've already kept the maximum allowed idle connections, close this one
closing_conns.append(conn)
else:
# Keep this idle connection as we're still under the limit
keepalive_available_conns.append(conn)
n_idle_conns_kept += 1
else:
# This is an available but not idle connection (active HTTP/2 with capacity)
# Always keep these as they don't count against keepalive limits
keepalive_available_conns.append(conn)

# Replace available_conns with the filtered list (excess idle connections removed)
available_conns = keepalive_available_conns

# Rebuild self._connections from all buckets
self._connections = available_conns + occupied_conns

return closing_conns

async def _close_connections(self, closing: list[AsyncConnectionInterface]) -> None:
# Close connections which have been removed from the pool.
Expand Down
7 changes: 7 additions & 0 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,13 @@ def is_idle(self) -> bool:
def is_closed(self) -> bool:
return self._state == HTTPConnectionState.CLOSED

def get_available_stream_capacity(self) -> int:
"""
For HTTP/1.1, return 1 if the connection is idle (can accept a request),
0 otherwise (connection is busy).
"""
return 1 if self._state == HTTPConnectionState.IDLE else 0

def info(self) -> str:
origin = str(self._origin)
return (
Expand Down
19 changes: 15 additions & 4 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def __init__(
self._used_all_stream_ids = False
self._connection_error = False

# self._max_streams contains the maximum number of concurrent requests that this connection can handle
# Initially start with just 1 until the remote server provides its max_concurrent_streams value
self._max_streams = 1
self._concurrent_streams = 0 # Tracks currently active requests.

# Mapping from stream ID to response stream events.
self._events: dict[
int,
Expand Down Expand Up @@ -116,10 +121,6 @@ async def handle_async_request(self, request: Request) -> Response:

self._sent_connection_init = True

# Initially start with just 1 until the remote server provides
# its max_concurrent_streams value
self._max_streams = 1

local_settings_max_streams = (
self._h2_state.local_settings.max_concurrent_streams
)
Expand All @@ -129,6 +130,7 @@ async def handle_async_request(self, request: Request) -> Response:
await self._max_streams_semaphore.acquire()

await self._max_streams_semaphore.acquire()
self._concurrent_streams += 1

try:
stream_id = self._h2_state.get_next_available_stream_id()
Expand Down Expand Up @@ -408,6 +410,7 @@ async def _receive_remote_settings_change(

async def _response_closed(self, stream_id: int) -> None:
await self._max_streams_semaphore.release()
self._concurrent_streams -= 1
del self._events[stream_id]
async with self._state_lock:
if self._connection_terminated and not self._events:
Expand Down Expand Up @@ -529,6 +532,14 @@ def is_idle(self) -> bool:
def is_closed(self) -> bool:
return self._state == HTTPConnectionState.CLOSED

def get_available_stream_capacity(self) -> int:
"""
Return the number of additional streams that can be handled by this connection.
This is useful for determining how many more requests can be sent on this HTTP/2 connection.
Uses the actual SETTINGS_MAX_CONCURRENT_STREAMS negotiated with the server.
"""
return self._max_streams - self._concurrent_streams

def info(self) -> str:
origin = str(self._origin)
return (
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,5 +363,8 @@ def is_idle(self) -> bool:
def is_closed(self) -> bool:
return self._connection.is_closed()

def get_available_stream_capacity(self) -> int:
return self._connection.get_available_stream_capacity()

def __repr__(self) -> str:
return f"<{self.__class__.__name__} [{self.info()}]>"
9 changes: 9 additions & 0 deletions httpcore/_async/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,12 @@ def is_closed(self) -> bool:
returned to the connection pool or not.
"""
raise NotImplementedError() # pragma: nocover

def get_available_stream_capacity(self) -> int:
"""
Return the number of additional streams that can be handled by this connection.

For HTTP/1.1 connections, this is 1 if the connection is idle, 0 otherwise.
For HTTP/2 connections, this is the number of available concurrent streams.
"""
raise NotImplementedError() # pragma: nocover
5 changes: 5 additions & 0 deletions httpcore/_sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ def is_closed(self) -> bool:
return self._connect_failed
return self._connection.is_closed()

def get_available_stream_capacity(self) -> int:
if self._connection is None:
return 1
return self._connection.get_available_stream_capacity()

def info(self) -> str:
if self._connection is None:
return "CONNECTION FAILED" if self._connect_failed else "CONNECTING"
Expand Down
Loading