diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bec625341..6af351ec4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,6 +14,27 @@ on: jobs: + # test that we can generate a software distribution and install it + # thus avoid missing file issues after packaging. + sdist-linux: + name: 'sdist' + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Setup python + uses: actions/setup-python@v2 + with: + python-version: '3.10' + + - name: Build sdist + run: python setup.py sdist --formats=zip + + - name: Install sdist from .zips + run: python -m pip install dist/*.zip + testing: name: 'install + test-suite' runs-on: ubuntu-latest diff --git a/dockering/ib/docker-compose.yml b/dockering/ib/docker-compose.yml index f3a28d669..f8be56848 100644 --- a/dockering/ib/docker-compose.yml +++ b/dockering/ib/docker-compose.yml @@ -62,39 +62,39 @@ services: # - "127.0.0.1:4002:4002" # - "127.0.0.1:5900:5900" - ib_gw_live: - image: waytrade/ib-gateway:1012.2i - restart: always - network_mode: 'host' + # ib_gw_live: + # image: waytrade/ib-gateway:1012.2i + # restart: always + # network_mode: 'host' - volumes: - - type: bind - source: ./jts_live.ini - target: /root/jts/jts.ini - # don't let ibc clobber this file for - # the main reason of not having a stupid - # timezone set.. - read_only: true + # volumes: + # - type: bind + # source: ./jts_live.ini + # target: /root/jts/jts.ini + # # don't let ibc clobber this file for + # # the main reason of not having a stupid + # # timezone set.. + # read_only: true - # force our own ibc config - - type: bind - source: ./ibc.ini - target: /root/ibc/config.ini + # # force our own ibc config + # - type: bind + # source: ./ibc.ini + # target: /root/ibc/config.ini - # force our noop script - socat isn't needed in host mode. - - type: bind - source: ./fork_ports_delayed.sh - target: /root/scripts/fork_ports_delayed.sh + # # force our noop script - socat isn't needed in host mode. + # - type: bind + # source: ./fork_ports_delayed.sh + # target: /root/scripts/fork_ports_delayed.sh - # force our noop script - socat isn't needed in host mode. - - type: bind - source: ./run_x11_vnc.sh - target: /root/scripts/run_x11_vnc.sh - read_only: true + # # force our noop script - socat isn't needed in host mode. + # - type: bind + # source: ./run_x11_vnc.sh + # target: /root/scripts/run_x11_vnc.sh + # read_only: true - # NOTE: to fill these out, define an `.env` file in the same dir as - # this compose file which looks something like: - environment: - TRADING_MODE: 'live' - VNC_SERVER_PASSWORD: 'doggy' - VNC_SERVER_PORT: '3004' + # # NOTE: to fill these out, define an `.env` file in the same dir as + # # this compose file which looks something like: + # environment: + # TRADING_MODE: 'live' + # VNC_SERVER_PASSWORD: 'doggy' + # VNC_SERVER_PORT: '3004' diff --git a/piker/_daemon.py b/piker/_daemon.py index 6609338b3..d4ca7f212 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -35,12 +35,17 @@ _root_dname = 'pikerd' -_registry_host: str = '127.0.0.1' -_registry_port: int = 6116 -_registry_addr = ( - _registry_host, - _registry_port, +_default_registry_host: str = '127.0.0.1' +_default_registry_port: int = 6116 +_default_reg_addr: tuple[str, int] = ( + _default_registry_host, + _default_registry_port, ) + +# NOTE: this value is set as an actor-global once the first endpoint +# who is capable, spawns a `pikerd` service tree. +_registry_addr: tuple[str, int] | None = None + _tractor_kwargs: dict[str, Any] = { # use a different registry addr then tractor's default 'arbiter_addr': _registry_addr @@ -152,13 +157,20 @@ async def open_pikerd( ''' global _services + global _registry_addr + + if ( + _registry_addr is None + or registry_addr + ): + _registry_addr = registry_addr or _default_reg_addr # XXX: this may open a root actor as well async with ( tractor.open_root_actor( # passed through to ``open_root_actor`` - arbiter_addr=registry_addr or _registry_addr, + arbiter_addr=_registry_addr, name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, @@ -197,7 +209,7 @@ async def open_piker_runtime( # XXX: you should pretty much never want debug mode # for data daemons when running in production. debug_mode: bool = False, - registry_addr: None | tuple[str, int] = _registry_addr, + registry_addr: None | tuple[str, int] = None, ) -> tractor.Actor: ''' @@ -206,13 +218,20 @@ async def open_piker_runtime( ''' global _services + global _registry_addr + + if ( + _registry_addr is None + or registry_addr + ): + _registry_addr = registry_addr or _default_reg_addr # XXX: this may open a root actor as well async with ( tractor.open_root_actor( # passed through to ``open_root_actor`` - arbiter_addr=registry_addr, + arbiter_addr=_registry_addr, name=name, loglevel=loglevel, debug_mode=debug_mode, diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index eb4e735a8..95eb6f083 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -26,10 +26,21 @@ __brokers__ = [ 'binance', - 'questrade', - 'robinhood', 'ib', 'kraken', + + # broken but used to work + # 'questrade', + # 'robinhood', + + # TODO: we should get on these stat! + # alpaca + # wstrade + # iex + + # deribit + # kucoin + # bitso ] diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 9d2742bea..5ea7860a6 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -41,10 +41,15 @@ SymbolNotFound, DataUnavailable, ) -from ..log import get_logger, get_console_log -from ..data import ShmArray +from ..log import ( + get_logger, + get_console_log, +) from ..data.types import Struct -from ..data._web_bs import open_autorecon_ws, NoBsWs +from ..data._web_bs import ( + open_autorecon_ws, + NoBsWs, +) log = get_logger(__name__) @@ -142,7 +147,9 @@ class OHLC(Struct): # convert datetime obj timestamp to unixtime in milliseconds -def binance_timestamp(when): +def binance_timestamp( + when: datetime +) -> int: return int((when.timestamp() * 1000) + (when.microsecond / 1000)) @@ -181,7 +188,7 @@ async def symbol_info( params = {} if sym is not None: - sym = sym.upper() + sym = sym.lower() params = {'symbol': sym} resp = await self._api( @@ -238,7 +245,7 @@ async def bars( ) -> dict: if end_dt is None: - end_dt = pendulum.now('UTC') + end_dt = pendulum.now('UTC').add(minutes=1) if start_dt is None: start_dt = end_dt.start_of( @@ -396,8 +403,8 @@ async def open_history_client( async def get_ohlc( timeframe: float, - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, @@ -412,25 +419,20 @@ async def get_ohlc( start_dt=start_dt, end_dt=end_dt, ) - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) - return array, start_dt, end_dt + times = array['time'] + if ( + end_dt is None + ): + inow = round(time.time()) + if (inow - times[-1]) > 60: + await tractor.breakpoint() - yield get_ohlc, {'erlangs': 3, 'rate': 3} + start_dt = pendulum.from_timestamp(times[0]) + end_dt = pendulum.from_timestamp(times[-1]) + return array, start_dt, end_dt -async def backfill_bars( - sym: str, - shm: ShmArray, # type: ignore # noqa - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -) -> None: - """Fill historical bars into shared mem / storage afap. - """ - with trio.CancelScope() as cs: - async with open_cached_client('binance') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) + yield get_ohlc, {'erlangs': 3, 'rate': 3} async def stream_quotes( @@ -465,7 +467,7 @@ async def stream_quotes( si = sym_infos[sym] = syminfo.to_dict() filters = {} for entry in syminfo.filters: - ftype = entry.pop('filterType') + ftype = entry['filterType'] filters[ftype] = entry # XXX: after manually inspecting the response format we diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index daf9a703e..52b269703 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -371,8 +371,8 @@ async def update_and_audit_msgs( else: entry = f'split_ratio = 1/{int(reverse_split_ratio)}' - raise ValueError( - # log.error( + # raise ValueError( + log.error( f'POSITION MISMATCH ib <-> piker ledger:\n' f'ib: {ibppmsg}\n' f'piker: {msg}\n' @@ -575,17 +575,18 @@ async def trades_dialogue( # if new trades are detected from the API, prepare # them for the ledger file and update the pptable. if api_to_ledger_entries: - trade_entries = api_to_ledger_entries[acctid] + trade_entries = api_to_ledger_entries.get(acctid) - # write ledger with all new trades **AFTER** - # we've updated the `pps.toml` from the - # original ledger state! (i.e. this is - # currently done on exit) - ledger.update(trade_entries) + if trade_entries: + # write ledger with all new trades **AFTER** + # we've updated the `pps.toml` from the + # original ledger state! (i.e. this is + # currently done on exit) + ledger.update(trade_entries) - trans = trans_by_acct.get(acctid) - if trans: - table.update_from_trans(trans) + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) # XXX: not sure exactly why it wouldn't be in # the updated output (maybe this is a bug?) but @@ -883,7 +884,7 @@ async def deliver_trade_events( # execdict.pop('acctNumber') fill_msg = BrokerdFill( - # should match the value returned from + # NOTE: should match the value returned from # `.submit_limit()` reqid=execu.orderId, time_ns=time.time_ns(), # cuz why not diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index e15e84623..442cbdeb2 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -1047,7 +1047,13 @@ async def open_symbol_search( stock_results = [] async def stash_results(target: Awaitable[list]): - stock_results.extend(await target) + try: + results = await target + except tractor.trionics.Lagged: + print("IB SYM-SEARCH OVERRUN?!?") + return + + stock_results.extend(results) for i in range(10): with trio.move_on_after(3) as cs: diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 1569429fe..ad512c08f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -27,6 +27,7 @@ from math import isnan from pprint import pformat import time +from types import ModuleType from typing import ( AsyncIterator, Any, @@ -48,6 +49,7 @@ ) from ..data.feed import ( Feed, + Flume, maybe_open_feed, ) from ..ui._notify import notify_from_ems_status_msg @@ -380,14 +382,15 @@ def get_subs( @acm async def maybe_open_brokerd_dialog( self, - feed: Feed, + brokermod: ModuleType, + portal: tractor.Portal, exec_mode: str, symbol: str, loglevel: str, ) -> None: - brokermod = feed.mod broker = brokermod.name + relay: TradesRelay = self.relays.get(broker) if ( relay @@ -426,7 +429,7 @@ async def maybe_open_brokerd_dialog( else: # open live brokerd trades endpoint - open_trades_endpoint = feed.portal.open_context( + open_trades_endpoint = portal.open_context( trades_endpoint, loglevel=loglevel, ) @@ -523,18 +526,22 @@ async def open_trade_relays( maybe_open_feed( [fqsn], loglevel=loglevel, - ) as (feed, quote_stream), + ) as feed, ): - brokermod = feed.mod + brokername, _, _ = unpack_fqsn(fqsn) + brokermod = feed.mods[brokername] broker = brokermod.name + portal = feed.portals[brokermod] # XXX: this should be initial price quote from target provider - first_quote: dict = feed.first_quotes[fqsn] + flume = feed.flumes[fqsn] + first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) book.lasts[fqsn]: float = first_quote['last'] async with self.maybe_open_brokerd_dialog( - feed=feed, + brokermod=brokermod, + portal=portal, exec_mode=exec_mode, symbol=symbol, loglevel=loglevel, @@ -547,14 +554,16 @@ async def open_trade_relays( clear_dark_triggers, self, relay.brokerd_stream, - quote_stream, + flume.stream, broker, fqsn, # form: ... book ) client_ready = trio.Event() - task_status.started((relay, feed, client_ready)) + task_status.started( + (relay, feed, client_ready) + ) # sync to the client side by waiting for the stream # connection setup before relaying any existing live @@ -1014,7 +1023,7 @@ async def process_client_order_cmds( brokerd_order_stream: tractor.MsgStream, fqsn: str, - feed: Feed, + flume: Flume, dark_book: DarkBook, router: Router, @@ -1212,7 +1221,7 @@ async def process_client_order_cmds( 'size': size, 'exec_mode': exec_mode, 'action': action, - 'brokers': brokers, # list + 'brokers': _, # list } if ( # "DARK" triggers # submit order to local EMS book and scan loop, @@ -1234,13 +1243,12 @@ async def process_client_order_cmds( # sometimes the real-time feed hasn't come up # so just pull from the latest history. if isnan(last): - last = feed.rt_shm.array[-1]['close'] + last = flume.rt_shm.array[-1]['close'] pred = mk_check(trigger_price, last, action) spread_slap: float = 5 - sym = fqsn.replace(f'.{brokers[0]}', '') - min_tick = feed.symbols[sym].tick_size + min_tick = flume.symbol.tick_size if action == 'buy': tickfilter = ('ask', 'last', 'trade') @@ -1453,11 +1461,12 @@ async def _emsd_main( # start inbound (from attached client) order request processing # main entrypoint, run here until cancelled. try: + flume = feed.flumes[fqsn] await process_client_order_cmds( client_stream, brokerd_stream, fqsn, - feed, + flume, dark_book, _router, ) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 211a29fc4..33ca57612 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -578,7 +578,7 @@ async def trades_dialogue( ) # paper engine simulator clearing task - await simulate_fills(feed.stream, client) + await simulate_fills(feed.streams[broker], client) @asynccontextmanager diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 1386bc835..67647a83c 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -29,14 +29,13 @@ from ..brokers import get_brokermod from .._daemon import ( _tractor_kwargs, - _registry_host, - _registry_port, + _default_registry_host, + _default_registry_port, ) from .. import config log = get_logger('cli') -DEFAULT_BROKER = 'questrade' @click.command() @@ -77,8 +76,8 @@ def pikerd( reg_addr: None | tuple[str, int] = None if host or port: reg_addr = ( - host or _registry_host, - int(port) or _registry_port, + host or _default_registry_host, + int(port) or _default_registry_port, ) async def main(): @@ -118,7 +117,7 @@ async def main(): @click.group(context_settings=config._context_defaults) @click.option( '--brokers', '-b', - default=[DEFAULT_BROKER], + default=None, multiple=True, help='Broker backend to use' ) @@ -144,16 +143,19 @@ def cli( ctx.ensure_object(dict) - if len(brokers) == 1: - brokermods = [get_brokermod(brokers[0])] - else: - brokermods = [get_brokermod(broker) for broker in brokers] + if not brokers: + # (try to) load all (supposedly) supported data/broker backends + from piker.brokers import __brokers__ + brokers = __brokers__ + + brokermods = [get_brokermod(broker) for broker in brokers] + assert brokermods reg_addr: None | tuple[str, int] = None if host or port: reg_addr = ( - host or _registry_host, - int(port) or _registry_port, + host or _default_registry_host, + int(port) or _default_registry_port, ) ctx.obj.update({ diff --git a/piker/config.py b/piker/config.py index c7a7acc93..cb250386a 100644 --- a/piker/config.py +++ b/piker/config.py @@ -197,6 +197,9 @@ def load( ''' path = path or get_conf_path(conf_name) + if not os.path.isdir(_config_dir): + os.mkdir(_config_dir) + if not os.path.isfile(path): fn = _conf_fn_w_ext(conf_name) @@ -209,9 +212,9 @@ def load( # if one exists. if os.path.isfile(template): shutil.copyfile(template, path) - else: - with open(path, 'w'): - pass # touch + else: + with open(path, 'r'): + pass # touch it config = toml.load(path, **tomlkws) log.debug(f"Read config file {path}") diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 015de05e3..f8230bd71 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -22,7 +22,9 @@ from __future__ import annotations from collections import Counter import time -from typing import TYPE_CHECKING, Optional, Union +from typing import ( + TYPE_CHECKING, +) import tractor import trio @@ -147,7 +149,7 @@ async def increment_ohlc_buffer( async def broadcast( delay_s: int, - shm: Optional[ShmArray] = None, + shm: ShmArray | None = None, ) -> None: ''' @@ -241,6 +243,8 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: + # print(quotes) + # TODO: ``numba`` this! for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing @@ -304,29 +308,29 @@ async def sample_and_broadcast( volume, ) + # TODO: PUT THIS IN A ``_FeedsBus.broadcast()`` method! # XXX: we need to be very cautious here that no # context-channel is left lingering which doesn't have # a far end receiver actor-task. In such a case you can # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. + sub_key: str = broker_symbol.lower() subs: list[ tuple[ - Union[tractor.MsgStream, trio.MemorySendChannel], - tractor.Context, - Optional[float], # tick throttle in Hz + tractor.MsgStream | trio.MemorySendChannel, + float | None, # tick throttle in Hz ] - ] = bus._subscribers[broker_symbol.lower()] + ] = bus.get_subs(sub_key) # NOTE: by default the broker backend doesn't append # it's own "name" into the fqsn schema (but maybe it # should?) so we have to manually generate the correct # key here. - bsym = f'{broker_symbol}.{brokername}' + fqsn = f'{broker_symbol}.{brokername}' lags: int = 0 - for (stream, ctx, tick_throttle) in subs: - + for (stream, tick_throttle) in subs.copy(): try: with trio.move_on_after(0.2) as cs: if tick_throttle: @@ -334,47 +338,39 @@ async def sample_and_broadcast( # pushes to the ``uniform_rate_send()`` below. try: stream.send_nowait( - (bsym, quote) + (fqsn, quote) ) except trio.WouldBlock: + overruns[sub_key] += 1 + ctx = stream._ctx chan = ctx.chan - if ctx: - log.warning( - f'Feed overrun {bus.brokername} ->' - f'{chan.uid} !!!' - ) - else: - key = id(stream) - overruns[key] += 1 - log.warning( - f'Feed overrun {broker_symbol}' - '@{bus.brokername} -> ' - f'feed @ {tick_throttle} Hz' - ) - if overruns[key] > 6: - # TODO: should we check for the - # context being cancelled? this - # could happen but the - # channel-ipc-pipe is still up. - if not chan.connected(): - log.warning( - 'Dropping broken consumer:\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' - ) - await stream.aclose() - raise trio.BrokenResourceError - else: - log.warning( - 'Feed getting overrun bro!\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' - ) - continue + log.warning( + f'Feed OVERRUN {sub_key}' + '@{bus.brokername} -> \n' + f'feed @ {chan.uid}\n' + f'throttle = {tick_throttle} Hz' + ) + + if overruns[sub_key] > 6: + # TODO: should we check for the + # context being cancelled? this + # could happen but the + # channel-ipc-pipe is still up. + if ( + not chan.connected() + or ctx._cancel_called + ): + log.warning( + 'Dropping broken consumer:\n' + f'{sub_key}:' + f'{ctx.cid}@{chan.uid}' + ) + await stream.aclose() + raise trio.BrokenResourceError else: await stream.send( - {bsym: quote} + {fqsn: quote} ) if cs.cancelled_caught: @@ -402,14 +398,10 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. Doing it anyway though # since there seems to be some kinda race.. - try: - subs.remove((stream, tick_throttle)) - except ValueError: - log.error( - f'Stream was already removed from subs!?\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' - ) + bus.remove_subs( + sub_key, + {(stream, tick_throttle)}, + ) # TODO: a less naive throttler, here's some snippets: diff --git a/piker/data/_source.py b/piker/data/_source.py index 73c218ca9..87ba74a3a 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -23,7 +23,8 @@ from bidict import bidict import numpy as np -from msgspec import Struct + +from .types import Struct # from numba import from_dtype @@ -217,6 +218,10 @@ def tokens(self) -> tuple[str]: else: return (key, broker) + @property + def fqsn(self) -> str: + return '.'.join(self.tokens()).lower() + def front_fqsn(self) -> str: ''' fqsn = "fully qualified symbol name" diff --git a/piker/data/feed.py b/piker/data/feed.py index ef4b3634e..93630a130 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -21,20 +21,19 @@ """ from __future__ import annotations +from collections import defaultdict from contextlib import asynccontextmanager as acm -from dataclasses import ( - dataclass, - field, -) from datetime import datetime from functools import partial from types import ModuleType from typing import ( Any, AsyncIterator, + AsyncContextManager, Callable, Optional, Awaitable, + Sequence, TYPE_CHECKING, Union, ) @@ -43,7 +42,10 @@ from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor -from tractor.trionics import maybe_open_context +from tractor.trionics import ( + maybe_open_context, + gather_contexts, +) import pendulum import numpy as np @@ -58,6 +60,7 @@ maybe_open_shm_array, attach_shm_array, ShmArray, + _Token, _secs_in_day, ) from .ingest import get_ingestormod @@ -109,21 +112,16 @@ class _FeedsBus(Struct): task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() - # XXX: so weird but, apparently without this being `._` private - # pydantic will complain about private `tractor.Context` instance - # vars (namely `._portal` and `._cancel_scope`) at import time. - # Reported this bug: - # https://github.com/samuelcolvin/pydantic/issues/2816 - _subscribers: dict[ + _subscribers: defaultdict[ str, - list[ + set[ tuple[ - Union[tractor.MsgStream, trio.MemorySendChannel], - tractor.Context, - Optional[float], # tick throttle in Hz + tractor.MsgStream | trio.MemorySendChannel, + # tractor.Context, + float | None, # tick throttle in Hz ] ] - ] = {} + ] = defaultdict(set) async def start_task( self, @@ -151,6 +149,53 @@ async def start_with_cs( # ) -> bool: # ... + def get_subs( + self, + key: str, + ) -> set[ + tuple[ + Union[tractor.MsgStream, trio.MemorySendChannel], + # tractor.Context, + float | None, # tick throttle in Hz + ] + ]: + ''' + Get the ``set`` of consumer subscription entries for the given key. + + ''' + return self._subscribers[key] + + def add_subs( + self, + key: str, + subs: set[tuple[ + tractor.MsgStream | trio.MemorySendChannel, + # tractor.Context, + float | None, # tick throttle in Hz + ]], + ) -> set[tuple]: + ''' + Add a ``set`` of consumer subscription entries for the given key. + + ''' + _subs = self._subscribers[key] + _subs.update(subs) + return _subs + + def remove_subs( + self, + key: str, + subs: set[tuple], + + ) -> set[tuple]: + ''' + Remove a ``set`` of consumer subscription entries for key. + + ''' + _subs = self.get_subs(key) + _subs.difference_update(subs) + return _subs + _bus: _FeedsBus = None @@ -212,41 +257,20 @@ async def _setup_persistent_brokerd( def diff_history( - array, - start_dt, - end_dt, - last_tsdb_dt: Optional[datetime] = None + array: np.ndarray, + timeframe: int, + start_dt: datetime, + end_dt: datetime, + last_tsdb_dt: datetime | None = None ) -> np.ndarray: - to_push = array + # no diffing with tsdb dt index possible.. + if last_tsdb_dt is None: + return array - if last_tsdb_dt: - s_diff = (start_dt - last_tsdb_dt).seconds - - # if we detect a partial frame's worth of data - # that is new, slice out only that history and - # write to shm. - if ( - s_diff < 0 - ): - if abs(s_diff) < len(array): - # the + 1 is because ``last_tsdb_dt`` is pulled from - # the last row entry for the ``'time'`` field retreived - # from the tsdb. - to_push = array[abs(s_diff) + 1:] - - else: - # pass back only the portion of the array that is - # greater then the last time stamp in the tsdb. - time = array['time'] - to_push = array[time >= last_tsdb_dt.timestamp()] - - log.info( - f'Pushing partial frame {to_push.size} to shm' - ) - - return to_push + time = array['time'] + return array[time > last_tsdb_dt.timestamp()] async def start_backfill( @@ -290,6 +314,7 @@ async def start_backfill( to_push = diff_history( array, + timeframe, start_dt, end_dt, last_tsdb_dt=last_tsdb_dt, @@ -338,12 +363,12 @@ async def start_backfill( 60: {'years': 6}, } - kwargs = periods[step_size_s] + period_duration = periods[step_size_s] # NOTE: manually set the "latest" datetime which we intend to # backfill history "until" so as to adhere to the history # settings above when the tsdb is detected as being empty. - last_tsdb_dt = start_dt.subtract(**kwargs) + last_tsdb_dt = start_dt.subtract(**period_duration) # configure async query throttling # rate = config.get('rate', 1) @@ -357,8 +382,8 @@ async def start_backfill( # inline sequential loop where we simply pass the # last retrieved start dt to the next request as # it's end dt. - while start_dt > last_tsdb_dt: - log.info( + while end_dt > last_tsdb_dt: + log.debug( f'Requesting {step_size_s}s frame ending in {start_dt}' ) @@ -408,6 +433,7 @@ async def start_backfill( to_push = diff_history( array, + timeframe, start_dt, end_dt, last_tsdb_dt=last_tsdb_dt, @@ -428,6 +454,8 @@ async def start_backfill( log.info( f'Shm buffer overrun on: {start_dt} -> {end_dt}?' ) + # can't push the entire frame? so + # push only the amount that can fit.. break log.info( @@ -514,6 +542,8 @@ async def tsdb_backfill( # start history anal and load missing new data via backend. for timeframe, shm in shms.items(): + # loads a (large) frame of data from the tsdb depending + # on the db's query size limit. tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load( fqsn, timeframe=timeframe, @@ -590,34 +620,51 @@ async def back_load_from_tsdb( if bf_done: await bf_done.wait() - # Load tsdb history into shm buffer (for display). - # TODO: eventually it'd be nice to not require a shm array/buffer # to accomplish this.. maybe we can do some kind of tsdb direct to # graphics format eventually in a child-actor? - # do diff against last start frame of history and only fill - # in from the tsdb an allotment that allows for most recent - # to be loaded into mem *before* tsdb data. - if last_tsdb_dt and latest_start_dt: - dt_diff_s = ( - latest_start_dt - last_tsdb_dt - ).seconds - else: - dt_diff_s = 0 - # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field prepend_start = shm._first.value + array = shm.array + if len(array): + shm_last_dt = pendulum.from_timestamp(shm.array[0]['time']) + else: + shm_last_dt = None + + if last_tsdb_dt: + assert shm_last_dt >= last_tsdb_dt - # sanity check on most-recent-data loading - assert prepend_start > dt_diff_s + # do diff against start index of last frame of history and only + # fill in an amount of datums from tsdb allows for most recent + # to be loaded into mem *before* tsdb data. + if ( + last_tsdb_dt + and latest_start_dt + ): + backfilled_size_s = ( + latest_start_dt - last_tsdb_dt + ).seconds + # if the shm buffer len is not large enough to contain + # all missing data between the most recent backend-queried frame + # and the most recent dt-index in the db we warn that we only + # want to load a portion of the next tsdb query to fill that + # space. + log.info( + f'{backfilled_size_s} seconds worth of {timeframe}s loaded' + ) + # Load TSDB history into shm buffer (for display) if there is + # remaining buffer space. if ( len(tsdb_history) ): - to_push = tsdb_history[:prepend_start] + + # load the first (smaller) bit of history originally loaded + # above from ``Storage.load()``. + to_push = tsdb_history[-prepend_start:] shm.push( to_push, @@ -628,37 +675,35 @@ async def back_load_from_tsdb( # start=prepend_start, field_map=marketstore.ohlc_key_map, ) - prepend_start = shm._first.value - # load as much from storage into shm as space will - # allow according to user's shm size settings. - last_frame_start = tsdb_history['Epoch'][0] + tsdb_last_frame_start = tsdb_history['Epoch'][0] + # load as much from storage into shm possible (depends on + # user's shm size settings). while ( shm._first.value > 0 ): + tsdb_history = await storage.read_ohlcv( fqsn, - end=last_frame_start, + end=tsdb_last_frame_start, timeframe=timeframe, ) + + next_start = tsdb_history['Epoch'][0] if ( - not len(tsdb_history) - ): - # on empty db history - break + not len(tsdb_history) # empty query - time = tsdb_history['Epoch'] - frame_start = time[0] - frame_end = time[0] - print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}") + # no earlier data detected + or next_start >= tsdb_last_frame_start - if frame_start >= last_frame_start: - # no new data loaded was from tsdb, so we can exit. + ): break + else: + tsdb_last_frame_start = next_start prepend_start = shm._first.value - to_push = tsdb_history[:prepend_start] + to_push = tsdb_history[-prepend_start:] # insert the history pre a "days worth" of samples # to leave some real-time buffer space at the end. @@ -667,8 +712,6 @@ async def back_load_from_tsdb( prepend=True, field_map=marketstore.ohlc_key_map, ) - last_frame_start = frame_start - log.info(f'Loaded {to_push.shape} datums from storage') # manually trigger step update to update charts/fsps @@ -719,9 +762,18 @@ async def manage_history( buffer. ''' + + # TODO: is there a way to make each shm file key + # actor-tree-discovery-addr unique so we avoid collisions + # when doing tests which also allocate shms for certain instruments + # that may be in use on the system by some other running daemons? + # from tractor._state import _runtime_vars + # port = _runtime_vars['_root_mailbox'][1] + # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. hist_shm, opened = maybe_open_shm_array( + # key=f'{fqsn}_hist_p{port}', key=f'{fqsn}_hist', # use any broker defined ohlc dtype: @@ -739,6 +791,7 @@ async def manage_history( ) rt_shm, opened = maybe_open_shm_array( + # key=f'{fqsn}_rt_p{port}', key=f'{fqsn}_rt', # use any broker defined ohlc dtype: @@ -836,11 +889,157 @@ async def manage_history( await trio.sleep_forever() +class Flume(Struct): + ''' + Composite reference type which points to all the addressing handles + and other meta-data necessary for the read, measure and management + of a set of real-time updated data flows. + + Can be thought of as a "flow descriptor" or "flow frame" which + describes the high level properties of a set of data flows that can + be used seamlessly across process-memory boundaries. + + Each instance's sub-components normally includes: + - a msg oriented quote stream provided via an IPC transport + - history and real-time shm buffers which are both real-time + updated and backfilled. + - associated startup indexing information related to both buffer + real-time-append and historical prepend addresses. + - low level APIs to read and measure the updated data and manage + queuing properties. + + ''' + symbol: Symbol + first_quote: dict + _hist_shm_token: _Token + _rt_shm_token: _Token + + # private shm refs loaded dynamically from tokens + _hist_shm: ShmArray | None = None + _rt_shm: ShmArray | None = None + + stream: tractor.MsgStream | None = None + izero_hist: int = 0 + izero_rt: int = 0 + throttle_rate: int | None = None + + # TODO: do we need this really if we can pull the `Portal` from + # ``tractor``'s internals? + feed: Feed | None = None + + @property + def rt_shm(self) -> ShmArray: + + if self._rt_shm is None: + self._rt_shm = attach_shm_array( + token=self._rt_shm_token, + readonly=True, + ) + + return self._rt_shm + + @property + def hist_shm(self) -> ShmArray: + + if self._hist_shm is None: + self._hist_shm = attach_shm_array( + token=self._hist_shm_token, + readonly=True, + ) + + return self._hist_shm + + async def receive(self) -> dict: + return await self.stream.receive() + + @acm + async def index_stream( + self, + delay_s: int = 1, + + ) -> AsyncIterator[int]: + + if not self.feed: + raise RuntimeError('This flume is not part of any ``Feed``?') + + # TODO: maybe a public (property) API for this in ``tractor``? + portal = self.stream._ctx._portal + assert portal + + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + async with maybe_open_context( + acm_func=partial( + portal.open_context, + iter_ohlc_periods, + ), + kwargs={'delay_s': delay_s}, + ) as (cache_hit, (ctx, first)): + async with ctx.open_stream() as istream: + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream + + def get_ds_info( + self, + ) -> tuple[float, float, float]: + ''' + Compute the "downsampling" ratio info between the historical shm + buffer and the real-time (HFT) one. + + Return a tuple of the fast sample period, historical sample + period and ratio between them. + + ''' + times = self.hist_shm.array['time'] + end = pendulum.from_timestamp(times[-1]) + start = pendulum.from_timestamp(times[times != times[-1]][-1]) + hist_step_size_s = (end - start).seconds + + times = self.rt_shm.array['time'] + end = pendulum.from_timestamp(times[-1]) + start = pendulum.from_timestamp(times[times != times[-1]][-1]) + rt_step_size_s = (end - start).seconds + + ratio = hist_step_size_s / rt_step_size_s + return ( + rt_step_size_s, + hist_step_size_s, + ratio, + ) + + # TODO: get native msgspec decoding for these workinn + def to_msg(self) -> dict: + msg = self.to_dict() + msg['symbol'] = msg['symbol'].to_dict() + + # can't serialize the stream or feed objects, it's expected + # you'll have a ref to it since this msg should be rxed on + # a stream on whatever far end IPC.. + msg.pop('stream') + msg.pop('feed') + return msg + + @classmethod + def from_msg(cls, msg: dict) -> dict: + symbol = Symbol(**msg.pop('symbol')) + return cls( + symbol=symbol, + **msg, + ) + + async def allocate_persistent_feed( bus: _FeedsBus, + sub_registered: trio.Event, brokername: str, - symbol: str, + symstr: str, loglevel: str, start_stream: bool = True, @@ -882,16 +1081,49 @@ async def allocate_persistent_feed( mod.stream_quotes, send_chan=send, feed_is_live=feed_is_live, - symbols=[symbol], + symbols=[symstr], loglevel=loglevel, ) ) + # TODO: this is indexed by symbol for now since we've planned (for + # some time) to expect backends to handle single + # ``.stream_quotes()`` calls with multiple symbols inputs to just + # work such that a backend can do its own multiplexing if desired. + # + # Likely this will require some design changes: + # - the .started() should return some config output determining + # whether the backend does indeed multiplex multi-symbol quotes + # internally or whether separate task spawns should be done per + # symbol (as it is right now). + # - information about discovery of non-local host daemons which can + # be contacted in the case where we want to support load disti + # over multi-use clusters; eg. some new feed request is + # re-directed to another daemon cluster because the current one is + # at max capacity. + # - the same ideas ^ but when a local core is maxxed out (like how + # binance does often with hft XD + # - if a brokerd is non-local then we can't just allocate a mem + # channel here and have the brokerd write it, we instead need + # a small streaming machine around the remote feed which can then + # do the normal work of sampling and writing shm buffers + # (depending on if we want sampling done on the far end or not?) + msg = init_msg[symstr] + # the broker-specific fully qualified symbol name, # but ensure it is lower-cased for external use. - bfqsn = init_msg[symbol]['fqsn'].lower() - init_msg[symbol]['fqsn'] = bfqsn + bfqsn = msg['fqsn'].lower() + + # true fqsn including broker/provider suffix + fqsn = '.'.join((bfqsn, brokername)) + # msg['fqsn'] = bfqsn + + symbol = Symbol.from_fqsn( + fqsn=fqsn, + info=msg['symbol_info'], + ) + assert symbol.type_key - # HISTORY, run 2 tasks: + # HISTORY storage, run 2 tasks: # - a history loader / maintainer # - a real-time streamer which consumers and sends new data to any # consumers as well as writes to storage backends (as configured). @@ -909,53 +1141,29 @@ async def allocate_persistent_feed( manage_history, mod, bus, - '.'.join((bfqsn, brokername)), + fqsn, some_data_ready, feed_is_live, ) - # we hand an IPC-msg compatible shm token to the caller so it - # can read directly from the memory which will be written by - # this task. - msg = init_msg[symbol] - msg['hist_shm_token'] = hist_shm.token - msg['izero_hist'] = izero_hist - msg['izero_rt'] = izero_rt - msg['rt_shm_token'] = rt_shm.token - - # true fqsn - fqsn = '.'.join((bfqsn, brokername)) - # add a fqsn entry that includes the ``.`` suffix - # and an entry that includes the broker-specific fqsn (including - # any new suffixes or elements as injected by the backend). - init_msg[fqsn] = msg - init_msg[bfqsn] = msg - - # TODO: pretty sure we don't need this? why not just leave 1s as - # the fastest "sample period" since we'll probably always want that - # for most purposes. - # pass OHLC sample rate in seconds (be sure to use python int type) - # init_msg[symbol]['sample_rate'] = 1 #int(delay_s) - # yield back control to starting nursery once we receive either # some history or a real-time quote. log.info(f'waiting on history to load: {fqsn}') await some_data_ready.wait() - # append ``.`` suffix to each quote symbol - acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}' - - generic_first_quotes = { - acceptable_not_fqsn_with_broker_suffix: first_quote, - fqsn: first_quote, - } + flume = Flume( + symbol=symbol, + _hist_shm_token=hist_shm.token, + _rt_shm_token=rt_shm.token, + first_quote=first_quote, + izero_hist=izero_hist, + izero_rt=izero_rt, + # throttle_rate=tick_throttle, + ) # for ambiguous names we simply apply the retreived # feed to that name (for now). - bus.feeds[symbol] = bus.feeds[bfqsn] = ( - init_msg, - generic_first_quotes, - ) + bus.feeds[symstr] = bus.feeds[bfqsn] = flume # insert 1s ohlc into the increment buffer set # to update and shift every second @@ -999,9 +1207,14 @@ async def allocate_persistent_feed( rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1] rt_shm.array['volume'][-2] = 0 + # wait the spawning parent task to register its subscriber + # send-stream entry before we start the sample loop. + await sub_registered.wait() + # start sample loop and shm incrementer task for OHLC style sampling # at the above registered step periods. try: + log.info(f'Starting sampler task for {fqsn}') await sample_and_broadcast( bus, rt_shm, @@ -1019,12 +1232,16 @@ async def open_feed_bus( ctx: tractor.Context, brokername: str, - symbol: str, # normally expected to the broker-specific fqsn - loglevel: str, + symbols: list[str], # normally expected to the broker-specific fqsn + + loglevel: str = 'error', tick_throttle: Optional[float] = None, start_stream: bool = True, -) -> None: +) -> dict[ + str, # fqsn + tuple[dict, dict] # pair of dicts of the initmsg and first quotes +]: ''' Open a data feed "bus": an actor-persistent per-broker task-oriented data feed registry which allows managing real-time quote streams per @@ -1043,63 +1260,70 @@ async def open_feed_bus( # ensure we are who we think we are servicename = tractor.current_actor().name assert 'brokerd' in servicename + + # XXX: figure this not crashing into debug! + # await tractor.breakpoint() + # assert 0 + assert brokername in servicename bus = get_feed_bus(brokername) - - # if no cached feed for this symbol has been created for this - # brokerd yet, start persistent stream and shm writer task in - # service nursery - entry = bus.feeds.get(symbol) - if entry is None: - # allocate a new actor-local stream bus which - # will persist for this `brokerd`'s service lifetime. - async with bus.task_lock: - await bus.nursery.start( - partial( - allocate_persistent_feed, - - bus=bus, - brokername=brokername, - # here we pass through the selected symbol in native - # "format" (i.e. upper vs. lowercase depending on - # provider). - symbol=symbol, - loglevel=loglevel, - start_stream=start_stream, + sub_registered = trio.Event() + + flumes: dict[str, Flume] = {} + + for symbol in symbols: + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery + flume = bus.feeds.get(symbol) + if flume is None: + # allocate a new actor-local stream bus which + # will persist for this `brokerd`'s service lifetime. + async with bus.task_lock: + await bus.nursery.start( + partial( + allocate_persistent_feed, + + bus=bus, + sub_registered=sub_registered, + brokername=brokername, + # here we pass through the selected symbol in native + # "format" (i.e. upper vs. lowercase depending on + # provider). + symstr=symbol, + loglevel=loglevel, + start_stream=start_stream, + ) ) - ) - # TODO: we can remove this? - assert isinstance(bus.feeds[symbol], tuple) - - # XXX: ``first_quotes`` may be outdated here if this is secondary - # subscriber - init_msg, first_quotes = bus.feeds[symbol] - - msg = init_msg[symbol] - bfqsn = msg['fqsn'].lower() - - # true fqsn - fqsn = '.'.join([bfqsn, brokername]) - assert fqsn in first_quotes - assert bus.feeds[bfqsn] - - # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) - bsym = symbol + f'.{brokername}' - assert bsym in first_quotes - - # we use the broker-specific fqsn (bfqsn) for - # the sampler subscription since the backend isn't (yet) - # expected to append it's own name to the fqsn, so we filter - # on keys which *do not* include that name (e.g .ib) . - bus._subscribers.setdefault(bfqsn, []) - - # send this even to subscribers to existing feed? - # deliver initial info message a first quote asap - await ctx.started(( - init_msg, - first_quotes, - )) + # TODO: we can remove this? + # assert isinstance(bus.feeds[symbol], tuple) + + # XXX: ``.first_quote`` may be outdated here if this is secondary + # subscriber + flume = bus.feeds[symbol] + sym = flume.symbol + bfqsn = sym.key + fqsn = sym.fqsn # true fqsn + assert bfqsn in fqsn and brokername in fqsn + + if sym.suffix: + bfqsn = fqsn.removesuffix(f'.{brokername}') + log.warning(f'{brokername} expanded symbol {symbol} -> {bfqsn}') + + # pack for ``.started()`` sync msg + flumes[fqsn] = flume + + # we use the broker-specific fqsn (bfqsn) for + # the sampler subscription since the backend isn't (yet) + # expected to append it's own name to the fqsn, so we filter + # on keys which *do not* include that name (e.g .ib) . + bus._subscribers.setdefault(bfqsn, set()) + + # sync feed subscribers with flume handles + await ctx.started( + {fqsn: flume.to_msg() for fqsn, flume in flumes.items()} + ) if not start_stream: log.warning(f'Not opening real-time stream for {fqsn}') @@ -1109,49 +1333,80 @@ async def open_feed_bus( async with ( ctx.open_stream() as stream, ): - # re-send to trigger display loop cycle (necessary especially - # when the mkt is closed and no real-time messages are - # expected). - await stream.send({fqsn: first_quotes}) - - # open a bg task which receives quotes over a mem chan - # and only pushes them to the target actor-consumer at - # a max ``tick_throttle`` instantaneous rate. - if tick_throttle: - send, recv = trio.open_memory_channel(2**10) - cs = await bus.start_task( - uniform_rate_send, - tick_throttle, - recv, - stream, - ) - sub = (send, ctx, tick_throttle) - else: - sub = (stream, ctx, tick_throttle) + local_subs: dict[str, set[tuple]] = {} + for fqsn, flume in flumes.items(): + # re-send to trigger display loop cycle (necessary especially + # when the mkt is closed and no real-time messages are + # expected). + await stream.send({fqsn: flume.first_quote}) - subs = bus._subscribers[bfqsn] - subs.append(sub) + # set a common msg stream for all requested symbols + assert stream + flume.stream = stream - try: - uid = ctx.chan.uid + # Add a real-time quote subscription to feed bus: + # This ``sub`` subscriber entry is added to the feed bus set so + # that the ``sample_and_broadcast()`` task (spawned inside + # ``allocate_persistent_feed()``) will push real-time quote + # (ticks) to this new consumer. + + if tick_throttle: + flume.throttle_rate = tick_throttle + + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. + send, recv = trio.open_memory_channel(2**10) + + cs = await bus.start_task( + uniform_rate_send, + tick_throttle, + recv, + stream, + ) + # NOTE: so the ``send`` channel here is actually a swapped + # in trio mem chan which gets pushed by the normal sampler + # task but instead of being sent directly over the IPC msg + # stream it's the throttle task does the work of + # incrementally forwarding to the IPC stream at the throttle + # rate. + send._ctx = ctx # mock internal ``tractor.MsgStream`` ref + sub = (send, tick_throttle) + + else: + sub = (stream, tick_throttle) + # TODO: add an api for this on the bus? + # maybe use the current task-id to key the sub list that's + # added / removed? Or maybe we can add a general + # pause-resume by sub-key api? + bfqsn = fqsn.removesuffix(f'.{brokername}') + local_subs.setdefault(bfqsn, set()).add(sub) + bus.add_subs(bfqsn, {sub}) + + # sync caller with all subs registered state + sub_registered.set() + + uid = ctx.chan.uid + try: # ctrl protocol for start/stop of quote streams based on UI # state (eg. don't need a stream when a symbol isn't being # displayed). async for msg in stream: if msg == 'pause': - if sub in subs: + for bfqsn, subs in local_subs.items(): log.info( - f'Pausing {fqsn} feed for {uid}') - subs.remove(sub) + f'Pausing {bfqsn} feed for {uid}') + bus.remove_subs(bfqsn, subs) elif msg == 'resume': - if sub not in subs: + for bfqsn, subs in local_subs.items(): log.info( - f'Resuming {fqsn} feed for {uid}') - subs.append(sub) + f'Resuming {bfqsn} feed for {uid}') + bus.add_subs(bfqsn, subs) + else: raise ValueError(msg) finally: @@ -1162,110 +1417,94 @@ async def open_feed_bus( # TODO: a one-cancels-one nursery # n.cancel_scope.cancel() cs.cancel() - try: - bus._subscribers[bfqsn].remove(sub) - except ValueError: - log.warning(f'{sub} for {symbol} was already removed?') + # drop all subs for this task from the bus + for bfqsn, subs in local_subs.items(): + bus.remove_subs(bfqsn, subs) -@dataclass -class Feed: - ''' - A data feed for client-side interaction with far-process real-time - data sources. - - This is an thin abstraction on top of ``tractor``'s portals for - interacting with IPC streams and storage APIs (shm and time-series - db). +class Feed(Struct): ''' - name: str - hist_shm: ShmArray - rt_shm: ShmArray - mod: ModuleType - first_quotes: dict # symbol names to first quote dicts - _portal: tractor.Portal - stream: trio.abc.ReceiveChannel[dict[str, Any]] - status: dict[str, Any] + A per-provider API for client-side consumption from real-time data + (streaming) sources, normally brokers and data services. - izero_hist: int = 0 - izero_rt: int = 0 - - throttle_rate: Optional[int] = None + This is a somewhat thin abstraction on top of + a ``tractor.MsgStream`` plus associate share memory buffers which + can be read in a readers-writer-lock style IPC configuration. - _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None - _max_sample_rate: int = 1 - - # cache of symbol info messages received as first message when - # a stream startsc. - symbols: dict[str, Symbol] = field(default_factory=dict) + Furhter, there is direct access to slower sampled historical data through + similarly allocated shm arrays. - @property - def portal(self) -> tractor.Portal: - return self._portal + ''' + mods: dict[str, ModuleType] = {} + portals: dict[ModuleType, tractor.Portal] = {} + flumes: dict[str, Flume] = {} + streams: dict[ + str, + trio.abc.ReceiveChannel[dict[str, Any]], + ] = {} - async def receive(self) -> dict: - return await self.stream.receive() + # used for UI to show remote state + status: dict[str, Any] = {} @acm - async def index_stream( + async def open_multi_stream( self, - delay_s: int = 1, + brokers: Sequence[str] | None = None, - ) -> AsyncIterator[int]: + ) -> trio.abc.ReceiveChannel: - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with maybe_open_context( - acm_func=partial( - self.portal.open_context, - iter_ohlc_periods, - ), - kwargs={'delay_s': delay_s}, - ) as (cache_hit, (ctx, first)): - async with ctx.open_stream() as istream: - if cache_hit: - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - async with istream.subscribe() as bistream: - yield bistream - else: - yield istream + if brokers is None: + mods = self.mods + brokers = list(self.mods) + else: + mods = {name: self.mods[name] for name in brokers} + + if len(mods) == 1: + # just pass the brokerd stream directly if only one provider + # was detected. + stream = self.streams[list(brokers)[0]] + async with stream.subscribe() as bstream: + yield bstream + return - async def pause(self) -> None: - await self.stream.send('pause') + # start multiplexing task tree + tx, rx = trio.open_memory_channel(616) - async def resume(self) -> None: - await self.stream.send('resume') + async def relay_to_common_memchan(stream: tractor.MsgStream): + async with tx: + async for msg in stream: + await tx.send(msg) - def get_ds_info( - self, - ) -> tuple[float, float, float]: - ''' - Compute the "downsampling" ratio info between the historical shm - buffer and the real-time (HFT) one. + async with trio.open_nursery() as nurse: + # spawn a relay task for each stream so that they all + # multiplex to a common channel. + for brokername in mods: + stream = self.streams[brokername] + nurse.start_soon(relay_to_common_memchan, stream) - Return a tuple of the fast sample period, historical sample - period and ratio between them. + try: + yield rx + finally: + nurse.cancel_scope.cancel() - ''' - times = self.hist_shm.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - hist_step_size_s = (end - start).seconds + _max_sample_rate: int = 1 - times = self.rt_shm.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - rt_step_size_s = (end - start).seconds + # @property + # def portal(self) -> tractor.Portal: + # return self._portal - ratio = hist_step_size_s / rt_step_size_s - return ( - rt_step_size_s, - hist_step_size_s, - ratio, - ) + # @property + # def name(self) -> str: + # return self.mod.name + + async def pause(self) -> None: + for stream in set(self.streams.values()): + await stream.send('pause') + + async def resume(self) -> None: + for stream in set(self.streams.values()): + await stream.send('resume') @acm @@ -1305,135 +1544,6 @@ async def search(text: str) -> dict[str, Any]: yield -@acm -async def open_feed( - - fqsns: list[str], - - loglevel: Optional[str] = None, - backpressure: bool = True, - start_stream: bool = True, - tick_throttle: Optional[float] = None, # Hz - -) -> Feed: - ''' - Open a "data feed" which provides streamed real-time quotes. - - ''' - fqsn = fqsns[0].lower() - - brokername, key, suffix = unpack_fqsn(fqsn) - bfqsn = fqsn.replace('.' + brokername, '') - - try: - mod = get_brokermod(brokername) - except ImportError: - mod = get_ingestormod(brokername) - - # no feed for broker exists so maybe spawn a data brokerd - async with ( - - # if no `brokerd` for this backend exists yet we spawn - # and actor for one. - maybe_spawn_brokerd( - brokername, - loglevel=loglevel - ) as portal, - - # (allocate and) connect to any feed bus for this broker - portal.open_context( - open_feed_bus, - brokername=brokername, - symbol=bfqsn, - loglevel=loglevel, - start_stream=start_stream, - tick_throttle=tick_throttle, - - ) as (ctx, (init_msg, first_quotes)), - - ctx.open_stream( - # XXX: be explicit about stream backpressure since we should - # **never** overrun on feeds being too fast, which will - # pretty much always happen with HFT XD - backpressure=backpressure, - ) as stream, - - ): - init = init_msg[bfqsn] - # we can only read from shm - hist_shm = attach_shm_array( - token=init['hist_shm_token'], - readonly=True, - ) - rt_shm = attach_shm_array( - token=init['rt_shm_token'], - readonly=True, - ) - - assert fqsn in first_quotes - - feed = Feed( - name=brokername, - hist_shm=hist_shm, - rt_shm=rt_shm, - mod=mod, - first_quotes=first_quotes, - stream=stream, - _portal=portal, - status={}, - izero_hist=init['izero_hist'], - izero_rt=init['izero_rt'], - throttle_rate=tick_throttle, - ) - - # fill out "status info" that the UI can show - host, port = feed.portal.channel.raddr - if host == '127.0.0.1': - host = 'localhost' - - feed.status.update({ - 'actor_name': feed.portal.channel.uid[0], - 'host': host, - 'port': port, - 'shm': f'{humanize(feed.hist_shm._shm.size)}', - 'throttle_rate': feed.throttle_rate, - }) - feed.status.update(init_msg.pop('status', {})) - - for sym, data in init_msg.items(): - si = data['symbol_info'] - fqsn = data['fqsn'] + f'.{brokername}' - symbol = Symbol.from_fqsn( - fqsn, - info=si, - ) - - # symbol.broker_info[brokername] = si - feed.symbols[fqsn] = symbol - feed.symbols[sym] = symbol - - # cast shm dtype to list... can't member why we need this - for shm_key, shm in [ - ('rt_shm_token', rt_shm), - ('hist_shm_token', hist_shm), - ]: - shm_token = data[shm_key] - - # XXX: msgspec won't relay through the tuples XD - shm_token['dtype_descr'] = tuple( - map(tuple, shm_token['dtype_descr'])) - - assert shm_token == shm.token # sanity - - feed._max_sample_rate = 1 - - try: - yield feed - finally: - # drop the infinite stream connection - await ctx.cancel() - - @acm async def maybe_open_feed( @@ -1473,7 +1583,163 @@ async def maybe_open_feed( log.info(f'Using cached feed for {fqsn}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use - async with feed.stream.subscribe() as bstream: - yield feed, bstream + + async with gather_contexts( + mngrs=[stream.subscribe() for stream in feed.streams.values()] + ) as bstreams: + for bstream, flume in zip(bstreams, feed.flumes.values()): + # XXX: TODO: horrible hackery that needs fixing.. + # i guess we have to create context proxies? + bstream._ctx = flume.stream._ctx + flume.stream = bstream + + yield feed else: - yield feed, feed.stream + yield feed + + +@acm +async def open_feed( + + fqsns: list[str], + + loglevel: Optional[str] = None, + backpressure: bool = True, + start_stream: bool = True, + tick_throttle: Optional[float] = None, # Hz + +) -> Feed: + ''' + Open a "data feed" which provides streamed real-time quotes. + + ''' + providers: dict[ModuleType, list[str]] = {} + feed = Feed() + + for fqsn in fqsns: + brokername, key, suffix = unpack_fqsn(fqsn) + bfqsn = fqsn.replace('.' + brokername, '') + + try: + mod = get_brokermod(brokername) + except ImportError: + mod = get_ingestormod(brokername) + + # built a per-provider map to instrument names + providers.setdefault(mod, []).append(bfqsn) + feed.mods[mod.name] = mod + + # one actor per brokerd for now + brokerd_ctxs = [] + + for brokermod, bfqsns in providers.items(): + + # if no `brokerd` for this backend exists yet we spawn + # a daemon actor for it. + brokerd_ctxs.append( + maybe_spawn_brokerd( + brokermod.name, + loglevel=loglevel + ) + ) + + portals: tuple[tractor.Portal] + async with gather_contexts( + brokerd_ctxs, + ) as portals: + + bus_ctxs: list[AsyncContextManager] = [] + for ( + portal, + (brokermod, bfqsns), + ) in zip(portals, providers.items()): + + feed.portals[brokermod] = portal + + # fill out "status info" that the UI can show + host, port = portal.channel.raddr + if host == '127.0.0.1': + host = 'localhost' + + feed.status.update({ + 'actor_name': portal.channel.uid[0], + 'host': host, + 'port': port, + 'hist_shm': 'NA', + 'rt_shm': 'NA', + 'throttle_rate': tick_throttle, + }) + # feed.status.update(init_msg.pop('status', {})) + + # (allocate and) connect to any feed bus for this broker + bus_ctxs.append( + portal.open_context( + open_feed_bus, + brokername=brokermod.name, + symbols=bfqsns, + loglevel=loglevel, + start_stream=start_stream, + tick_throttle=tick_throttle, + ) + ) + + assert len(feed.mods) == len(feed.portals) + + async with ( + gather_contexts(bus_ctxs) as ctxs, + ): + stream_ctxs = [] + for ( + (ctx, flumes_msg_dict), + (brokermod, bfqsns), + ) in zip(ctxs, providers.items()): + + for fqsn, flume_msg in flumes_msg_dict.items(): + flume = Flume.from_msg(flume_msg) + assert flume.symbol.fqsn == fqsn + feed.flumes[fqsn] = flume + + # TODO: do we need this? + flume.feed = feed + + # attach and cache shm handles + rt_shm = flume.rt_shm + assert rt_shm + hist_shm = flume.hist_shm + assert hist_shm + + feed.status['hist_shm'] = ( + f'{humanize(hist_shm._shm.size)}' + ) + feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}' + + stream_ctxs.append( + ctx.open_stream( + # XXX: be explicit about stream backpressure + # since we should **never** overrun on feeds + # being too fast, which will pretty much + # always happen with HFT XD + backpressure=backpressure, + ) + ) + + async with ( + gather_contexts(stream_ctxs) as streams, + ): + for ( + stream, + (brokermod, bfqsns), + ) in zip(streams, providers.items()): + + assert stream + feed.streams[brokermod.name] = stream + + # apply `brokerd`-common steam to each flume + # tracking a symbol from that provider. + for fqsn, flume in feed.flumes.items(): + if brokermod.name in flume.symbol.brokers: + flume.stream = stream + + assert len(feed.mods) == len(feed.portals) == len(feed.streams) + + yield feed diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 096c9745e..d354f9b02 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -38,7 +38,7 @@ from bidict import bidict from msgspec.msgpack import encode, decode -import pyqtgraph as pg +# import pyqtgraph as pg import numpy as np import tractor from trio_websocket import open_websocket_url @@ -429,10 +429,7 @@ async def read_ohlcv( end: Optional[int] = None, limit: int = int(800e3), - ) -> dict[ - int, - Union[dict, np.ndarray], - ]: + ) -> np.ndarray: client = self.client syms = await client.list_symbols() @@ -661,7 +658,7 @@ async def tsdb_history_update( [fqsn], start_stream=False, - ) as (feed, stream), + ) as feed, ): profiler(f'opened feed for {fqsn}') @@ -669,12 +666,13 @@ async def tsdb_history_update( # to_prepend = None if fqsn: - symbol = feed.symbols.get(fqsn) + flume = feed.flumes[fqsn] + symbol = flume.symbol if symbol: - fqsn = symbol.front_fqsn() + fqsn = symbol.fqsn # diff db history with shm and only write the missing portions - # ohlcv = feed.hist_shm.array + # ohlcv = flume.hist_shm.array # TODO: use pg profiler # for secs in (1, 60): diff --git a/piker/data/types.py b/piker/data/types.py index 4bdb80637..1359526cb 100644 --- a/piker/data/types.py +++ b/piker/data/types.py @@ -42,16 +42,17 @@ def to_dict(self) -> dict: for f in self.__struct_fields__ } - def __repr__(self): - # only turn on pprint when we detect a python REPL - # at runtime B) - if ( - hasattr(sys, 'ps1') - # TODO: check if we're in pdb - ): - return self.pformat() - - return super().__repr__() + # Lul, doesn't seem to work that well.. + # def __repr__(self): + # # only turn on pprint when we detect a python REPL + # # at runtime B) + # if ( + # hasattr(sys, 'ps1') + # # TODO: check if we're in pdb + # ): + # return self.pformat() + + # return super().__repr__() def pformat(self) -> str: return f'Struct({pformat(self.to_dict())})' diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 084ff5101..eb5eaff41 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -26,7 +26,6 @@ ) import numpy as np -import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor @@ -35,7 +34,9 @@ from ..log import get_logger, get_console_log from .. import data from ..data import attach_shm_array -from ..data.feed import Feed +from ..data.feed import ( + Flume, +) from ..data._sharedmem import ShmArray from ..data._sampling import _default_delay_s from ..data._source import Symbol @@ -79,7 +80,7 @@ async def filter_quotes_by_sym( async def fsp_compute( symbol: Symbol, - feed: Feed, + flume: Flume, quote_stream: trio.abc.ReceiveChannel, src: ShmArray, @@ -107,7 +108,7 @@ async def fsp_compute( filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg - feed.rt_shm, + flume.rt_shm, ) # Conduct a single iteration of fsp with historical bars input @@ -310,12 +311,12 @@ async def cascade( # needs to get throttled the ticks we generate. # tick_throttle=60, - ) as (feed, quote_stream): - symbol = feed.symbols[fqsn] + ) as feed: + flume = feed.flumes[fqsn] + symbol = flume.symbol + assert src.token == flume.rt_shm.token profiler(f'{func}: feed up') - - assert src.token == feed.rt_shm.token # last_len = new_len = len(src.array) func_name = func.__name__ @@ -327,8 +328,8 @@ async def cascade( fsp_compute, symbol=symbol, - feed=feed, - quote_stream=quote_stream, + flume=flume, + quote_stream=flume.stream, # shm src=src, @@ -430,7 +431,7 @@ async def poll_and_sync_to_step( # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async with feed.index_stream( + async with flume.index_stream( int(delay_s) ) as istream: diff --git a/piker/ui/_app.py b/piker/ui/_app.py index a31fd2da7..23a9d2ede 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -19,15 +19,16 @@ ''' from functools import partial +from types import ModuleType from PyQt5.QtCore import QEvent import trio from .._daemon import maybe_spawn_brokerd -from ..brokers import get_brokermod from . import _event from ._exec import run_qtractor from ..data.feed import install_brokerd_search +from ..data._source import unpack_fqsn from . import _search from ._chart import GodWidget from ..log import get_logger @@ -36,27 +37,26 @@ async def load_provider_search( - - broker: str, + brokermod: str, loglevel: str, ) -> None: - log.info(f'loading brokerd for {broker}..') + name = brokermod.name + log.info(f'loading brokerd for {name}..') async with ( maybe_spawn_brokerd( - broker, + name, loglevel=loglevel ) as portal, install_brokerd_search( portal, - get_brokermod(broker), + brokermod, ), ): - # keep search engine stream up until cancelled await trio.sleep_forever() @@ -66,8 +66,8 @@ async def _async_main( # implicit required argument provided by ``qtractor_run()`` main_widget: GodWidget, - sym: str, - brokernames: str, + syms: list[str], + brokers: dict[str, ModuleType], loglevel: str, ) -> None: @@ -99,6 +99,11 @@ async def _async_main( sbar = godwidget.window.status_bar starting_done = sbar.open_status('starting ze sexy chartz') + needed_brokermods: dict[str, ModuleType] = {} + for fqsn in syms: + brokername, *_ = unpack_fqsn(fqsn) + needed_brokermods[brokername] = brokers[brokername] + async with ( trio.open_nursery() as root_n, ): @@ -113,12 +118,16 @@ async def _async_main( # godwidget.hbox.addWidget(search) godwidget.search = search - symbol, _, provider = sym.rpartition('.') + symbols: list[str] = [] + + for sym in syms: + symbol, _, provider = sym.rpartition('.') + symbols.append(symbol) # this internally starts a ``display_symbol_data()`` task above - order_mode_ready = await godwidget.load_symbol( + order_mode_ready = await godwidget.load_symbols( provider, - symbol, + symbols, loglevel ) @@ -136,8 +145,12 @@ async def _async_main( ): # load other providers into search **after** # the chart's select cache - for broker in brokernames: - root_n.start_soon(load_provider_search, broker, loglevel) + for brokername, mod in needed_brokermods.items(): + root_n.start_soon( + load_provider_search, + mod, + loglevel, + ) await order_mode_ready.wait() @@ -166,8 +179,8 @@ async def _async_main( def _main( - sym: str, - brokernames: [str], + syms: list[str], + brokermods: list[ModuleType], piker_loglevel: str, tractor_kwargs, ) -> None: @@ -178,7 +191,11 @@ def _main( ''' run_qtractor( func=_async_main, - args=(sym, brokernames, piker_loglevel), + args=( + syms, + {mod.name: mod for mod in brokermods}, + piker_loglevel, + ), main_widget_type=GodWidget, tractor_kwargs=tractor_kwargs, ) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index bb2d44486..bad82544a 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -186,10 +186,10 @@ def get_chart_symbol( ) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore return self._chart_cache.get(symbol_key) - async def load_symbol( + async def load_symbols( self, providername: str, - symbol_key: str, + symbol_keys: list[str], loglevel: str, reset: bool = False, @@ -200,12 +200,20 @@ async def load_symbol( Expects a ``numpy`` structured array containing all the ohlcv fields. ''' + fqsns: list[str] = [] + # our symbol key style is always lower case - symbol_key = symbol_key.lower() + for key in list(map(str.lower, symbol_keys)): + + # fully qualified symbol name (SNS i guess is what we're making?) + fqsn = '.'.join([key, providername]) + fqsns.append(fqsn) + + # NOTE: for now we use the first symbol in the set as the "key" + # for the overlay of feeds on the chart. + group_key = fqsns[0] - # fully qualified symbol name (SNS i guess is what we're making?) - fqsn = '.'.join([symbol_key, providername]) - all_linked = self.get_chart_symbol(fqsn) + all_linked = self.get_chart_symbol(group_key) order_mode_started = trio.Event() if not self.vbox.isEmpty(): @@ -238,7 +246,7 @@ async def load_symbol( display_symbol_data, self, providername, - symbol_key, + fqsns, loglevel, order_mode_started, ) @@ -907,14 +915,16 @@ def __init__( def resume_all_feeds(self): try: for feed in self._feeds.values(): - self.linked.godwidget._root_n.start_soon(feed.resume) + for flume in feed.flumes.values(): + self.linked.godwidget._root_n.start_soon(feed.resume) except RuntimeError: # TODO: cancel the qtractor runtime here? raise def pause_all_feeds(self): for feed in self._feeds.values(): - self.linked.godwidget._root_n.start_soon(feed.pause) + for flume in feed.flumes.values(): + self.linked.godwidget._root_n.start_soon(feed.pause) @property def view(self) -> ChartView: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 0a98b4a85..c7ed9299d 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -33,6 +33,7 @@ from ..data.feed import ( open_feed, Feed, + Flume, ) from ..data.types import Struct from ._axes import YAxisLabel @@ -228,7 +229,7 @@ async def graphics_update_loop( nurse: trio.Nursery, godwidget: GodWidget, - feed: Feed, + flume: Flume, wap_in_history: bool = False, vlm_chart: Optional[ChartPlotWidget] = None, @@ -255,8 +256,8 @@ async def graphics_update_loop( fast_chart = linked.chart hist_chart = godwidget.hist_linked.chart - ohlcv = feed.rt_shm - hist_ohlcv = feed.hist_shm + ohlcv = flume.rt_shm + hist_ohlcv = flume.hist_shm # update last price sticky last_price_sticky = fast_chart._ysticks[fast_chart.name] @@ -347,9 +348,9 @@ async def increment_history_view(): 'i_last_append': i_last, 'i_last': i_last, } - _, hist_step_size_s, _ = feed.get_ds_info() + _, hist_step_size_s, _ = flume.get_ds_info() - async with feed.index_stream( + async with flume.index_stream( # int(hist_step_size_s) # TODO: seems this is more reliable at keeping the slow # chart incremented in view more correctly? @@ -393,7 +394,7 @@ async def increment_history_view(): nurse.start_soon(increment_history_view) # main real-time quotes update loop - stream: tractor.MsgStream = feed.stream + stream: tractor.MsgStream = flume.stream async for quotes in stream: ds.quotes = quotes @@ -813,13 +814,13 @@ def graphics_update_cycle( async def link_views_with_region( rt_chart: ChartPlotWidget, hist_chart: ChartPlotWidget, - feed: Feed, + flume: Flume, ) -> None: # these value are be only pulled once during shm init/startup - izero_hist = feed.izero_hist - izero_rt = feed.izero_rt + izero_hist = flume.izero_hist + izero_rt = flume.izero_rt # Add the LinearRegionItem to the ViewBox, but tell the ViewBox # to exclude this item when doing auto-range calculations. @@ -846,7 +847,7 @@ async def link_views_with_region( # poll for datums load and timestep detection for _ in range(100): try: - _, _, ratio = feed.get_ds_info() + _, _, ratio = flume.get_ds_info() break except IndexError: await trio.sleep(0.01) @@ -947,7 +948,7 @@ def update_pi_from_region(): async def display_symbol_data( godwidget: GodWidget, provider: str, - sym: str, + fqsns: list[str], loglevel: str, order_mode_started: trio.Event, @@ -961,11 +962,6 @@ async def display_symbol_data( ''' sbar = godwidget.window.status_bar - loading_sym_key = sbar.open_status( - f'loading {sym}.{provider} ->', - group_key=True - ) - # historical data fetch # brokermod = brokers.get_brokermod(provider) @@ -974,10 +970,17 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) - fqsn = '.'.join((sym, provider)) + for fqsn in fqsns: + + loading_sym_key = sbar.open_status( + f'loading {fqsn} ->', + group_key=True + ) + + feed: Feed async with open_feed( - [fqsn], + fqsns, loglevel=loglevel, # limit to at least display's FPS @@ -985,11 +988,17 @@ async def display_symbol_data( tick_throttle=_quote_throttle_rate, ) as feed: - ohlcv: ShmArray = feed.rt_shm - hist_ohlcv: ShmArray = feed.hist_shm - symbol = feed.symbols[sym] - fqsn = symbol.front_fqsn() + # TODO: right now we only show one symbol on charts, but + # overlays are coming muy pronto guey.. + assert len(feed.flumes) == 1 + flume = list(feed.flumes.values())[0] + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + fqsn = symbol.fqsn step_size_s = 1 tf_key = tf_in_1s[step_size_s] @@ -1009,7 +1018,7 @@ async def display_symbol_data( hist_linked._symbol = symbol hist_chart = hist_linked.plot_ohlc_main( symbol, - feed.hist_shm, + hist_ohlcv, # in the case of history chart we explicitly set `False` # to avoid internal pane creation. # sidepane=False, @@ -1025,7 +1034,7 @@ async def display_symbol_data( godwidget.pp_pane = pp_pane # create main OHLC chart - chart = rt_linked.plot_ohlc_main( + ohlc_chart = rt_linked.plot_ohlc_main( symbol, ohlcv, # in the case of history chart we explicitly set `False` @@ -1033,8 +1042,8 @@ async def display_symbol_data( sidepane=pp_pane, ) - chart._feeds[symbol.key] = feed - chart.setFocus() + ohlc_chart._feeds[symbol.key] = feed + ohlc_chart.setFocus() # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! # plot historical vwap if available @@ -1044,7 +1053,7 @@ async def display_symbol_data( # and 'bar_wap' in bars.dtype.fields # ): # wap_in_history = True - # chart.draw_curve( + # ohlc_chart.draw_curve( # name='bar_wap', # shm=ohlcv, # color='default_light', @@ -1097,7 +1106,7 @@ async def display_symbol_data( graphics_update_loop, ln, godwidget, - feed, + flume, wap_in_history, vlm_chart, ) @@ -1105,7 +1114,7 @@ async def display_symbol_data( await trio.sleep(0) # size view to data prior to order mode init - chart.default_view() + ohlc_chart.default_view() rt_linked.graphics_cycle() await trio.sleep(0) @@ -1119,9 +1128,9 @@ async def display_symbol_data( godwidget.resize_all() await link_views_with_region( - chart, + ohlc_chart, hist_chart, - feed, + flume, ) mode: OrderMode @@ -1135,7 +1144,7 @@ async def display_symbol_data( ): if not vlm_chart: # trigger another view reset if no sub-chart - chart.default_view() + ohlc_chart.default_view() rt_linked.mode = mode diff --git a/piker/ui/_overlay.py b/piker/ui/_overlay.py index af66a7354..ac15a9dc2 100644 --- a/piker/ui/_overlay.py +++ b/piker/ui/_overlay.py @@ -304,7 +304,7 @@ def __init__( # NOTE: required for scene layering/relaying; this guarantees # the "root" plot receives priority for interaction # events/signals. - root_plotitem.vb.setZValue(1000) + root_plotitem.vb.setZValue(10) self.overlays: list[PlotItem] = [] self.layout = ComposedGridLayout(root_plotitem) @@ -494,6 +494,8 @@ def size_to_viewbox(vb: 'ViewBox'): root.vb.setFocus() assert root.vb.focusWidget() + vb.setZValue(100) + def get_axis( self, plot: PlotItem, diff --git a/piker/ui/_position.py b/piker/ui/_position.py index f2ec1466f..985841619 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -45,7 +45,10 @@ from ..clearing._allocate import Allocator from ..pp import Position from ..data._normalize import iterticks -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data.types import Struct from ._label import Label from ._lines import LevelLine, order_line @@ -64,7 +67,7 @@ async def update_pnl_from_feed( - feed: Feed, + flume: Flume, order_mode: OrderMode, # noqa tracker: PositionTracker, @@ -95,7 +98,7 @@ async def update_pnl_from_feed( # real-time update pnl on the status pane try: - async with feed.stream.subscribe() as bstream: + async with flume.stream.subscribe() as bstream: # last_tick = time.time() async for quotes in bstream: @@ -390,12 +393,12 @@ def display_pnl( mode = self.order_mode sym = mode.chart.linked.symbol size = tracker.live_pp.size - feed = mode.quote_feed + flume: Feed = mode.feed.flumes[sym.fqsn] pnl_value = 0 if size: # last historical close price - last = feed.rt_shm.array[-1][['close']][0] + last = flume.rt_shm.array[-1][['close']][0] pnl_value = copysign(1, size) * pnl( tracker.live_pp.ppu, last, @@ -408,7 +411,7 @@ def display_pnl( _pnl_tasks[fqsn] = True self.order_mode.nursery.start_soon( update_pnl_from_feed, - feed, + flume, mode, tracker, ) diff --git a/piker/ui/_search.py b/piker/ui/_search.py index bbe883205..6c7c6fd8a 100644 --- a/piker/ui/_search.py +++ b/piker/ui/_search.py @@ -665,9 +665,9 @@ async def chart_current_item( log.info(f'Requesting symbol: {symbol}.{provider}') - await godw.load_symbol( + await godw.load_symbols( provider, - symbol, + [symbol], 'info', ) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index cc571eb67..a72c2f5c9 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -166,16 +166,16 @@ def chart( )) return - # global opts brokernames = config['brokers'] + brokermods = config['brokermods'] + assert brokermods tractorloglevel = config['tractorloglevel'] pikerloglevel = config['loglevel'] - _main( - sym=symbols[0], - brokernames=brokernames, + syms=symbols, + brokermods=brokermods, piker_loglevel=pikerloglevel, tractor_kwargs={ 'debug_mode': pdb, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index fa8ecbce2..7e4ae0663 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -44,7 +44,10 @@ ) from ._style import _font from ..data._source import Symbol -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data.types import Struct from ..log import get_logger from ._editors import LineEditor, ArrowEditor @@ -118,7 +121,6 @@ class OrderMode: chart: ChartPlotWidget # type: ignore # noqa hist_chart: ChartPlotWidget # type: ignore # noqa nursery: trio.Nursery # used by ``ui._position`` code? - quote_feed: Feed book: OrderBook lines: LineEditor arrows: ArrowEditor @@ -514,12 +516,13 @@ def on_fill( # XXX: seems to fail on certain types of races? # assert len(lines) == 2 if lines: - _, _, ratio = self.feed.get_ds_info() + flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn] + _, _, ratio = flume.get_ds_info() for i, chart in [ (arrow_index, self.chart), - (self.feed.izero_hist + (flume.izero_hist + - round((arrow_index - self.feed.izero_rt)/ratio), + round((arrow_index - flume.izero_rt)/ratio), self.hist_chart) ]: self.arrows.add( @@ -801,7 +804,6 @@ async def open_order_mode( chart, hist_chart, tn, - feed, book, lines, arrows, diff --git a/requirements.txt b/requirements.txt index 8f5736257..0f13d8919 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ # we require a pinned dev branch to get some edge features that # are often untested in tractor's CI and/or being tested by us # first before committing as core features in tractor's base. --e git+https://github.com/goodboy/tractor.git@master#egg=tractor +-e git+https://github.com/goodboy/tractor.git@piker_pin#egg=tractor # `pyqtgraph` peeps keep breaking, fixing, improving so might as well # pin this to a dev branch that we have more control over especially diff --git a/tests/conftest.py b/tests/conftest.py index aaa125ced..1bd1d86e3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,12 @@ +from contextlib import asynccontextmanager as acm import os import pytest import tractor -import trio -from piker import log, config -from piker.brokers import questrade +from piker import ( + # log, + config, +) def pytest_addoption(parser): @@ -14,15 +16,6 @@ def pytest_addoption(parser): help="Use a practice API account") -@pytest.fixture(scope='session', autouse=True) -def loglevel(request): - orig = tractor.log._default_loglevel - level = tractor.log._default_loglevel = request.config.option.loglevel - log.get_console_log(level) - yield level - tractor.log._default_loglevel = orig - - @pytest.fixture(scope='session') def test_config(): dirname = os.path.dirname @@ -37,9 +30,11 @@ def test_config(): @pytest.fixture(scope='session', autouse=True) def confdir(request, test_config): - """If the `--confdir` flag is not passed use the + ''' + If the `--confdir` flag is not passed use the broker config file found in that dir. - """ + + ''' confdir = request.config.option.confdir if confdir is not None: config._override_config_dir(confdir) @@ -47,49 +42,61 @@ def confdir(request, test_config): return confdir -@pytest.fixture(scope='session', autouse=True) -def travis(confdir): - is_travis = os.environ.get('TRAVIS', False) - if is_travis: - # this directory is cached, see .travis.yaml - conf_file = config.get_broker_conf_path() - refresh_token = os.environ['QT_REFRESH_TOKEN'] - - def write_with_token(token): - # XXX don't pass the dir path here since may be - # written behind the scenes in the `confdir fixture` - if not os.path.isfile(conf_file): - open(conf_file, 'w').close() - conf, path = config.load() - conf.setdefault('questrade', {}).update( - {'refresh_token': token, - 'is_practice': 'True'} - ) - config.write(conf, path) - - async def ensure_config(): - # try to refresh current token using cached brokers config - # if it fails fail try using the refresh token provided by the - # env var and if that fails stop the test run here. - try: - async with questrade.get_client(ask_user=False): - pass - except ( - FileNotFoundError, ValueError, - questrade.BrokerError, questrade.QuestradeError, - trio.MultiError, - ): - # 3 cases: - # - config doesn't have a ``refresh_token`` k/v - # - cache dir does not exist yet - # - current token is expired; take it form env var - write_with_token(refresh_token) - - async with questrade.get_client(ask_user=False): - pass - - # XXX ``pytest_trio`` doesn't support scope or autouse - trio.run(ensure_config) +# @pytest.fixture(scope='session', autouse=True) +# def travis(confdir): +# is_travis = os.environ.get('TRAVIS', False) +# if is_travis: +# # this directory is cached, see .travis.yaml +# conf_file = config.get_broker_conf_path() +# refresh_token = os.environ['QT_REFRESH_TOKEN'] + +# def write_with_token(token): +# # XXX don't pass the dir path here since may be +# # written behind the scenes in the `confdir fixture` +# if not os.path.isfile(conf_file): +# open(conf_file, 'w').close() +# conf, path = config.load() +# conf.setdefault('questrade', {}).update( +# {'refresh_token': token, +# 'is_practice': 'True'} +# ) +# config.write(conf, path) + +# async def ensure_config(): +# # try to refresh current token using cached brokers config +# # if it fails fail try using the refresh token provided by the +# # env var and if that fails stop the test run here. +# try: +# async with questrade.get_client(ask_user=False): +# pass +# except ( +# FileNotFoundError, ValueError, +# questrade.BrokerError, questrade.QuestradeError, +# trio.MultiError, +# ): +# # 3 cases: +# # - config doesn't have a ``refresh_token`` k/v +# # - cache dir does not exist yet +# # - current token is expired; take it form env var +# write_with_token(refresh_token) + +# async with questrade.get_client(ask_user=False): +# pass + +# # XXX ``pytest_trio`` doesn't support scope or autouse +# trio.run(ensure_config) + + +_ci_env: bool = os.environ.get('CI', False) + + +@pytest.fixture(scope='session') +def ci_env() -> bool: + ''' + Detect CI envoirment. + + ''' + return _ci_env @pytest.fixture @@ -105,3 +112,56 @@ def tmx_symbols(): @pytest.fixture def cse_symbols(): return ['TRUL.CN', 'CWEB.CN', 'SNN.CN'] + + +@acm +async def _open_test_pikerd( + reg_addr: tuple[str, int] | None = None, + **kwargs, + +) -> tuple[ + str, + int, + tractor.Portal +]: + ''' + Testing helper to startup the service tree and runtime on + a different port then the default to allow testing alongside + a running stack. + + ''' + import random + from piker._daemon import maybe_open_pikerd + + if reg_addr is None: + port = random.randint(6e3, 7e3) + reg_addr = ('127.0.0.1', port) + + async with ( + maybe_open_pikerd( + registry_addr=reg_addr, + **kwargs, + ), + ): + async with tractor.wait_for_actor( + 'pikerd', + arbiter_sockaddr=reg_addr, + ) as portal: + raddr = portal.channel.raddr + assert raddr == reg_addr + yield ( + raddr[0], + raddr[1], + portal, + ) + + +@pytest.fixture +def open_test_pikerd(): + + yield _open_test_pikerd + + # TODO: teardown checks such as, + # - no leaked subprocs or shm buffers + # - all requested container service are torn down + # - certain ``tractor`` runtime state? diff --git a/tests/test_feeds.py b/tests/test_feeds.py new file mode 100644 index 000000000..2b85301f0 --- /dev/null +++ b/tests/test_feeds.py @@ -0,0 +1,128 @@ +''' +Data feed layer APIs, performance, msg throttling. + +''' +from collections import Counter +from pprint import pprint +from typing import AsyncContextManager + +import pytest +# import tractor +import trio +from piker.data import ( + ShmArray, + open_feed, +) +from piker.data._source import ( + unpack_fqsn, +) + + +@pytest.mark.parametrize( + 'fqsns', + [ + # binance + (100, {'btcusdt.binance', 'ethusdt.binance'}, False), + + # kraken + (20, {'ethusdt.kraken', 'xbtusd.kraken'}, True), + + # binance + kraken + (100, {'btcusdt.binance', 'xbtusd.kraken'}, False), + ], + ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}', +) +def test_multi_fqsn_feed( + open_test_pikerd: AsyncContextManager, + fqsns: set[str], + ci_env: bool +): + ''' + Start a real-time data feed for provided fqsn and pull + a few quotes then simply shut down. + + ''' + max_quotes, fqsns, run_in_ci = fqsns + + if ( + ci_env + and not run_in_ci + ): + pytest.skip('Skipping CI disabled test due to feed restrictions') + + brokers = set() + for fqsn in fqsns: + brokername, key, suffix = unpack_fqsn(fqsn) + brokers.add(brokername) + + async def main(): + async with ( + open_test_pikerd(), + open_feed( + fqsns, + loglevel='info', + + # TODO: ensure throttle rate is applied + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches + # tick_throttle=_quote_throttle_rate, + + ) as feed + ): + # verify shm buffers exist + for fqin in fqsns: + flume = feed.flumes[fqin] + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + async with feed.open_multi_stream(brokers) as stream: + + # pull the first startup quotes, one for each fqsn, and + # ensure they match each flume's startup quote value. + fqsns_copy = fqsns.copy() + with trio.fail_after(0.5): + for _ in range(1): + first_quotes = await stream.receive() + for fqsn, quote in first_quotes.items(): + + # XXX: TODO: WTF apparently this error will get + # supressed and only show up in the teardown + # excgroup if we don't have the fix from + # + # assert 0 + + fqsns_copy.remove(fqsn) + flume = feed.flumes[fqsn] + assert quote['last'] == flume.first_quote['last'] + + cntr = Counter() + with trio.fail_after(6): + async for quotes in stream: + for fqsn, quote in quotes.items(): + cntr[fqsn] += 1 + + # await tractor.breakpoint() + flume = feed.flumes[fqsn] + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + # print quote msg, rt and history + # buffer values on console. + rt_row = ohlcv.array[-1] + hist_row = hist_ohlcv.array[-1] + # last = quote['last'] + + # assert last == rt_row['close'] + # assert last == hist_row['close'] + pprint( + f'{fqsn}: {quote}\n' + f'rt_ohlc: {rt_row}\n' + f'hist_ohlc: {hist_row}\n' + ) + + if cntr.total() >= max_quotes: + break + + assert set(cntr.keys()) == fqsns + + trio.run(main)