Skip to content

Commit

Permalink
Move iter_dfs_from_shms into .data.history
Browse files Browse the repository at this point in the history
Thinking about just moving all of that module (after a content breakup)
to a new `.piker.tsp` which will mostly depend on the `.data` and
`.storage` sub-pkgs; the idea is to move biz-logic for tsdb IO/mgmt and
orchestration with real-time (shm) buffers and the graphics layer into
a common spot for both manual analysis/research work and better
separation of low level data structure primitives from their higher
level usage.

Add a better `data.history` mod doc string in prep for this move
as well as clean out a bunch of legacy commented cruft from the
`trimeter` and `marketstore` days.

TO CHERRY #486 (if we can)
  • Loading branch information
goodboy committed Dec 15, 2023
1 parent 3639f36 commit 8989c73
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 112 deletions.
132 changes: 98 additions & 34 deletions piker/data/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,26 @@
# <https://www.gnu.org/licenses/>.

'''
Historical data business logic for load, backfill and tsdb storage.
Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for,
- hi-level biz logics using the `.storage` subpkg APIs for (I/O)
orchestration and mgmt of tsdb data sets.
- core data-provider history backfilling middleware (as task-funcs) via
(what will eventually be `datad`, but are rn is the) `.brokers` backend
APIs.
- various data set cleaning, repairing and issue-detection/analysis
routines to ensure consistent series whether in shm or when
stored offline (in a tsdb).
'''
from __future__ import annotations
# from collections import (
# Counter,
# )
from datetime import datetime
from functools import partial
# import time
from pathlib import Path
from types import ModuleType
from typing import (
Callable,
Generator,
TYPE_CHECKING,
)

Expand Down Expand Up @@ -118,6 +125,7 @@ def diff_history(
return array[times >= prepend_until_dt.timestamp()]


# TODO: can't we just make this a sync func now?
async def shm_push_in_between(
shm: ShmArray,
to_push: np.ndarray,
Expand All @@ -126,6 +134,10 @@ async def shm_push_in_between(
update_start_on_prepend: bool = False,

) -> int:
# XXX: extremely important, there can be no checkpoints
# in the body of this func to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
shm.push(
to_push,
prepend=True,
Expand All @@ -146,24 +158,6 @@ async def shm_push_in_between(
else None
),
)
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
array = shm.array
zeros = array[array['low'] == 0]

# always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely
# stretching the y-axis..
if 0 < zeros.size:
zeros[[
'open',
'high',
'low',
'close',
]] = shm._array[zeros['index'][0] - 1]['close']
# await tractor.pause()


async def maybe_fill_null_segments(
Expand Down Expand Up @@ -260,6 +254,20 @@ async def maybe_fill_null_segments(
):
await tractor.pause()

array = shm.array
zeros = array[array['low'] == 0]

# always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely
# stretching the y-axis..
if 0 < zeros.size:
zeros[[
'open',
'high',
'low',
'close',
]] = shm._array[zeros['index'][0] - 1]['close']

# TODO: interatively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
Expand Down Expand Up @@ -331,17 +339,6 @@ async def start_backfill(
backfill_until_dt = backfill_from_dt.subtract(**period_duration)


# TODO: can we drop this? without conc i don't think this
# is necessary any more?
# configure async query throttling
# rate = config.get('rate', 1)
# XXX: legacy from ``trimeter`` code but unsupported now.
# erlangs = config.get('erlangs', 1)
# avoid duplicate history frames with a set of datetime frame
# starts and associated counts of how many duplicates we see
# per time stamp.
# starts: Counter[datetime] = Counter()

# STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B)
Expand Down Expand Up @@ -1198,3 +1195,70 @@ async def manage_history(
# and thus a small RPC-prot for remotely controllinlg
# what data is loaded for viewing.
await trio.sleep_forever()


def iter_dfs_from_shms(
fqme: str
) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}

# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')

for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name

# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue

assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)

for shmfile in shmfiles:

# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]

# skip FSP buffers for now..
if key not in sizes:
continue

size: int = sizes[key]

# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array

from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)

yield (
shmfile,
shm,
df,
)


14 changes: 12 additions & 2 deletions piker/data/tsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,16 @@ def get_null_segs(
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
absi_zeros[0] - 1,
max(
absi_zeros[0] - 1,
0,
),
# NOTE: need the + 1 to guarantee we index "up to"
# the next non-null row-datum.
absi_zeros[-1] + 1,
min(
absi_zeros[-1] + 1,
frame['index'][-1],
),
]]
else:
# XXX EDGE CASE: only one null-datum found so
Expand Down Expand Up @@ -484,6 +490,10 @@ def iter_null_segs(
start_t: float = start_row['time']
start_dt: DateTime = from_timestamp(start_t)

if absi_start < 0:
import pdbp
pdbp.set_trace()

yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
Expand Down
97 changes: 21 additions & 76 deletions piker/storage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from __future__ import annotations
from pathlib import Path
import time
from typing import Generator
# from typing import TYPE_CHECKING

import polars as pl
import numpy as np
Expand All @@ -37,14 +35,11 @@
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.data.history import (
_default_hist_size,
_default_rt_size,
iter_dfs_from_shms,
)
from . import (
log,
Expand Down Expand Up @@ -190,6 +185,13 @@ async def main():
)
assert first_dt < last_dt

null_segs: tuple = tsp.get_null_segs(
frame=history,
period=period,
)
if null_segs:
await tractor.pause()

shm_df: pl.DataFrame = await client.as_df(
fqme,
period,
Expand All @@ -204,6 +206,7 @@ async def main():
diff,
) = tsp.dedupe(shm_df)


if diff:
await client.write_ohlcv(
fqme,
Expand All @@ -219,69 +222,6 @@ async def main():
trio.run(main)


def iter_dfs_from_shms(fqme: str) -> Generator[
tuple[Path, ShmArray, pl.DataFrame],
None,
None,
]:
# shm buffer size table based on known sample rates
sizes: dict[str, int] = {
'hist': _default_hist_size,
'rt': _default_rt_size,
}

# load all detected shm buffer files which have the
# passed FQME pattern in the file name.
shmfiles: list[Path] = []
shmdir = Path('/dev/shm/')

for shmfile in shmdir.glob(f'*{fqme}*'):
filename: str = shmfile.name

# skip index files
if (
'_first' in filename
or '_last' in filename
):
continue

assert shmfile.is_file()
log.debug(f'Found matching shm buffer file: {filename}')
shmfiles.append(shmfile)

for shmfile in shmfiles:

# lookup array buffer size based on file suffix
# being either .rt or .hist
key: str = shmfile.name.rsplit('.')[-1]

# skip FSP buffers for now..
if key not in sizes:
continue

size: int = sizes[key]

# attach to any shm buffer, load array into polars df,
# write to local parquet file.
shm, opened = maybe_open_shm_array(
key=shmfile.name,
size=size,
dtype=def_iohlcv_fields,
readonly=True,
)
assert not opened
ohlcv = shm.array

from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)

yield (
shmfile,
shm,
df,
)


@store.command()
def ldshm(
fqme: str,
Expand All @@ -307,8 +247,8 @@ async def main():

# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
secs: float = times[-1] - times[-2]
if secs < 1.:
period_s: float = times[-1] - times[-2]
if period_s < 1.:
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
Expand All @@ -323,17 +263,22 @@ async def main():
diff,
) = tsp.dedupe(shm_df)

null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)

# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if (
not gaps.is_empty()
or secs > 2
):
if not gaps.is_empty():
await tractor.pause()

if null_segs:
await tractor.pause()

# write to parquet file?
if write_parquet:
timeframe: str = f'{secs}s'
timeframe: str = f'{period_s}s'

datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():
Expand Down

0 comments on commit 8989c73

Please sign in to comment.