Skip to content

Commit

Permalink
Enable use of Kupo
Browse files Browse the repository at this point in the history
To optionally use Kupo, set KUPO_URL, otherwise it will default to
Ogmios. We also reorganize the code a little to provide options for
debugging in future.
  • Loading branch information
ross-spencer committed Jan 21, 2025
1 parent cb79b21 commit f749ef3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
25 changes: 19 additions & 6 deletions src/collector_node/collector_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,21 @@ async def retrieve_cnt(requested: list, identity: dict) -> list:
logger.info("connecting to ogmios")
ogmios_ver = config.OGMIOS_VERSION
ogmios_ws: websocket.WebSocket = websocket.create_connection(config.OGMIOS_URL)
ogmios_context = {
use_kupo = False
if config.KUPO_URL:
use_kupo = True
kupo_url = config.KUPO_URL
context = {
"ogmios_ws": ogmios_ws,
"ogmios_ver": ogmios_ver,
"logger": logger,
"use_kupo": use_kupo,
"kupo_url": kupo_url,
}
for tokens_pair in requested:
message, timestamp = await check_tokens_pair(
database_context,
ogmios_context,
context,
identity,
tokens_pair,
)
Expand Down Expand Up @@ -225,6 +231,16 @@ async def send_to_ws(validator_websocket, data_to_send: dict):
return


async def collect_dex(dex_feeds: list, identity: dict) -> list:
"""Collect dex data and provide a way to exit gracefully if the
configuration is incorrect.
"""
data_dex = []
if CNT_ENABLED:
data_dex = await fetch_dex_feeds(dex_feeds, identity)
return data_dex


async def fetch_and_send(feeds: list, identity: dict) -> None:
"""Fetch feed data and send it to a validator websocket."""

Expand All @@ -244,10 +260,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None:
logger.debug("len dex feeds: '%s'", len(dex_feeds))

data_cex = fetch_cex_feeds(cex_feeds)
data_dex = []
if CNT_ENABLED:
logger.debug("cnt collection is not enabled")
data_dex = await fetch_dex_feeds(dex_feeds, identity)
data_dex = await collect_dex(dex_feeds, identity)

id_ = identity["node_id"]
validator_connection = f"{config.VALIDATOR_URI}/{id_}/"
Expand Down
22 changes: 20 additions & 2 deletions src/collector_node/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,32 @@
"gofer location needs to be configured, e.g. `export GOFER=/path/to/gofer`"
)
sys.exit(1)

# CNT configuration.
#
# Within the app CNT_ENABLED controls whether we go to the dexes. Ogmios
# needs configuring at a minimum. Kupo can be set to improve performance
# on database startup.

OGMIOS_URL = None
try:
OGMIOS_URL: Final[str] = os.environ["OGMIOS_URL"]
OGMIOS_URL = os.environ["OGMIOS_URL"]
logger.info("ogmios websocket: %s", OGMIOS_URL)
except KeyError:
logger.error(
"oogmios websocket url needs to be set, e.g. `export OGMIOS_URL=ws://<ip-address>`"
"ogmios websocket url needs to be set, e.g. `export OGMIOS_URL=ws://<ip-address>`"
)
sys.exit(1)

KUPO_URL = None
try:
KUPO_URL = os.environ["KUPO_URL"]
logger.info("kupo url: %s", KUPO_URL)
except KeyError:
KUPO_URL = None
logger.info(
"kupo url can optionally be set, e.g. `export KUPO_URL=http://<ip-address>`"
)

OGMIOS_VERSION: Final[str] = os.environ.get("OGMIOS_VERSION", "v6")
logger.info("ogmios version: %s", OGMIOS_VERSION)

0 comments on commit f749ef3

Please sign in to comment.