Skip to content

Commit

Permalink
Pass and use MktPair throughout history routines
Browse files Browse the repository at this point in the history
Previously we were passing the `fqme: str` which isn't as extensive nor
were we able to pass `MktPair` direct to backend history manager-loading
routines (which should be able to rely on always receiving it since
currently `stream_quotes()` is always called first for setup).

This also starts a slight bit of configuration oriented tsdb info
loading (via a new `conf.toml`) such that a user can decide to host
their (marketstore) db on a remote host and our container spawning and
client code will do the right startup automatically based on the config.
|-> Related to this I've added some comments about doing storage
backend module loading which should get actually written out as part of
patches coming in #486 (or something related).

Don't allow overruns again in history context since it seems it was
never a problem?
  • Loading branch information
goodboy committed May 17, 2023
1 parent 5c8a45c commit ae049eb
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 35 deletions.
11 changes: 8 additions & 3 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ async def allocate_persistent_feed(
manage_history,
mod,
bus,
fqme,
mkt,
some_data_ready,
feed_is_live,
)
Expand Down Expand Up @@ -378,7 +378,12 @@ async def allocate_persistent_feed(

# NOTE: if not configured otherwise, we always sum tick volume
# values in the OHLCV sampler.
sum_tick_vlm: bool = (init.shm_write_opts or {}).get('sum_tick_vlm', True)
sum_tick_vlm: bool = True
if init.shm_write_opts:
sum_tick_vlm: bool = init.shm_write_opts.get(
'sum_tick_vlm',
True,
)

# NOTE: if no high-freq sampled data has (yet) been loaded,
# seed the buffer with a history datum - this is most handy
Expand Down Expand Up @@ -525,7 +530,7 @@ async def open_feed_bus(
# NOTE we allow this since it's common to have the live
# quote feed actor's sampling task push faster then the
# the local UI-graphics code during startup.
allow_overruns=True,
# allow_overruns=True,
) as stream,
):

Expand Down
94 changes: 62 additions & 32 deletions piker/data/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
import pendulum
import numpy as np

from .. import config
from ..accounting._mktinfo import (
MktPair,
unpack_fqme,
)
from ._util import (
log,
)
Expand Down Expand Up @@ -84,7 +89,7 @@ def diff_history(

async def start_backfill(
mod: ModuleType,
bfqsn: str,
mkt: MktPair,
shm: ShmArray,
timeframe: float,
sampler_stream: tractor.MsgStream,
Expand All @@ -104,7 +109,11 @@ async def start_backfill(
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(bfqsn) as (hist, config):

bs_fqme: str = mkt.bs_fqme
async with mod.open_history_client(
bs_fqme,
) as (hist, config):

# get latest query's worth of history all the way
# back to what is recorded in the tsdb
Expand Down Expand Up @@ -134,7 +143,7 @@ async def start_backfill(
surr = array[-6:]
diff_in_mins = round(diff/60., ndigits=2)
log.warning(
f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n'
f'STEP ERROR `{bs_fqme}` for period {step_size_s}s:\n'
f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
'Surrounding 6 time stamps:\n'
f'{list(surr["time"])}\n'
Expand All @@ -161,7 +170,7 @@ async def start_backfill(
shm.push(to_push, prepend=True)

# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn..
# we need to only broadcast to subscribers for this fqme..
# otherwise all fsps get reset on every chart..
await sampler_stream.send('broadcast_all')

Expand Down Expand Up @@ -248,7 +257,7 @@ async def start_backfill(
):
start_dt = min(starts)
log.warning(
f"{bfqsn}: skipping duplicate frame @ {next_start_dt}"
f"{bs_fqme}: skipping duplicate frame @ {next_start_dt}"
)
starts[start_dt] += 1
continue
Expand Down Expand Up @@ -321,7 +330,7 @@ async def start_backfill(
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
f'{mkt.fqme}',
to_push,
timeframe,
)
Expand All @@ -342,7 +351,7 @@ async def start_backfill(
async def basic_backfill(
bus: _FeedsBus,
mod: ModuleType,
bfqsn: str,
mkt: MktPair,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
Expand All @@ -361,7 +370,7 @@ async def basic_backfill(
partial(
start_backfill,
mod,
bfqsn,
mkt,
shm,
timeframe,
sampler_stream,
Expand All @@ -378,8 +387,7 @@ async def tsdb_backfill(
marketstore: ModuleType,
bus: _FeedsBus,
storage: Storage,
fqsn: str,
bfqsn: str,
mkt: MktPair,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
Expand All @@ -393,17 +401,17 @@ async def tsdb_backfill(
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
dts_per_tf: dict[int, datetime] = {}
fqme: str = mkt.fqme

# start history anal and load missing new data via backend.
for timeframe, shm in shms.items():
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit.
tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
fqsn,
fqme,
timeframe=timeframe,
)

broker, *_ = unpack_fqme(fqsn)
try:
(
latest_start_dt,
Expand All @@ -413,7 +421,7 @@ async def tsdb_backfill(
partial(
start_backfill,
mod,
bfqsn,
mkt,
shm,
timeframe,
sampler_stream,
Expand Down Expand Up @@ -541,7 +549,7 @@ async def back_load_from_tsdb(
while shm._first.value > 0:

tsdb_history = await storage.read_ohlcv(
fqsn,
fqme,
timeframe=timeframe,
end=tsdb_last_frame_start,
)
Expand Down Expand Up @@ -599,7 +607,7 @@ async def back_load_from_tsdb(
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
fqsn: str,
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
Expand Down Expand Up @@ -628,11 +636,12 @@ async def manage_history(
name, uuid = uid
service = name.rstrip(f'.{mod.name}')

fqme: str = mkt.fqme

# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
hist_shm, opened = maybe_open_shm_array(
# key=f'{fqsn}_hist_p{port}',
key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist',
key=f'piker.{service}[{uuid[:16]}.{fqme}.hist',

# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
Expand All @@ -649,9 +658,7 @@ async def manage_history(
)

rt_shm, opened = maybe_open_shm_array(
# key=f'{fqsn}_rt_p{port}',
# key=f'piker.{service}.{fqsn}_rt.{uuid}',
key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt',
key=f'piker.{service}[{uuid[:16]}.{fqme}.rt',

# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
Expand Down Expand Up @@ -691,23 +698,47 @@ async def manage_history(

) as sample_stream:

log.info('Scanning for existing `marketstored`')
tsdb_is_up = await check_for_service('marketstored')

bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
open_history_client = getattr(
mod,
'open_history_client',
None,
)
assert open_history_client

conf, path = config.load('conf')
tsdbconf = conf['network'].get('tsdb')

# lookup backend tsdb module by name and load any user service
# settings for connecting to the tsdb service.
tsdb_backend: str = tsdbconf.pop('backend')
tsdb_host: str = tsdbconf['host']

# TODO: import and load storagemod by name
# mod = get_storagemod(tsdb_backend)
from ..service import marketstore

tsdb_is_up: bool = False
try_remote_tsdb: bool = False
if tsdb_host == 'localhost':
log.info('Scanning for existing `{tsbd_backend}`')
tsdb_is_up: bool = await check_for_service(f'{tsdb_backend}d')
else:
try_remote_tsdb: bool = True

if (
tsdb_is_up
and opened
and open_history_client
or try_remote_tsdb
and (
opened
and open_history_client
)
):
log.info('Found existing `marketstored`')

from ..service import marketstore
async with (
marketstore.open_storage_client(fqsn)as storage,
marketstore.open_storage_client(
**tsdbconf
) as storage,
):
# TODO: drop returning the output that we pass in?
await bus.nursery.start(
Expand All @@ -716,8 +747,7 @@ async def manage_history(
marketstore,
bus,
storage,
fqsn,
bfqsn,
mkt,
{
1: rt_shm,
60: hist_shm,
Expand Down Expand Up @@ -752,7 +782,7 @@ async def manage_history(
await basic_backfill(
bus,
mod,
bfqsn,
mkt,
{
1: rt_shm,
60: hist_shm,
Expand Down

0 comments on commit ae049eb

Please sign in to comment.