diff --git a/piker/data/feed.py b/piker/data/feed.py index 047bd40dc..52316e990 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -333,7 +333,7 @@ async def allocate_persistent_feed( manage_history, mod, bus, - fqme, + mkt, some_data_ready, feed_is_live, ) @@ -378,7 +378,12 @@ async def allocate_persistent_feed( # NOTE: if not configured otherwise, we always sum tick volume # values in the OHLCV sampler. - sum_tick_vlm: bool = (init.shm_write_opts or {}).get('sum_tick_vlm', True) + sum_tick_vlm: bool = True + if init.shm_write_opts: + sum_tick_vlm: bool = init.shm_write_opts.get( + 'sum_tick_vlm', + True, + ) # NOTE: if no high-freq sampled data has (yet) been loaded, # seed the buffer with a history datum - this is most handy @@ -525,7 +530,7 @@ async def open_feed_bus( # NOTE we allow this since it's common to have the live # quote feed actor's sampling task push faster then the # the local UI-graphics code during startup. - allow_overruns=True, + # allow_overruns=True, ) as stream, ): diff --git a/piker/data/history.py b/piker/data/history.py index 3e0a3a625..00cc019ef 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -38,6 +38,11 @@ import pendulum import numpy as np +from .. import config +from ..accounting._mktinfo import ( + MktPair, + unpack_fqme, +) from ._util import ( log, ) @@ -84,7 +89,7 @@ def diff_history( async def start_backfill( mod: ModuleType, - bfqsn: str, + mkt: MktPair, shm: ShmArray, timeframe: float, sampler_stream: tractor.MsgStream, @@ -104,7 +109,11 @@ async def start_backfill( tuple[np.ndarray, str] ] config: dict[str, int] - async with mod.open_history_client(bfqsn) as (hist, config): + + bs_fqme: str = mkt.bs_fqme + async with mod.open_history_client( + bs_fqme, + ) as (hist, config): # get latest query's worth of history all the way # back to what is recorded in the tsdb @@ -134,7 +143,7 @@ async def start_backfill( surr = array[-6:] diff_in_mins = round(diff/60., ndigits=2) log.warning( - f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n' + f'STEP ERROR `{bs_fqme}` for period {step_size_s}s:\n' f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' 'Surrounding 6 time stamps:\n' f'{list(surr["time"])}\n' @@ -161,7 +170,7 @@ async def start_backfill( shm.push(to_push, prepend=True) # TODO: *** THIS IS A BUG *** - # we need to only broadcast to subscribers for this fqsn.. + # we need to only broadcast to subscribers for this fqme.. # otherwise all fsps get reset on every chart.. await sampler_stream.send('broadcast_all') @@ -248,7 +257,7 @@ async def start_backfill( ): start_dt = min(starts) log.warning( - f"{bfqsn}: skipping duplicate frame @ {next_start_dt}" + f"{bs_fqme}: skipping duplicate frame @ {next_start_dt}" ) starts[start_dt] += 1 continue @@ -321,7 +330,7 @@ async def start_backfill( f'{start_dt} -> {end_dt}' ) await storage.write_ohlcv( - f'{bfqsn}.{mod.name}', # lul.. + f'{mkt.fqme}', to_push, timeframe, ) @@ -342,7 +351,7 @@ async def start_backfill( async def basic_backfill( bus: _FeedsBus, mod: ModuleType, - bfqsn: str, + mkt: MktPair, shms: dict[int, ShmArray], sampler_stream: tractor.MsgStream, feed_is_live: trio.Event, @@ -361,7 +370,7 @@ async def basic_backfill( partial( start_backfill, mod, - bfqsn, + mkt, shm, timeframe, sampler_stream, @@ -378,8 +387,7 @@ async def tsdb_backfill( marketstore: ModuleType, bus: _FeedsBus, storage: Storage, - fqsn: str, - bfqsn: str, + mkt: MktPair, shms: dict[int, ShmArray], sampler_stream: tractor.MsgStream, feed_is_live: trio.Event, @@ -393,17 +401,17 @@ async def tsdb_backfill( # TODO: this should be used verbatim for the pure # shm backfiller approach below. dts_per_tf: dict[int, datetime] = {} + fqme: str = mkt.fqme # start history anal and load missing new data via backend. for timeframe, shm in shms.items(): # loads a (large) frame of data from the tsdb depending # on the db's query size limit. tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load( - fqsn, + fqme, timeframe=timeframe, ) - broker, *_ = unpack_fqme(fqsn) try: ( latest_start_dt, @@ -413,7 +421,7 @@ async def tsdb_backfill( partial( start_backfill, mod, - bfqsn, + mkt, shm, timeframe, sampler_stream, @@ -541,7 +549,7 @@ async def back_load_from_tsdb( while shm._first.value > 0: tsdb_history = await storage.read_ohlcv( - fqsn, + fqme, timeframe=timeframe, end=tsdb_last_frame_start, ) @@ -599,7 +607,7 @@ async def back_load_from_tsdb( async def manage_history( mod: ModuleType, bus: _FeedsBus, - fqsn: str, + mkt: MktPair, some_data_ready: trio.Event, feed_is_live: trio.Event, timeframe: float = 60, # in seconds @@ -628,11 +636,12 @@ async def manage_history( name, uuid = uid service = name.rstrip(f'.{mod.name}') + fqme: str = mkt.fqme + # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. hist_shm, opened = maybe_open_shm_array( - # key=f'{fqsn}_hist_p{port}', - key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist', + key=f'piker.{service}[{uuid[:16]}.{fqme}.hist', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -649,9 +658,7 @@ async def manage_history( ) rt_shm, opened = maybe_open_shm_array( - # key=f'{fqsn}_rt_p{port}', - # key=f'piker.{service}.{fqsn}_rt.{uuid}', - key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt', + key=f'piker.{service}[{uuid[:16]}.{fqme}.rt', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -691,23 +698,47 @@ async def manage_history( ) as sample_stream: - log.info('Scanning for existing `marketstored`') - tsdb_is_up = await check_for_service('marketstored') - - bfqsn = fqsn.replace('.' + mod.name, '') - open_history_client = getattr(mod, 'open_history_client', None) + open_history_client = getattr( + mod, + 'open_history_client', + None, + ) assert open_history_client + conf, path = config.load('conf') + tsdbconf = conf['network'].get('tsdb') + + # lookup backend tsdb module by name and load any user service + # settings for connecting to the tsdb service. + tsdb_backend: str = tsdbconf.pop('backend') + tsdb_host: str = tsdbconf['host'] + + # TODO: import and load storagemod by name + # mod = get_storagemod(tsdb_backend) + from ..service import marketstore + + tsdb_is_up: bool = False + try_remote_tsdb: bool = False + if tsdb_host == 'localhost': + log.info('Scanning for existing `{tsbd_backend}`') + tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d') + else: + try_remote_tsdb: bool = True + if ( tsdb_is_up - and opened - and open_history_client + or try_remote_tsdb + and ( + opened + and open_history_client + ) ): log.info('Found existing `marketstored`') - from ..service import marketstore async with ( - marketstore.open_storage_client(fqsn)as storage, + marketstore.open_storage_client( + **tsdbconf + ) as storage, ): # TODO: drop returning the output that we pass in? await bus.nursery.start( @@ -716,8 +747,7 @@ async def manage_history( marketstore, bus, storage, - fqsn, - bfqsn, + mkt, { 1: rt_shm, 60: hist_shm, @@ -752,7 +782,7 @@ async def manage_history( await basic_backfill( bus, mod, - bfqsn, + mkt, { 1: rt_shm, 60: hist_shm,