89 changes: 41 additions & 48 deletions src/collector_node/collector_node.py
@@ -13,7 +13,7 @@
```

"""
# pylint: disable=C0412
# pylint: disable=C0412,E1121

import argparse
import asyncio
@@ -35,38 +35,34 @@
import websocket
from simple_sign.sign import sign_with_key

CNT_ENABLED: Final[bool] = True
CNT_ENABLED: Final[bool] = False


# Import config.
try:
import config
import feed_helper
import flock
from cnt_collector_node.helper_functions import check_tokens_pair
from cnt_collector_node.pairs import DEX_PAIRS
from version import get_version
except ModuleNotFoundError:
try:
from cnt_collector_node.helper_functions import check_tokens_pair
from cnt_collector_node.pairs import DEX_PAIRS

from collector_node import config, feed_helper, flock
from collector_node.version import get_version
except ModuleNotFoundError:
from src.collector_node import config, feed_helper, flock
from src.collector_node.version import get_version

try:
from src.cnt_collector_node.helper_functions import check_tokens_pair
from src.cnt_collector_node.pairs import DEX_PAIRS
except ModuleNotFoundError:
CNT_ENABLED = False
try:
# Import CNT related config. IMPORTANT: keep this separate for ITN
# testing or any config not making use of DEX config.
from src.cnt_collector_node.helper_functions import check_tokens_pair
from src.cnt_collector_node.pairs import DEX_PAIRS
except ModuleNotFoundError:
CNT_ENABLED = False


sys.dont_write_bytecode = True

logger = logging.getLogger(__name__)

logging.basicConfig(
format="%(asctime)-15s %(levelname)s :: %(filename)s:%(lineno)s:%(funcName)s() :: %(message)s",
@@ -77,7 +73,10 @@
# Default to UTC time.
logging.Formatter.converter = time.gmtime

# pylint: disable=E1121
logger = logging.getLogger(__name__)

# Time to sleep in-between web-socket requests.
SLEEP_TIME: Final[float] = 0.1


async def read_identity() -> dict:
@@ -156,15 +155,18 @@ async def fetch_dex_feeds(feeds: list, identity: dict) -> list:
return await retrieve_cnt(pairs, identity)


async def fetch_cex_data(feed: str) -> dict:
async def fetch_cex_data(feeds: str) -> dict:
"""Fetch data from the collector app using the subprocess command."""
logger.debug("fetching cex feeds using goder: %s ('%s')", config.GOFER, feed)
logger.debug("fetching cex feeds using gofer: %s ('%s')", config.GOFER, len(feeds))
stdout = []
try:
ps_out = subprocess.run(
[
config.GOFER,
"data",
feed,
]
+ feeds
+ [
"-o",
"orcfax",
],
@@ -181,22 +183,23 @@ async def fetch_cex_data(feed: str) -> dict:
logger.error("json decode failed: %s", err)
return {}
logger.info("stderr: %s", stderr)
return stdout.get(feed)
return stdout


async def fetch_cex_feeds(feeds: list[str]) -> AsyncGenerator:
"""Fetch results from the collector software and send them to the
validator.
"""
logger.debug("fetching cex feeds")
for feed in random.sample(feeds, len(feeds)):
logger.info("feed: %s", feed)
res = await fetch_cex_data(feed=feed)
if not res:
logger.error("cannot retrieve data for: '%s'", feed)
continue
logger.debug("collecting cex data, yielding")
yield res
res = await fetch_cex_data(feeds=feeds)
if len(res.keys()) < len(feeds):
logger.error(
"cannot retrieve data for: '%s'",
list(set(feeds).difference(list(res.keys()))),
)
data_to_send = []
for item in res.values():
data_to_send.append(json.dumps(item))
return data_to_send


def _return_ca_ssl_context():
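
The rewritten `fetch_cex_data`/`fetch_cex_feeds` above replace one `gofer` subprocess per feed with a single invocation that takes every pair on the command line and returns one JSON object keyed by pair; `fetch_cex_feeds` then logs any pairs missing from that object and serialises the rest. A rough sketch of the batched call, assuming the binary path stands in for `config.GOFER`, that the output is JSON keyed by pair as the diff implies, and with the stderr logging and decode-error handling trimmed:

```python
import json
import subprocess


def fetch_cex_batch(gofer_path: str, feeds: list[str]) -> dict:
    """Run one gofer invocation for all feeds and parse its JSON output.

    Sketch only: gofer_path stands in for config.GOFER; the error handling
    from the diff (JSONDecodeError, stderr logging) is omitted here.
    """
    ps_out = subprocess.run(
        [gofer_path, "data"] + feeds + ["-o", "orcfax"],
        capture_output=True,
        check=True,
    )
    return json.loads(ps_out.stdout)


# Hypothetical usage: report pairs that came back empty, serialise the rest.
# feeds = ["ADA/USD", "ADA/EUR"]   # illustrative pair names
# res = fetch_cex_batch("/usr/local/bin/gofer", feeds)
# missing = set(feeds).difference(res.keys())
# data_to_send = [json.dumps(item) for item in res.values()]
```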
@@ -215,25 +218,17 @@ async def sign_message(data_to_send: dict):

async def send_to_ws(validator_websocket, data_to_send: dict):
"""Send data to a websocket."""
logger.debug("attempting to send to websocket")
id_ = data_to_send["message"]["identity"]["node_id"]
timestamp = data_to_send["message"]["timestamp"]
feed = data_to_send["message"]["feed"]
logger.info(
"sending message '%s' from id: %s with timestamp: %s", feed, id_, timestamp
)
logger.info("sending message containing; '%s' datum", len(data_to_send))
data_to_send = await sign_message(json.dumps(data_to_send))
await validator_websocket.send_str(data_to_send)
try:
# `wait_for` exits early if necessary to avoid the validator
# swallowing this message without return so we can continue onto the next.
msg = await asyncio.wait_for(validator_websocket.receive(), 10)
if "ERROR" in msg:
logger.error("websocket response: %s (%s)", msg, feed)
logger.error("websocket response: '%s'", msg)
return
logger.info("websocket response: %s (%s)", msg, feed)
logger.info("websocket response: '%s'", msg)
except asyncio.exceptions.TimeoutError as err:
logger.error("websocket wait_for resp timeout for feed '%s' ('%s')", feed, err)
logger.error("websocket wait_for resp timeout for feeds: '%s'", err)
return
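
With batching, `send_to_ws` above now reports how many datum entries one payload carries rather than naming a single feed, while still bounding the wait for the validator's reply. The receive-with-timeout pattern in isolation, as a small sketch (the 10-second default mirrors the diff; `ws` is assumed to be an aiohttp `ClientWebSocketResponse`):

```python
import asyncio


async def receive_with_timeout(ws, timeout: float = 10):
    """Wait for the validator's reply, giving up after `timeout` seconds so
    one unanswered send cannot stall the loop. Sketch only: `ws` is assumed
    to be an aiohttp ClientWebSocketResponse; returns None on timeout."""
    try:
        return await asyncio.wait_for(ws.receive(), timeout)
    except asyncio.TimeoutError:
        return None
```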


@@ -267,13 +262,11 @@ async def send_data_to_validator(
timeout=120,
) as validator_websocket:
try:
sleep_time = 0.1
async for data_to_send in data_cex:
logger.debug(
"sending to web-socket, then sleeping for '%ss'", sleep_time
)
await send_to_ws(validator_websocket, data_to_send)
time.sleep(sleep_time)
logger.debug(
"sending to web-socket, then sleeping for '%ss'", SLEEP_TIME
)
await send_to_ws(validator_websocket, data_cex)
time.sleep(SLEEP_TIME)
if not CNT_ENABLED:
logger.debug(
"cnt collection is not enabled nothing to send to web-socket"
@@ -284,7 +277,7 @@
if not data_to_send:
continue
await send_to_ws(validator_websocket, data_to_send)
time.sleep(sleep_time)
time.sleep(SLEEP_TIME)
except aiohttp.client_exceptions.ServerDisconnectedError as err:
logger.error("connection closed unexpectedly: %s", err)
except aiohttp.client_exceptions.ConnectionTimeoutError as err:
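
In the loop above, the batched CEX payload is sent once and the optional DEX results are then sent item by item, with the module-level `SLEEP_TIME` pause replacing the old local `sleep_time`. A compressed sketch of that throttled send sequence (names follow the diff; connection setup and the exception branches are omitted, and the blocking `time.sleep` mirrors the diff even though `asyncio.sleep` would be the non-blocking alternative):

```python
import time
from typing import Final

SLEEP_TIME: Final[float] = 0.1  # pause between web-socket sends, as in the diff


async def send_all(validator_websocket, data_cex, data_dex, cnt_enabled: bool):
    """Sketch of the send loop: one batched CEX send, then per-item DEX sends."""
    await send_to_ws(validator_websocket, data_cex)  # send_to_ws as defined above
    time.sleep(SLEEP_TIME)
    if not cnt_enabled:
        return  # nothing collected from DEXes to forward
    for data_to_send in data_dex:
        if not data_to_send:
            continue
        await send_to_ws(validator_websocket, data_to_send)
        time.sleep(SLEEP_TIME)
```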
@@ -315,7 +308,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None:
logger.debug("len cex feeds: '%s'", len(cex_feeds))
logger.debug("len dex feeds: '%s'", len(dex_feeds))

data_cex = fetch_cex_feeds(cex_feeds)
data_cex = await fetch_cex_feeds(cex_feeds)
data_dex = await collect_dex(dex_feeds, identity)

id_ = identity["node_id"]