From c7152ed1382771bf8ae1eca8e2f3de4456f9f30f Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Wed, 4 Jun 2025 17:52:36 +0200 Subject: [PATCH 1/2] Batch CEX collector messages --- src/collector_node/collector_node.py | 72 +++++++++++++--------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index fd06cb1..fb0dedc 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -13,7 +13,7 @@ ``` """ -# pylint: disable=C0412 +# pylint: disable=C0412,E1121 import argparse import asyncio @@ -35,7 +35,7 @@ import websocket from simple_sign.sign import sign_with_key -CNT_ENABLED: Final[bool] = True +CNT_ENABLED: Final[bool] = False # Import config. @@ -66,7 +66,6 @@ sys.dont_write_bytecode = True -logger = logging.getLogger(__name__) logging.basicConfig( format="%(asctime)-15s %(levelname)s :: %(filename)s:%(lineno)s:%(funcName)s() :: %(message)s", @@ -77,7 +76,10 @@ # Default to UTC time. logging.Formatter.converter = time.gmtime -# pylint: disable=E1121 +logger = logging.getLogger(__name__) + +# Time to sleep in-between web-socket requests. +SLEEP_TIME: Final[float] = 0.1 async def read_identity() -> dict: @@ -156,15 +158,18 @@ async def fetch_dex_feeds(feeds: list, identity: dict) -> list: return await retrieve_cnt(pairs, identity) -async def fetch_cex_data(feed: str) -> dict: +async def fetch_cex_data(feeds: str) -> dict: """Fetch data from the collector app using the subprocess command.""" - logger.debug("fetching cex feeds using goder: %s ('%s')", config.GOFER, feed) + logger.debug("fetching cex feeds using gofer: %s ('%s')", config.GOFER, len(feeds)) + stdout = [] try: ps_out = subprocess.run( [ config.GOFER, "data", - feed, + ] + + feeds + + [ "-o", "orcfax", ], @@ -181,22 +186,23 @@ async def fetch_cex_data(feed: str) -> dict: logger.error("json decode failed: %s", err) return {} logger.info("stderr: %s", stderr) - return stdout.get(feed) + return stdout async def fetch_cex_feeds(feeds: list[str]) -> AsyncGenerator: """Fetch results from the collector software and send them to the validator. """ - logger.debug("fetching cex feeds") - for feed in random.sample(feeds, len(feeds)): - logger.info("feed: %s", feed) - res = await fetch_cex_data(feed=feed) - if not res: - logger.error("cannot retrieve data for: '%s'", feed) - continue - logger.debug("collecting cex data, yielding") - yield res + res = await fetch_cex_data(feeds=feeds) + if len(res.keys()) < len(feeds): + logger.error( + "cannot retrieve data for: '%s'", + list(set(feeds).difference(list(res.keys()))), + ) + data_to_send = [] + for item in res.values(): + data_to_send.append(json.dumps(item)) + return data_to_send def _return_ca_ssl_context(): @@ -215,25 +221,17 @@ async def sign_message(data_to_send: dict): async def send_to_ws(validator_websocket, data_to_send: dict): """Send data to a websocket.""" - logger.debug("attempting to send to websocket") - id_ = data_to_send["message"]["identity"]["node_id"] - timestamp = data_to_send["message"]["timestamp"] - feed = data_to_send["message"]["feed"] - logger.info( - "sending message '%s' from id: %s with timestamp: %s", feed, id_, timestamp - ) + logger.info("sending message containing; '%s' datum", len(data_to_send)) data_to_send = await sign_message(json.dumps(data_to_send)) await validator_websocket.send_str(data_to_send) try: - # `wait_for` exits early if necessary to avoid the validator - # swallowing this message without return so we can continue onto the next. msg = await asyncio.wait_for(validator_websocket.receive(), 10) if "ERROR" in msg: - logger.error("websocket response: %s (%s)", msg, feed) + logger.error("websocket response: '%s'", msg) return - logger.info("websocket response: %s (%s)", msg, feed) + logger.info("websocket response: '%s'", msg) except asyncio.exceptions.TimeoutError as err: - logger.error("websocket wait_for resp timeout for feed '%s' ('%s')", feed, err) + logger.error("websocket wait_for resp timeout for feeds: '%s'", err) return @@ -267,13 +265,11 @@ async def send_data_to_validator( timeout=120, ) as validator_websocket: try: - sleep_time = 0.1 - async for data_to_send in data_cex: - logger.debug( - "sending to web-socket, then sleeping for '%ss'", sleep_time - ) - await send_to_ws(validator_websocket, data_to_send) - time.sleep(sleep_time) + logger.debug( + "sending to web-socket, then sleeping for '%ss'", SLEEP_TIME + ) + await send_to_ws(validator_websocket, data_cex) + time.sleep(SLEEP_TIME) if not CNT_ENABLED: logger.debug( "cnt collection is not enabled nothing to send to web-socket" @@ -284,7 +280,7 @@ async def send_data_to_validator( if not data_to_send: continue await send_to_ws(validator_websocket, data_to_send) - time.sleep(sleep_time) + time.sleep(SLEEP_TIME) except aiohttp.client_exceptions.ServerDisconnectedError as err: logger.error("connection closed unexpectedly: %s", err) except aiohttp.client_exceptions.ConnectionTimeoutError as err: @@ -315,7 +311,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None: logger.debug("len cex feeds: '%s'", len(cex_feeds)) logger.debug("len dex feeds: '%s'", len(dex_feeds)) - data_cex = fetch_cex_feeds(cex_feeds) + data_cex = await fetch_cex_feeds(cex_feeds) data_dex = await collect_dex(dex_feeds, identity) id_ = identity["node_id"] From c59ff408a933a719c39a79139748d21611bfc874 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Mon, 9 Jun 2025 18:20:12 +0200 Subject: [PATCH 2/2] Fix imports rerquired for the ITN --- src/collector_node/collector_node.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index fb0dedc..f76f004 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -43,25 +43,22 @@ import config import feed_helper import flock - from cnt_collector_node.helper_functions import check_tokens_pair - from cnt_collector_node.pairs import DEX_PAIRS from version import get_version except ModuleNotFoundError: try: - from cnt_collector_node.helper_functions import check_tokens_pair - from cnt_collector_node.pairs import DEX_PAIRS - from collector_node import config, feed_helper, flock from collector_node.version import get_version except ModuleNotFoundError: from src.collector_node import config, feed_helper, flock from src.collector_node.version import get_version - try: - from src.cnt_collector_node.helper_functions import check_tokens_pair - from src.cnt_collector_node.pairs import DEX_PAIRS - except ModuleNotFoundError: - CNT_ENABLED = False +try: + # Import CNT related config. IMPORTANT: keep this separate for ITN + # testing or any config not making use of DEX config. + from src.cnt_collector_node.helper_functions import check_tokens_pair + from src.cnt_collector_node.pairs import DEX_PAIRS +except ModuleNotFoundError: + CNT_ENABLED = False sys.dont_write_bytecode = True