Skip to content

Commit

Permalink
Change trace events
Browse files Browse the repository at this point in the history
  • Loading branch information
karpetrosyan committed Jun 10, 2023
1 parent e5755be commit 8fa39ed
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 42 deletions.
56 changes: 56 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import logging
import ssl
import sys
from types import TracebackType
from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type

import httpcore

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock
from .._trace import atrace
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

logger = logging.getLogger("httpcore.connection_pool")


class RequestStatus:
def __init__(self, request: Request):
Expand Down Expand Up @@ -161,8 +167,14 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
# Reuse an existing connection if one is currently available.
for idx, connection in enumerate(self._pool):
if connection.can_handle_request(origin) and connection.is_available():
kwargs = {"connection": connection, "request": status.request}
await atrace("reuse_connection", logger, status.request, kwargs)
self._pool.pop(idx)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
await atrace(
"assign_request_to_connection", logger, status.request, kwargs
)
status.set_connection(connection)
return True

Expand All @@ -171,6 +183,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle():
await connection.aclose()
await atrace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.pop(idx)
break

Expand All @@ -180,7 +198,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:

# Otherwise create a new connection.
connection = self.create_connection(origin)
await atrace(
"add_connection", logger, status.request, kwargs={"connection": connection}
)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
await atrace("assign_request_to_connection", logger, status.request, kwargs)
status.set_connection(connection)
return True

Expand All @@ -192,6 +215,9 @@ async def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.has_expired():
await connection.aclose()
await atrace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)

# If the pool size exceeds the maximum number of allowed keep-alive connections,
Expand All @@ -200,6 +226,9 @@ async def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle() and pool_size > self._max_keepalive_connections:
await connection.aclose()
await atrace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)
pool_size -= 1

Expand All @@ -222,6 +251,7 @@ async def handle_async_request(self, request: Request) -> Response:
status = RequestStatus(request)

async with self._pool_lock:
await atrace("add_request", logger, request, {"request": status.request})
self._requests.append(status)
await self._close_expired_connections()
await self._attempt_to_acquire_connection(status)
Expand All @@ -235,9 +265,22 @@ async def handle_async_request(self, request: Request) -> Response:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
if isinstance(exc, httpcore.PoolTimeout):
await atrace(
"timeout_waiting_for_connection",
logger,
status.request,
{"request": status.request},
)
async with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
await atrace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)
raise exc

Expand All @@ -254,6 +297,13 @@ async def handle_async_request(self, request: Request) -> Response:
async with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
kwargs = {"request": status.request, "connection": connection}
await atrace(
"unassign_request_from_connection",
logger,
status.request,
kwargs,
)
status.unset_connection()
await self._attempt_to_acquire_connection(status)
except BaseException as exc:
Expand Down Expand Up @@ -285,6 +335,12 @@ async def response_closed(self, status: RequestStatus) -> None:
async with self._pool_lock:
# Update the state of the connection pool.
if status in self._requests:
await atrace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)

if connection.is_closed() and connection in self._pool:
Expand Down
56 changes: 56 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import logging
import ssl
import sys
from types import TracebackType
from typing import Iterable, Iterator, Iterable, List, Optional, Type

import httpcore

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock
from .._trace import trace
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

logger = logging.getLogger("httpcore.connection_pool")


class RequestStatus:
def __init__(self, request: Request):
Expand Down Expand Up @@ -161,8 +167,14 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
# Reuse an existing connection if one is currently available.
for idx, connection in enumerate(self._pool):
if connection.can_handle_request(origin) and connection.is_available():
kwargs = {"connection": connection, "request": status.request}
trace("reuse_connection", logger, status.request, kwargs)
self._pool.pop(idx)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
trace(
"assign_request_to_connection", logger, status.request, kwargs
)
status.set_connection(connection)
return True

Expand All @@ -171,6 +183,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle():
connection.close()
trace(
"remove_connection",
logger,
None,
kwargs={"connection": connection},
)
self._pool.pop(idx)
break

Expand All @@ -180,7 +198,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:

# Otherwise create a new connection.
connection = self.create_connection(origin)
trace(
"add_connection", logger, status.request, kwargs={"connection": connection}
)
self._pool.insert(0, connection)
kwargs = {"request": status.request, "connection": connection}
trace("assign_request_to_connection", logger, status.request, kwargs)
status.set_connection(connection)
return True

Expand All @@ -192,6 +215,9 @@ def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.has_expired():
connection.close()
trace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)

# If the pool size exceeds the maximum number of allowed keep-alive connections,
Expand All @@ -200,6 +226,9 @@ def _close_expired_connections(self) -> None:
for idx, connection in reversed(list(enumerate(self._pool))):
if connection.is_idle() and pool_size > self._max_keepalive_connections:
connection.close()
trace(
"remove_connection", logger, None, kwargs={"connection": connection}
)
self._pool.pop(idx)
pool_size -= 1

Expand All @@ -222,6 +251,7 @@ def handle_request(self, request: Request) -> Response:
status = RequestStatus(request)

with self._pool_lock:
trace("add_request", logger, request, {"request": status.request})
self._requests.append(status)
self._close_expired_connections()
self._attempt_to_acquire_connection(status)
Expand All @@ -235,9 +265,22 @@ def handle_request(self, request: Request) -> Response:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
if isinstance(exc, httpcore.PoolTimeout):
trace(
"timeout_waiting_for_connection",
logger,
status.request,
{"request": status.request},
)
with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
trace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)
raise exc

Expand All @@ -254,6 +297,13 @@ def handle_request(self, request: Request) -> Response:
with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
kwargs = {"request": status.request, "connection": connection}
trace(
"unassign_request_from_connection",
logger,
status.request,
kwargs,
)
status.unset_connection()
self._attempt_to_acquire_connection(status)
except BaseException as exc:
Expand Down Expand Up @@ -285,6 +335,12 @@ def response_closed(self, status: RequestStatus) -> None:
with self._pool_lock:
# Update the state of the connection pool.
if status in self._requests:
trace(
"remove_request",
logger,
status.request,
{"request": status.request},
)
self._requests.remove(status)

if connection.is_closed() and connection in self._pool:
Expand Down
Loading

0 comments on commit 8fa39ed

Please sign in to comment.