Skip to content

Commit

Permalink
wip: simplify ApiWrapperBase after gql update
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreemurro committed Jul 1, 2024
1 parent e880186 commit 511f468
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 93 deletions.
12 changes: 3 additions & 9 deletions esoraider_server/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from typing import Optional, Tuple

from blacksheep.server import Application
Expand Down Expand Up @@ -143,20 +142,15 @@ async def healthcheck():


async def connect_api(app: Application) -> None:
api = app.service_provider.get(ApiWrapper)
api = ApiWrapper()
await api.connect()


async def configure_background_tasks(app):
asyncio.get_event_loop().create_task(connect_api(app))
app.services.add_instance(api)


async def close_api(app: Application):
service = app.service_provider[ApiWrapper]
await service.close()


app.on_start += configure_background_tasks
app.on_start += connect_api
app.on_stop += close_api

app.services.add_instance(ApiWrapper())
92 changes: 8 additions & 84 deletions esoraider_server/esologs/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import asyncio
from typing import Optional

import backoff # type: ignore
from gql import Client # type: ignore
from gql.dsl import DSLSchema # type: ignore
from gql.transport.aiohttp import AIOHTTPTransport # type: ignore
from gql.transport.exceptions import ( # type: ignore
TransportClosed,
TransportQueryError,
)
from gql.transport.exceptions import TransportQueryError # type: ignore
from loguru import logger
from oauthlib.oauth2 import BackendApplicationClient # type: ignore
from requests_oauthlib import OAuth2Session # type: ignore
Expand All @@ -20,7 +13,6 @@


class ApiWrapperBase(object):
# https://github.com/graphql-python/gql/issues/179#issuecomment-749044193
def __init__(self) -> None:
self._token = self._auth()
self._transport = AIOHTTPTransport(
Expand All @@ -32,49 +24,25 @@ def __init__(self) -> None:
fetch_schema_from_transport=True,
)
self._session = None
self._connect_task = None

self._close_request_event: Optional[asyncio.Event] = None
self._reconnect_request_event: Optional[asyncio.Event] = None

self._connected_event: Optional[asyncio.Event] = None
self._closed_event: Optional[asyncio.Event] = None

self.ds: DSLSchema = None

async def connect(self):
self._close_request_event = asyncio.Event()
self._reconnect_request_event = asyncio.Event()

self._connected_event = asyncio.Event()
self._closed_event = asyncio.Event()

logger.info('Opening connection')
if self._connect_task:
logger.info('Already connected')
else:
self._connected_event.clear()
self._connect_task = asyncio.create_task(self._connection_loop())
await asyncio.wait_for(
self._connected_event.wait(), timeout=TIMEOUT,
)
self._session = await self._client.connect_async(reconnecting=True)
self.ds = DSLSchema(self._client.schema)
logger.info('Connected to API')

async def close(self):
logger.info('Disconnecting')
self._connect_task = None
self._closed_event.clear()
self._close_request_event.set()
await asyncio.wait_for(self._closed_event.wait(), timeout=TIMEOUT)
self._session = None
await self._client.close_async()
logger.info('Disconnected')

@backoff.on_exception(backoff.expo, Exception, max_tries=3)
async def execute(self, *args, **kwargs):
try:
answer = await self._session.execute(*args, **kwargs)
except TransportClosed:
self._reconnect_request_event.set()
raise
except TransportQueryError as ex:
logger.error(ex)
logger.exception("Couldn't get the log")
return ex

return answer
Expand All @@ -90,47 +58,3 @@ def _auth(self):
client_secret=CLIENT_SECRET,
)
return token['access_token']

@backoff.on_exception(backoff.expo, Exception, max_time=WAIT_FOR)
async def _connection_loop(self):
while True:
logger.info('Connecting to API')
try:
async with self._client as session:
self._session = session
self.ds = DSLSchema(self._client.schema)
logger.info('Connected to API')
self._connected_event.set()

# Wait for the close or reconnect event
self._close_request_event.clear()
self._reconnect_request_event.clear()

close_event_task = asyncio.create_task(
self._close_request_event.wait(),
)
reconnect_event_task = asyncio.create_task(
self._reconnect_request_event.wait(),
)

events = [close_event_task, reconnect_event_task]

done, pending = await asyncio.wait(
events, return_when=asyncio.FIRST_COMPLETED,
)

for task in pending:
task.cancel()

if close_event_task in done:
# If we received a closed event,
# then we go out of the loop
break

# If we received a reconnect event,
# then we disconnect and connect again
finally:
self._session = None
logger.info('Disconnected from API')
logger.info('Connection closed')
self._closed_event.set()

0 comments on commit 511f468

Please sign in to comment.