Skip to content

Commit bc1e511

Browse files
committed
Batch CEX collector messages
1 parent 9b1a98a commit bc1e511

File tree

1 file changed

+36
-41
lines changed

1 file changed

+36
-41
lines changed

src/collector_node/collector_node.py

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
```
1414
1515
"""
16-
# pylint: disable=C0412
16+
# pylint: disable=C0412,E1121
1717

1818
import argparse
1919
import asyncio
@@ -35,22 +35,22 @@
3535
import websocket
3636
from simple_sign.sign import sign_with_key
3737

38-
CNT_ENABLED: Final[bool] = True
38+
CNT_ENABLED: Final[bool] = False
3939

4040

4141
# Import config.
4242
try:
4343
import config
4444
import feed_helper
4545
import flock
46-
from version import get_version
47-
4846
from cnt_collector_node.helper_functions import check_tokens_pair
4947
from cnt_collector_node.pairs import DEX_PAIRS
48+
from version import get_version
5049
except ModuleNotFoundError:
5150
try:
5251
from cnt_collector_node.helper_functions import check_tokens_pair
5352
from cnt_collector_node.pairs import DEX_PAIRS
53+
5454
from collector_node import config, feed_helper, flock
5555
from collector_node.version import get_version
5656
except ModuleNotFoundError:
@@ -66,7 +66,6 @@
6666

6767
sys.dont_write_bytecode = True
6868

69-
logger = logging.getLogger(__name__)
7069

7170
logging.basicConfig(
7271
format="%(asctime)-15s %(levelname)s :: %(filename)s:%(lineno)s:%(funcName)s() :: %(message)s",
@@ -77,7 +76,10 @@
7776
# Default to UTC time.
7877
logging.Formatter.converter = time.gmtime
7978

80-
# pylint: disable=E1121
79+
logger = logging.getLogger(__name__)
80+
81+
# Time to sleep in-between web-socket requests.
82+
SLEEP_TIME: Final[float] = 0.1
8183

8284

8385
async def read_identity() -> dict:
@@ -156,15 +158,18 @@ async def fetch_dex_feeds(feeds: list, identity: dict) -> list:
156158
return await retrieve_cnt(pairs, identity)
157159

158160

159-
async def fetch_cex_data(feed: str) -> dict:
161+
async def fetch_cex_data(feeds: str) -> dict:
160162
"""Fetch data from the collector app using the subprocess command."""
161-
logger.debug("fetching cex feeds using goder: %s ('%s')", config.GOFER, feed)
163+
logger.debug("fetching cex feeds using gofer: %s ('%s')", config.GOFER, len(feeds))
164+
stdout = []
162165
try:
163166
ps_out = subprocess.run(
164167
[
165168
config.GOFER,
166169
"data",
167-
feed,
170+
]
171+
+ feeds
172+
+ [
168173
"-o",
169174
"orcfax",
170175
],
@@ -181,22 +186,23 @@ async def fetch_cex_data(feed: str) -> dict:
181186
logger.error("json decode failed: %s", err)
182187
return {}
183188
logger.info("stderr: %s", stderr)
184-
return stdout.get(feed)
189+
return stdout
185190

186191

187192
async def fetch_cex_feeds(feeds: list[str]) -> AsyncGenerator:
188193
"""Fetch results from the collector software and send them to the
189194
validator.
190195
"""
191-
logger.debug("fetching cex feeds")
192-
for feed in random.sample(feeds, len(feeds)):
193-
logger.info("feed: %s", feed)
194-
res = await fetch_cex_data(feed=feed)
195-
if not res:
196-
logger.error("cannot retrieve data for: '%s'", feed)
197-
continue
198-
logger.debug("collecting cex data, yielding")
199-
yield res
196+
res = await fetch_cex_data(feeds=feeds)
197+
if len(res.keys()) < len(feeds):
198+
logger.error(
199+
"cannot retrieve data for: '%s'",
200+
list(set(feeds).difference(list(res.keys()))),
201+
)
202+
data_to_send = []
203+
for item in res.values():
204+
data_to_send.append(json.dumps(item))
205+
return data_to_send
200206

201207

202208
def _return_ca_ssl_context():
@@ -215,25 +221,17 @@ async def sign_message(data_to_send: dict):
215221

216222
async def send_to_ws(validator_websocket, data_to_send: dict):
217223
"""Send data to a websocket."""
218-
logger.debug("attempting to send to websocket")
219-
id_ = data_to_send["message"]["identity"]["node_id"]
220-
timestamp = data_to_send["message"]["timestamp"]
221-
feed = data_to_send["message"]["feed"]
222-
logger.info(
223-
"sending message '%s' from id: %s with timestamp: %s", feed, id_, timestamp
224-
)
224+
logger.info("sending message containing; '%s' datum", len(data_to_send))
225225
data_to_send = await sign_message(json.dumps(data_to_send))
226226
await validator_websocket.send_str(data_to_send)
227227
try:
228-
# `wait_for` exits early if necessary to avoid the validator
229-
# swallowing this message without return so we can continue onto the next.
230228
msg = await asyncio.wait_for(validator_websocket.receive(), 10)
231229
if "ERROR" in msg:
232-
logger.error("websocket response: %s (%s)", msg, feed)
230+
logger.error("websocket response: '%s'", msg)
233231
return
234-
logger.info("websocket response: %s (%s)", msg, feed)
232+
logger.info("websocket response: '%s'", msg)
235233
except asyncio.exceptions.TimeoutError as err:
236-
logger.error("websocket wait_for resp timeout for feed '%s' ('%s')", feed, err)
234+
logger.error("websocket wait_for resp timeout for feeds: '%s'", err)
237235
return
238236

239237

@@ -267,13 +265,11 @@ async def send_data_to_validator(
267265
timeout=120,
268266
) as validator_websocket:
269267
try:
270-
sleep_time = 0.1
271-
async for data_to_send in data_cex:
272-
logger.debug(
273-
"sending to web-socket, then sleeping for '%ss'", sleep_time
274-
)
275-
await send_to_ws(validator_websocket, data_to_send)
276-
time.sleep(sleep_time)
268+
logger.debug(
269+
"sending to web-socket, then sleeping for '%ss'", SLEEP_TIME
270+
)
271+
await send_to_ws(validator_websocket, data_cex)
272+
time.sleep(SLEEP_TIME)
277273
if not CNT_ENABLED:
278274
logger.debug(
279275
"cnt collection is not enabled nothing to send to web-socket"
@@ -284,7 +280,7 @@ async def send_data_to_validator(
284280
if not data_to_send:
285281
continue
286282
await send_to_ws(validator_websocket, data_to_send)
287-
time.sleep(sleep_time)
283+
time.sleep(SLEEP_TIME)
288284
except aiohttp.client_exceptions.ServerDisconnectedError as err:
289285
logger.error("connection closed unexpectedly: %s", err)
290286
except aiohttp.client_exceptions.ConnectionTimeoutError as err:
@@ -297,7 +293,6 @@ async def send_data_to_validator(
297293
logger.error("problem connecting to the validator: %s", err)
298294

299295

300-
301296
async def fetch_and_send(feeds: list, identity: dict) -> None:
302297
"""Fetch feed data and send it to a validator websocket."""
303298

@@ -316,7 +311,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None:
316311
logger.debug("len cex feeds: '%s'", len(cex_feeds))
317312
logger.debug("len dex feeds: '%s'", len(dex_feeds))
318313

319-
data_cex = fetch_cex_feeds(cex_feeds)
314+
data_cex = await fetch_cex_feeds(cex_feeds)
320315
data_dex = await collect_dex(dex_feeds, identity)
321316

322317
id_ = identity["node_id"]

0 commit comments

Comments
 (0)