From 7b4472e37e7eda687f3b47321fe8dd2d20b65d4c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 26 Jun 2023 19:30:20 -0400
Subject: [PATCH 1/7] data._sampling.frame_ticks(): slight rework to generalize

---
 piker/data/_sampling.py | 73 ++++++++++++++++++++++++-----------------
 1 file changed, 42 insertions(+), 31 deletions(-)

diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index 641edf539..aeabea0b4 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -27,6 +27,7 @@ from contextlib import asynccontextmanager as acm
 import time
 from typing import (
+    Any,
     AsyncIterator,
     TYPE_CHECKING,
 )
@@ -587,9 +588,9 @@ async def sample_and_broadcast(
         # TODO: we should probably not write every single
         # value to an OHLC sample stream XD
         # for a tick stream sure.. but this is excessive..
-        ticks = quote['ticks']
+        ticks: list[dict] = quote['ticks']
         for tick in ticks:
-            ticktype = tick['type']
+            ticktype: str = tick['type']

             # write trade events to shm last OHLC sample
             if ticktype in ('trade', 'utrade'):
@@ -599,13 +600,14 @@ async def sample_and_broadcast(
                 # more compact inline-way to do this assignment
                 # to both buffers?
                 for shm in [rt_shm, hist_shm]:
+
                     # update last entry
                     # benchmarked in the 4-5 us range
                     o, high, low, v = shm.array[-1][
                         ['open', 'high', 'low', 'volume']
                     ]

-                    new_v = tick.get('size', 0)
+                    new_v: float = tick.get('size', 0)

                     if v == 0 and new_v:
                         # no trades for this bar yet so the open
@@ -654,7 +656,7 @@ async def sample_and_broadcast(
             # its own "name" into the fqme schema (but maybe it
             # should?) so we have to manually generate the correct
             # key here.
-            fqme = f'{broker_symbol}.{brokername}'
+            fqme: str = f'{broker_symbol}.{brokername}'
             lags: int = 0

             # TODO: speed up this loop in an AOT compiled lang (like
@@ -757,28 +759,21 @@ def frame_ticks(
-    first_quote: dict,
-    last_quote: dict,
-    ticks_by_type: dict,
-) -> None:
+    quote: dict[str, Any],
+
+    ticks_by_type: dict[str, list[dict[str, Any]]] = {},
+    ticks_in_order: list[dict[str, Any]] | None = None
+
+) -> dict:
+
     # append quotes since last iteration into the last quote's
     # tick array/buffer.
-    ticks = last_quote.get('ticks')
-
     # TODO: once we decide to get fancy really we should
     # have a shared mem tick buffer that is just
     # continually filled and the UI just read from it
     # at its display rate.
-    if ticks:
-        # TODO: do we need this any more or can we just
-        # expect the receiver to unwind the below
-        # `ticks_by_type: dict`?
-        # => undwinding would potentially require a
-        # `dict[str, set | list]` instead with an
-        # included `'types' field which is an (ordered)
-        # set of tick type fields in the order which
-        # types arrived?
-        first_quote['ticks'].extend(ticks)
+
+    if ticks := quote.get('ticks'):

         # XXX: build a tick-by-type table of lists
         # of tick messages. This allows for less
         # iteration on the receiver side by allowing for
         # a single "latest tick event" look up by
         # indexing the last entry in each sub-list.
         # tbt = {
         #     'types': ['bid', 'asize', 'last', .. ''],

         #     'bid': [tick0, tick1, tick2, .., tickn],
         #     'asize': [tick0, tick1, tick2, .., tickn],
         #     'last': [tick0, tick1, tick2, .., tickn],
         #     ...
         #     '': [tick0, tick1, tick2, .., tickn],
         # }

         # append in reverse FIFO order for in-order iteration on
         # receiver side.
+        tick: dict[str, Any]
         for tick in ticks:
-            ttype = tick['type']
-            ticks_by_type[ttype].append(tick)
+            ticks_by_type.setdefault(
+                tick['type'],
+                [],
+            ).append(tick)
+
+        # TODO: do we need this any more or can we just
+        # expect the receiver to unwind the below
+        # `ticks_by_type: dict`?
+        # => unwinding would potentially require a
+        # `dict[str, set | list]` instead with an
+        # included `'types'` field which is an (ordered)
+        # set of tick type fields in the order which
+        # types arrived?
+ if ticks_in_order: + ticks_in_order.extend(ticks) + + return ticks_by_type async def uniform_rate_send( @@ -835,10 +846,10 @@ async def uniform_rate_send( diff = 0 task_status.started() - ticks_by_type: defaultdict[ + ticks_by_type: dict[ str, - list[dict], - ] = defaultdict(list) + list[dict[str, Any]], + ] = {} clear_types = _tick_groups['clears'] @@ -866,9 +877,9 @@ async def uniform_rate_send( # expired we aren't supposed to send yet so append # to the tick frame. frame_ticks( - first_quote, last_quote, - ticks_by_type, + ticks_in_order=first_quote['ticks'], + ticks_by_type=ticks_by_type, ) # send cycle isn't due yet so continue waiting @@ -888,8 +899,8 @@ async def uniform_rate_send( frame_ticks( first_quote, - first_quote, - ticks_by_type, + ticks_in_order=first_quote['ticks'], + ticks_by_type=ticks_by_type, ) # we have a quote already so send it now. @@ -905,9 +916,9 @@ async def uniform_rate_send( break frame_ticks( - first_quote, last_quote, - ticks_by_type, + ticks_in_order=first_quote['ticks'], + ticks_by_type=ticks_by_type, ) # measured_rate = 1 / (time.time() - last_send) From eacc59226f84718612cbe98cf5b8338e099296bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 26 Jun 2023 19:41:27 -0400 Subject: [PATCH 2/7] rename `.data._normalize` -> `.ticktools` --- piker/clearing/_ems.py | 4 ++-- piker/clearing/_paper_engine.py | 2 +- piker/data/__init__.py | 2 +- piker/data/{_normalize.py => ticktools.py} | 0 piker/fsp/_momo.py | 2 +- piker/fsp/_volume.py | 2 +- piker/ui/_position.py | 4 ++-- 7 files changed, 8 insertions(+), 8 deletions(-) rename piker/data/{_normalize.py => ticktools.py} (100%) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index c18631a66..b43d8fd16 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -47,12 +47,12 @@ log, # sub-sys logger get_console_log, ) -from ..data._normalize import iterticks from ..accounting._mktinfo import ( unpack_fqme, dec_digits, ) from ..ui._notify import notify_from_ems_status_msg +from ..data import iterticks from ..data.types import Struct from ._messages import ( Order, @@ -67,7 +67,7 @@ ) if TYPE_CHECKING: - from ..data.feed import ( + from ..data import ( Feed, Flume, ) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index ac5f3d3fd..2df4eb4e8 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -51,7 +51,7 @@ open_trade_ledger, open_pps, ) -from ..data._normalize import iterticks +from ..data import iterticks from ..accounting import unpack_fqme from ._util import ( log, # sub-sys logger diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 087928ec7..6c621248a 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -22,7 +22,7 @@ sharing live streams over a network. 
""" -from ._normalize import iterticks +from .ticktools import iterticks from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, diff --git a/piker/data/_normalize.py b/piker/data/ticktools.py similarity index 100% rename from piker/data/_normalize.py rename to piker/data/ticktools.py diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 488ae22c1..d1463c224 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -24,7 +24,7 @@ from numba import jit, float64, optional, int64 from ._api import fsp -from ..data._normalize import iterticks +from ..data import iterticks from ..data._sharedmem import ShmArray diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 06d0be914..594e80e4e 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -20,7 +20,7 @@ from tractor.trionics._broadcast import AsyncReceiver from ._api import fsp -from ..data._normalize import iterticks +from ..data import iterticks from ..data._sharedmem import ShmArray from ._momo import _wma from ..log import get_logger diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 0cf181369..90976a1e8 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -56,8 +56,8 @@ _derivs, ) -from ..data._normalize import iterticks -from ..data.feed import ( +from ..data import ( + iterticks, Feed, Flume, ) From 621634b5a244094eb2cc880288c629cc18dc4941 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 26 Jun 2023 19:46:51 -0400 Subject: [PATCH 3/7] Move `frame_ticks()` and tick-type defs into `.ticktools` --- piker/data/_sampling.py | 75 +++-------------------------------------- piker/data/ticktools.py | 75 ++++++++++++++++++++++++++++++++++++++++- piker/ui/_display.py | 10 +++--- 3 files changed, 84 insertions(+), 76 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index aeabea0b4..1db800153 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -39,6 +39,10 @@ import trio from trio_typing import TaskStatus +from .ticktools import ( + frame_ticks, + _tick_groups, +) from ._util import ( log, get_console_log, @@ -742,77 +746,6 @@ async def sample_and_broadcast( ) -# tick-type-classes template for all possible "lowest level" events -# that can can be emitted by the "top of book" L1 queues and -# price-matching (with eventual clearing) in a double auction -# market (queuing) system. -_tick_groups = { - 'clears': {'trade', 'dark_trade', 'last'}, - 'bids': {'bid', 'bsize'}, - 'asks': {'ask', 'asize'}, -} - -# XXX alo define the flattened set of all such "fundamental ticks" -# so that it can be used as filter, eg. in the graphics display -# loop to compute running windowed y-ranges B) -_auction_ticks: set[str] = set.union(*_tick_groups.values()) - - -def frame_ticks( - quote: dict[str, Any], - - ticks_by_type: dict[str, list[dict[str, Any]]] = {}, - ticks_in_order: list[dict[str, Any]] | None = None - -) -> dict: - - # append quotes since last iteration into the last quote's - # tick array/buffer. - # TODO: once we decide to get fancy really we should - # have a shared mem tick buffer that is just - # continually filled and the UI just ready from it - # at it's display rate. - - if ticks := quote.get('ticks'): - - # XXX: build a tick-by-type table of lists - # of tick messages. This allows for less - # iteration on the receiver side by allowing for - # a single "latest tick event" look up by - # indexing the last entry in each sub-list. - # tbt = { - # 'types': ['bid', 'asize', 'last', .. 
''],
-
-        # 'bid': [tick0, tick1, tick2, .., tickn],
-        # 'asize': [tick0, tick1, tick2, .., tickn],
-        # 'last': [tick0, tick1, tick2, .., tickn],
-        # ...
-        # '': [tick0, tick1, tick2, .., tickn],
-        # }
-
-        # append in reverse FIFO order for in-order iteration on
-        # receiver side.
-        tick: dict[str, Any]
-        for tick in ticks:
-            ticks_by_type.setdefault(
-                tick['type'],
-                [],
-            ).append(tick)
-
-        # TODO: do we need this any more or can we just
-        # expect the receiver to unwind the below
-        # `ticks_by_type: dict`?
-        # => unwinding would potentially require a
-        # `dict[str, set | list]` instead with an
-        # included `'types'` field which is an (ordered)
-        # set of tick type fields in the order which
-        # types arrived?
-        if ticks_in_order:
-            ticks_in_order.extend(ticks)
-
-    return ticks_by_type
-
-
 async def uniform_rate_send(
     rate: float,

diff --git a/piker/data/ticktools.py b/piker/data/ticktools.py
index 13708252a..bc3543f87 100644
--- a/piker/data/ticktools.py
+++ b/piker/data/ticktools.py
@@ -19,7 +19,25 @@
 '''
 from itertools import chain
-from typing import AsyncIterator
+from typing import (
+    Any,
+    AsyncIterator,
+)
+
+# tick-type-classes template for all possible "lowest level" events
+# that can be emitted by the "top of book" L1 queues and
+# price-matching (with eventual clearing) in a double auction
+# market (queuing) system.
+_tick_groups: dict[str, set[str]] = {
+    'clears': {'trade', 'dark_trade', 'last'},
+    'bids': {'bid', 'bsize'},
+    'asks': {'ask', 'asize'},
+}
+
+# XXX also define the flattened set of all such "fundamental ticks"
+# so that it can be used as a filter, eg. in the graphics display
+# loop to compute running windowed y-ranges B)
+_auction_ticks: set[str] = set.union(*_tick_groups.values())


 def iterticks(
@@ -80,3 +98,58 @@ def iterticks(
         ttype = tick.get('type')
         if ttype in types:
             yield tick
+
+
+def frame_ticks(
+    quote: dict[str, Any],
+
+    ticks_by_type: dict[str, list[dict[str, Any]]] = {},
+    ticks_in_order: list[dict[str, Any]] | None = None
+
+) -> dict:
+
+    # append quotes since last iteration into the last quote's
+    # tick array/buffer.
+    # TODO: once we decide to get fancy really we should
+    # have a shared mem tick buffer that is just
+    # continually filled and the UI just read from it
+    # at its display rate.
+
+    if ticks := quote.get('ticks'):
+
+        # XXX: build a tick-by-type table of lists
+        # of tick messages. This allows for less
+        # iteration on the receiver side by allowing for
+        # a single "latest tick event" look up by
+        # indexing the last entry in each sub-list.
+        # tbt = {
+        #     'types': ['bid', 'asize', 'last', .. ''],
+
+        #     'bid': [tick0, tick1, tick2, .., tickn],
+        #     'asize': [tick0, tick1, tick2, .., tickn],
+        #     'last': [tick0, tick1, tick2, .., tickn],
+        #     ...
+        #     '': [tick0, tick1, tick2, .., tickn],
+        # }
+
+        # append in reverse FIFO order for in-order iteration on
+        # receiver side.
+        tick: dict[str, Any]
+        for tick in ticks:
+            ticks_by_type.setdefault(
+                tick['type'],
+                [],
+            ).append(tick)
+
+        # TODO: do we need this any more or can we just
+        # expect the receiver to unwind the below
+        # `ticks_by_type: dict`?
+        # => unwinding would potentially require a
+        # `dict[str, set | list]` instead with an
+        # included `'types'` field which is an (ordered)
+        # set of tick type fields in the order which
+        # types arrived?
+        if ticks_in_order:
+            ticks_in_order.extend(ticks)
+
+    return ticks_by_type

diff --git a/piker/ui/_display.py b/piker/ui/_display.py
index 1884d018b..610b38f3d 100644
--- a/piker/ui/_display.py
+++ b/piker/ui/_display.py
@@ -39,20 +39,23 @@
 from ..accounting import (
     MktPair,
 )
-from ..data.feed import (
+from ..data import (
     open_feed,
     Feed,
     Flume,
 )
+from ..data.ticktools import (
+    _tick_groups,
+    _auction_ticks,
+)
 from ..data.types import Struct
 from ..data._sharedmem import (
     ShmArray,
 )
 from ..data._sampling import (
-    _tick_groups,
-    _auction_ticks,
     open_sample_stream,
 )
+# from ..data._source import tf_in_1s
 from ._axes import YAxisLabel
 from ._chart import (
     ChartPlotWidget,
@@ -72,7 +75,6 @@
     mk_order_pane_layout,
 )
 from . import _pg_overrides as pgo
-# from ..data._source import tf_in_1s
 from .order_mode import (
     open_order_mode,
     OrderMode,

From ea270d3396ad31302b73bfa34fbc913c1eeb5be1 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 27 Jun 2023 09:31:08 -0400
Subject: [PATCH 4/7] .data.ticktools: add reverse flag, better docs

Since it may be handy to get the latest ticks first, add a
`reverse: bool` to `iterticks()` and add some cleaner logic and a
proper doc string to `frame_ticks()`.
---
 piker/data/ticktools.py | 134 +++++++++++++++++++++++-----------------
 1 file changed, 76 insertions(+), 58 deletions(-)

diff --git a/piker/data/ticktools.py b/piker/data/ticktools.py
index bc3543f87..1ce7fa455 100644
--- a/piker/data/ticktools.py
+++ b/piker/data/ticktools.py
@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -15,7 +15,7 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 '''
-Stream format enforcement.
+Tick event stream processing, filter-by-types, format-normalization.

 '''
 from itertools import chain
@@ -40,6 +40,69 @@
 _auction_ticks: set[str] = set.union(*_tick_groups.values())


+def frame_ticks(
+    quote: dict[str, Any],
+
+    ticks_by_type: dict | None = None,
+    ticks_in_order: list[dict[str, Any]] | None = None
+
+) -> dict[
+    str,
+    list[dict[str, Any]]
+]:
+    '''
+    XXX: build a tick-by-type table of lists
+    of tick messages. This allows for less
+    iteration on the receiver side by allowing for
+    a single "latest tick event" look up by
+    indexing the last entry in each sub-list.
+
+    tbt = {
+        'types': ['bid', 'asize', 'last', .. ''],
+
+        'bid': [tick0, tick1, tick2, .., tickn],
+        'asize': [tick0, tick1, tick2, .., tickn],
+        'last': [tick0, tick1, tick2, .., tickn],
+        ...
+        '': [tick0, tick1, tick2, .., tickn],
+    }
+
+    If `ticks_in_order` is provided, append any retrieved ticks
+    since last iteration into this array/buffer/list.
+
+    '''
+    # TODO: once we decide to get fancy really we should
+    # have a shared mem tick buffer that is just
+    # continually filled and the UI just read from it
+    # at its display rate.
+
+    tbt = ticks_by_type if ticks_by_type is not None else {}
+    if not (ticks := quote.get('ticks')):
+        return tbt
+
+    # append in reverse FIFO order for in-order iteration on
+    # receiver side.
+    tick: dict[str, Any]
+    for tick in ticks:
+        tbt.setdefault(
+            tick['type'],
+            [],
+        ).append(tick)
+
+    # TODO: do we need this any more or can we just
+    # expect the receiver to unwind the below
+    # `ticks_by_type: dict`?
+    # => unwinding would potentially require a
+    # `dict[str, set | list]` instead with an
+    # included `'types'` field which is an (ordered)
+    # set of tick type fields in the order which
+    # types arrived?
+    if ticks_in_order:
+        ticks_in_order.extend(ticks)
+
+    return tbt
+
+
 def iterticks(
     quote: dict,
     types: tuple[str] = (
         'trade',
         'dark_trade',
     ),
     deduplicate_darks: bool = False,
+    reverse: bool = False,
+
+    # TODO: should we offer delegating to `frame_ticks()` above
+    # with this?
+    frame_by_type: bool = False,

 ) -> AsyncIterator:
     '''
-    Iterate through ticks delivered per quote cycle.
+    Iterate through ticks delivered per quote cycle, filter and
+    yield any declared in `types`.

     '''
     if deduplicate_darks:
@@ -93,63 +162,12 @@ def iterticks(
         # re-insert ticks
         ticks.extend(list(chain(trades.values(), darks.values())))

+    # most-recent-first
+    if reverse:
+        ticks = reversed(ticks)
+
     for tick in ticks:
         # print(f"{quote['symbol']}: {tick}")
         ttype = tick.get('type')
         if ttype in types:
             yield tick
-
-
-def frame_ticks(
-    quote: dict[str, Any],
-
-    ticks_by_type: dict[str, list[dict[str, Any]]] = {},
-    ticks_in_order: list[dict[str, Any]] | None = None
-
-) -> dict:
-
-    # append quotes since last iteration into the last quote's
-    # tick array/buffer.
-    # TODO: once we decide to get fancy really we should
-    # have a shared mem tick buffer that is just
-    # continually filled and the UI just read from it
-    # at its display rate.
-
-    if ticks := quote.get('ticks'):
-
-        # XXX: build a tick-by-type table of lists
-        # of tick messages. This allows for less
-        # iteration on the receiver side by allowing for
-        # a single "latest tick event" look up by
-        # indexing the last entry in each sub-list.
-        # tbt = {
-        #     'types': ['bid', 'asize', 'last', .. ''],
-
-        #     'bid': [tick0, tick1, tick2, .., tickn],
-        #     'asize': [tick0, tick1, tick2, .., tickn],
-        #     'last': [tick0, tick1, tick2, .., tickn],
-        #     ...
-        #     '': [tick0, tick1, tick2, .., tickn],
-        # }
-
-        # append in reverse FIFO order for in-order iteration on
-        # receiver side.
-        tick: dict[str, Any]
-        for tick in ticks:
-            ticks_by_type.setdefault(
-                tick['type'],
-                [],
-            ).append(tick)
-
-        # TODO: do we need this any more or can we just
-        # expect the receiver to unwind the below
-        # `ticks_by_type: dict`?
-        # => unwinding would potentially require a
-        # `dict[str, set | list]` instead with an
-        # included `'types'` field which is an (ordered)
-        # set of tick type fields in the order which
-        # types arrived?
- if ticks_in_order: - ticks_in_order.extend(ticks) - - return ticks_by_type From 66d402b80e60460176a531f65061d07ee2d4b5d1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Jun 2023 09:33:24 -0400 Subject: [PATCH 5/7] Load ledger records into `pl.DataFrame` for `disect`-tion --- piker/accounting/cli.py | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index c184614ce..290c1a5e5 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -19,8 +19,11 @@ ''' from __future__ import annotations + + from rich.console import Console from rich.markdown import Markdown +import polars as pl import tractor import trio import typer @@ -35,7 +38,7 @@ from ._ledger import ( load_ledger, # open_trade_ledger, - TransactionLedger, + # TransactionLedger, ) from ._pos import ( PpTable, @@ -237,7 +240,7 @@ async def main(): def disect( # "fully_qualified_account_name" fqan: str, - bs_mktid: int, # for ib + bs_mktid: str, # for ib pdb: bool = False, loglevel: str = typer.Option( @@ -251,14 +254,35 @@ def disect( brokername, account = pair - ledger: TransactionLedger + # ledger: TransactionLedger + records: dict[str, dict] table: PpTable records, table = load_pps_from_ledger( brokername, account, - # filter_by_id = {568549458}, filter_by_ids={bs_mktid}, ) + df = pl.DataFrame( + list(records.values()), + # schema=[ + # ('tid', str), + # ('fqme', str), + # ('dt', str), + # ('size', pl.Float64), + # ('price', pl.Float64), + # ('cost', pl.Float64), + # ('expiry', str), + # ('bs_mktid', str), + # ], + ).select([ + pl.col('fqme'), + pl.col('dt').str.to_datetime(), + # pl.col('expiry').dt.datetime(), + pl.col('size'), + pl.col('price'), + ]) + + assert not df.is_empty() breakpoint() # tractor.pause_from_sync() # with open_trade_ledger( @@ -267,8 +291,3 @@ def disect( # ) as ledger: # for tid, rec in ledger.items(): # bs_mktid: str = rec['bs_mktid'] - - - -if __name__ == "__main__": - ledger() # this is called from ``>> ledger `` From c0d575c009a8c3cb90495c9b52b435fda78518eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Jun 2023 12:58:50 -0400 Subject: [PATCH 6/7] Change `Position.clears` -> `._clears[list[dict]]` When you look at usage we don't end up really needing clear entries to be keyed by their `Transaction.tid`, instead it's much more important to ensure the time sorted order of trade-clearing transactions such that position properties such as the size and ppu are calculated correctly. Thus, this instead simplified the `.clears` table to a list of clear dict entries making a bunch of things simpler: - object form `Position._clears` compared to the offline TOML schema (saved in account files) is now data-structure-symmetrical. - `Position.add_clear()` now uses `bisect.insort()` to datetime-field-sort-insert into the *list* which saves having to worry about sorting on every sequence *read*. Further deats: - adjust `.accounting._ledger.iter_by_dt()` to expect an input `list`. - change `Position.iter_clears()` to iterate only the clearing entry dicts without yielding a key/tid; no more tuples. - drop `Position.to_dict()` since parent `Struct` already implements it. 
--- piker/accounting/_ledger.py | 30 +++---- piker/accounting/_pos.py | 170 +++++++++++++++++++----------------- 2 files changed, 106 insertions(+), 94 deletions(-) diff --git a/piker/accounting/_ledger.py b/piker/accounting/_ledger.py index 04ee04b7e..268a81fc8 100644 --- a/piker/accounting/_ledger.py +++ b/piker/accounting/_ledger.py @@ -160,15 +160,16 @@ def iter_trans( # normer = mod.norm_trade_record(txdict) # TODO: use tx_sort here yah? - for tid, txdict in self.data.items(): + for txdict in self.tx_sort(self.data.values()): + # for tid, txdict in self.data.items(): # special field handling for datetimes # to ensure pendulum is used! - fqme = txdict.get('fqme') or txdict['fqsn'] - dt = parse(txdict['dt']) - expiry = txdict.get('expiry') + tid: str = txdict['tid'] + fqme: str = txdict.get('fqme') or txdict['fqsn'] + dt: DateTime = parse(txdict['dt']) + expiry: str | None = txdict.get('expiry') - mkt = mkt_by_fqme.get(fqme) - if not mkt: + if not (mkt := mkt_by_fqme.get(fqme)): # we can't build a trans if we don't have # the ``.sys: MktPair`` info, so skip. continue @@ -229,7 +230,7 @@ def write_config( def iter_by_dt( - records: dict[str, Any], + records: dict[str, dict[str, Any]] | list[dict], # NOTE: parsers are looked up in the insert order # so if you know that the record stats show some field @@ -247,21 +248,20 @@ def iter_by_dt( datetime presumably set at the ``'dt'`` field in each entry. ''' - def dyn_parse_to_dt( - pair: tuple[str, dict], - ) -> DateTime: - _, txdict = pair + def dyn_parse_to_dt(txdict: dict[str, Any]) -> DateTime: k, v, parser = next( (k, txdict[k], parsers[k]) for k in parsers if k in txdict ) - return parser(v) if parser else v - for tid, data in sorted( - records.items(), + if isinstance(records, dict): + records = records.values() + + for entry in sorted( + records, key=key or dyn_parse_to_dt, ): - yield tid, data + yield entry def load_ledger( diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index f50040cbc..1288c6886 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -22,6 +22,7 @@ ''' from __future__ import annotations +from bisect import insort from contextlib import contextmanager as cm from decimal import Decimal from math import copysign @@ -30,7 +31,6 @@ from typing import ( Any, Iterator, - Union, Generator ) @@ -52,7 +52,6 @@ from .. import config from ..clearing._messages import ( BrokerdPosition, - Status, ) from ..data.types import Struct from ..log import get_logger @@ -66,16 +65,17 @@ class Position(Struct): A financial "position" in `piker` terms is a summary of accounting metrics computed from a transaction ledger; generally it describes - some acumulative "size" and "average price" from the summarized + some accumulative "size" and "average price" from the summarized underlying transaction set. In piker we focus on the `.ppu` (price per unit) and the `.bep` (break even price) including all transaction entries and exits since the last "net-zero" size of the destination asset's holding. - This interface serves as an object API for computing and tracking - positions as well as supports serialization for storage in the local - file system (in TOML) and to interchange as a msg over IPC. + This interface serves as an object API for computing and + tracking positions as well as supports serialization for + storage in the local file system (in TOML) and to interchange + as a msg over IPC. 
''' mkt: MktPair @@ -100,10 +100,9 @@ class Position(Struct): split_ratio: int | None = None # ordered record of known constituent trade messages - clears: dict[ - Union[str, int, Status], # trade id + _clears: list[ dict[str, Any], # transaction history summaries - ] = {} + ] = [] first_clear_dt: datetime | None = None expiry: datetime | None = None @@ -111,34 +110,30 @@ class Position(Struct): def __repr__(self) -> str: return pformat(self.to_dict()) - def to_dict(self) -> dict: - return { - f: getattr(self, f) - for f in self.__struct_fields__ - } - def to_pretoml(self) -> tuple[str, dict]: ''' - Prep this position's data contents for export to toml including - re-structuring of the ``.clears`` table to an array of - inline-subtables for better ``pps.toml`` compactness. + Prep this position's data contents for export as an entry + in a TOML "account file" (such as + `account.binance.paper.toml`) including re-structuring of + the ``._clears`` entries as an array of inline-subtables + for better ``pps.toml`` compactness. ''' - d = self.to_dict() - clears = d.pop('clears') - expiry = d.pop('expiry') + asdict = self.to_dict() + clears: list[dict] = asdict.pop('_clears') + expiry = asdict.pop('expiry') if self.split_ratio is None: - d.pop('split_ratio') + asdict.pop('split_ratio') # should be obvious from clears/event table - d.pop('first_clear_dt') + asdict.pop('first_clear_dt') # TODO: we need to figure out how to have one top level # listing venue here even when the backend isn't providing # it via the trades ledger.. # drop symbol obj in serialized form - mkt: MktPair = d.pop('mkt') + mkt: MktPair = asdict.pop('mkt') assert isinstance(mkt, MktPair) fqme = mkt.fqme @@ -148,15 +143,15 @@ def to_pretoml(self) -> tuple[str, dict]: # each tradeable asset in the market. if mkt.resolved: dst: Asset = mkt.dst - d['asset_type'] = dst.atype + asdict['asset_type'] = dst.atype - d['price_tick'] = mkt.price_tick - d['size_tick'] = mkt.size_tick + asdict['price_tick'] = mkt.price_tick + asdict['size_tick'] = mkt.size_tick if self.expiry is None: - d.pop('expiry', None) + asdict.pop('expiry', None) elif expiry: - d['expiry'] = str(expiry) + asdict['expiry'] = str(expiry) clears_table: tomlkit.Array = tomlkit.array() clears_table.multiline( @@ -165,30 +160,29 @@ def to_pretoml(self) -> tuple[str, dict]: ) # reverse sort so latest clears are at top of section? - for tid, data in iter_by_dt(clears): + for entry in iter_by_dt(clears): inline_table = tomlkit.inline_table() # serialize datetime to parsable `str` - dtstr = inline_table['dt'] = data['dt'].isoformat('T') + dtstr = inline_table['dt'] = entry['dt'].isoformat('T') assert 'Datetime' not in dtstr # insert optional clear fields in column order for k in ['ppu', 'accum_size']: - val = data.get(k) - if val: + if val := entry.get(k): inline_table[k] = val # insert required fields for k in ['price', 'size', 'cost']: - inline_table[k] = data[k] + inline_table[k] = entry[k] - inline_table['tid'] = tid + inline_table['tid'] = entry['tid'] clears_table.append(inline_table) - d['clears'] = clears_table + asdict['clears'] = clears_table - return fqme, d + return fqme, asdict def ensure_state(self) -> None: ''' @@ -197,18 +191,16 @@ def ensure_state(self) -> None: they differ and log warnings to console. 
''' - clears = list(self.clears.values()) - self.first_clear_dt = min( - list(entry['dt'] for entry in clears) - ) - last_clear = clears[-1] + clears: list[dict] = self._clears + self.first_clear_dt = min(clears, key=lambda e: e['dt'])['dt'] + last_clear: dict = clears[-1] + csize: float = self.calc_size() + accum: float = last_clear['accum_size'] - csize = self.calc_size() - accum = last_clear['accum_size'] if not self.expired(): if ( csize != accum - and csize != round(accum * self.split_ratio or 1) + and csize != round(accum * (self.split_ratio or 1)) ): raise ValueError(f'Size mismatch: {csize}') else: @@ -221,11 +213,12 @@ def ensure_state(self) -> None: ) self.size = csize - cppu = self.calc_ppu() - ppu = last_clear['ppu'] + cppu: float = self.calc_ppu() + ppu: float = last_clear['ppu'] if ( cppu != ppu and self.split_ratio is not None + # handle any split info entered (for now) manually by user and cppu != (ppu / self.split_ratio) ): @@ -281,15 +274,15 @@ def dsize(self) -> float: def iter_clears(self) -> Iterator[tuple[str, dict]]: ''' - Iterate the internally managed ``.clears: dict`` table in + Iterate the internally managed ``._clears: dict`` table in datetime-stamped order. ''' # sort on the already existing datetime that should have # been generated for the entry's table return iter_by_dt( - self.clears, - key=lambda entry: entry[1]['dt'] + self._clears, + key=lambda entry: entry['dt'] ) def calc_ppu( @@ -323,9 +316,8 @@ def calc_ppu( asize_h: list[float] = [] # historical accumulative size ppu_h: list[float] = [] # historical price-per-unit - tid: str entry: dict[str, Any] - for (tid, entry) in self.iter_clears(): + for entry in self.iter_clears(): clear_size = entry['size'] clear_price: str | float = entry['price'] is_clear: bool = not isinstance(clear_price, str) @@ -451,7 +443,7 @@ def calc_size(self) -> float: if self.expired(): return 0. - for tid, entry in self.clears.items(): + for entry in self._clears: size += entry['size'] # XXX: do we need it every step? # no right since rounding is an LT? @@ -474,11 +466,11 @@ def minimize_clears( ''' Minimize the position's clears entries by removing all transactions before the last net zero size to avoid - unecessary history irrelevant to the current pp state. + unnecessary history irrelevant to the current pp state. ''' size: float = 0 - clears_since_zero: list[tuple(str, dict)] = [] + clears_since_zero: list[dict] = [] # TODO: we might just want to always do this when iterating # a ledger? keep a state of the last net-zero and only do the @@ -486,34 +478,44 @@ def minimize_clears( # scan for the last "net zero" position by iterating # transactions until the next net-zero size, rinse, repeat. - for tid, clear in self.clears.items(): + for clear in self._clears: size = float( self.mkt.quantize(size + clear['size']) ) - clears_since_zero.append((tid, clear)) + clears_since_zero.append(clear) if size == 0: clears_since_zero.clear() - self.clears = dict(clears_since_zero) - return self.clears + self._clears = clears_since_zero + return self._clears def add_clear( self, t: Transaction, ) -> dict: ''' - Update clearing table and populate rolling ppu and accumulative - size in both the clears entry and local attrs state. + Update clearing table by calculating the rolling ppu and + (accumulative) size in both the clears entry and local + attrs state. + + Inserts are always done in datetime sorted order. 
''' - clear = self.clears[t.tid] = { + clear: dict[str, float | str | int] = { + 'tid': t.tid, 'cost': t.cost, 'price': t.price, 'size': t.size, 'dt': t.dt } + insort( + self._clears, + clear, + key=lambda entry: entry['dt'] + ) + # TODO: compute these incrementally instead # of re-looping through each time resulting in O(n**2) # behaviour..? @@ -526,10 +528,14 @@ def add_clear( return clear - # def sugest_split(self) -> float: + # TODO: once we have an `.events` table with diff + # mkt event types..? + # def suggest_split(self) -> float: # ... +# TODO: maybe a better name is just `Account` and we include +# a table of asset balances as `.balances: dict[Asset, float]`? class PpTable(Struct): brokername: str @@ -544,7 +550,12 @@ def update_from_trans( cost_scalar: float = 2, ) -> dict[str, Position]: + ''' + Update the internal `.pps[str, Position]` table from input + transactions recomputing the price-per-unit (ppu) and + accumulative size for each entry. + ''' pps = self.pps updated: dict[str, Position] = {} @@ -553,7 +564,7 @@ def update_from_trans( for t in sorted( trans.values(), key=lambda t: t.dt, - reverse=True, + # reverse=True, ): fqme = t.fqme bs_mktid = t.bs_mktid @@ -561,10 +572,10 @@ def update_from_trans( # template the mkt-info presuming a legacy market ticks # if no info exists in the transactions.. mkt: MktPair = t.sys - pp = pps.get(bs_mktid) - if not pp: - # if no existing pp, allocate fresh one. - pp = pps[bs_mktid] = Position( + pos = pps.get(bs_mktid) + if not pos: + # if no existing pos, allocate fresh one. + pos = pps[bs_mktid] = Position( mkt=mkt, size=0.0, ppu=0.0, @@ -577,12 +588,12 @@ def update_from_trans( # a shorter string), instead use the one from the # transaction since it likely has (more) full # information from the provider. - if len(pp.mkt.fqme) < len(fqme): - pp.mkt = mkt + if len(pos.mkt.fqme) < len(fqme): + pos.mkt = mkt - clears = pp.clears + clears: list[dict] = pos._clears if clears: - first_clear_dt = pp.first_clear_dt + first_clear_dt = pos.first_clear_dt # don't do updates for ledger records we already have # included in the current pps state. @@ -601,15 +612,16 @@ def update_from_trans( continue # update clearing table - pp.add_clear(t) - updated[t.bs_mktid] = pp + pos.add_clear(t) + updated[t.bs_mktid] = pos - # minimize clears tables and update sizing. - for bs_mktid, pp in updated.items(): - pp.ensure_state() + # re-calc ppu and accumulative sizing. + for bs_mktid, pos in updated.items(): + pos.ensure_state() - # deliver only the position entries that were actually updated - # (modified the state) from the input transaction set. + # NOTE: deliver only the position entries that were + # actually updated (modified the state) from the input + # transaction set. 
return updated def dump_active( From f2fff5a5fa9f0162aac1cba6c2d276d307517676 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Jun 2023 13:21:59 -0400 Subject: [PATCH 7/7] ib._ledger: move trades transaction processing helpers into new module --- piker/brokers/ib/__init__.py | 2 + piker/brokers/ib/broker.py | 292 +++++------------------------------ piker/brokers/ib/ledger.py | 250 ++++++++++++++++++++++++++++++ 3 files changed, 289 insertions(+), 255 deletions(-) create mode 100644 piker/brokers/ib/ledger.py diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 07ed8af58..d42002a16 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -35,6 +35,8 @@ ) from .broker import ( open_trade_dialog, +) +from .ledger import ( norm_trade_records, ) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 21d4baa5c..9be0e13e6 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -19,10 +19,8 @@ """ from __future__ import annotations -from bisect import insort from contextlib import ExitStack from dataclasses import asdict -from decimal import Decimal from functools import partial from pprint import pformat import time @@ -55,8 +53,8 @@ from piker import config from piker.accounting import ( - dec_digits, - digits_to_dec, + # dec_digits, + # digits_to_dec, Position, Transaction, open_trade_ledger, @@ -76,9 +74,6 @@ BrokerdFill, BrokerdError, ) -from piker.accounting import ( - MktPair, -) from ._util import log from .api import ( _accounts2clients, @@ -89,6 +84,10 @@ MethodProxy, ) from ._flex_reports import parse_flex_dt +from .ledger import ( + norm_trade_records, + api_trades_to_ledger_entries, +) @@ -546,17 +545,6 @@ async def open_trade_dialog( acctids = set() cids2pps: dict[str, BrokerdPosition] = {} - # TODO: this causes a massive tractor bug when you run marketstored - # with ``--tsdb``... you should get: - # - first error the assertion - # - chart should get that error and die - # - pikerd goes to debugger again from trio nursery multi-error - # - hitting final control-c to kill daemon will lead to hang - # assert 0 - - # TODO: just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? async with ( open_client_proxies() as ( proxies, @@ -630,15 +618,19 @@ async def open_trade_dialog( ledger: dict = ledgers[acctid] table: PpTable = tables[acctid] + # update position table with latest ledger from all + # gathered transactions: ledger file + api records. + trans: dict[str, Transaction] = norm_trade_records(ledger) + # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. api_trades = await proxy.trades() if api_trades: - trans_by_acct: dict[str, Transaction] + api_trans_by_acct: dict[str, Transaction] api_to_ledger_entries: dict[str, dict] ( - trans_by_acct, + api_trans_by_acct, api_to_ledger_entries, ) = await update_ledger_from_api_trades( api_trades, @@ -648,29 +640,26 @@ async def open_trade_dialog( # if new api_trades are detected from the API, prepare # them for the ledger file and update the pptable. - if api_to_ledger_entries: - trade_entries = api_to_ledger_entries.get(acctid) + if ( + api_to_ledger_entries + and (trade_entries := api_to_ledger_entries.get(acctid)) + ): # TODO: fix this `tractor` BUG! 
# https://github.com/goodboy/tractor/issues/354 # await tractor.pp() - if trade_entries: - # write ledger with all new api_trades - # **AFTER** we've updated the `pps.toml` - # from the original ledger state! (i.e. this - # is currently done on exit) - - for tid, entry in trade_entries.items(): - ledger.setdefault(tid, {}).update(entry) + # write ledger with all new api_trades + # **AFTER** we've updated the `pps.toml` + # from the original ledger state! (i.e. this + # is currently done on exit) + for tid, entry in trade_entries.items(): + ledger.setdefault(tid, {}).update(entry) - trans = trans_by_acct.get(acctid) - if trans: - table.update_from_trans(trans) + if api_trans := api_trans_by_acct.get(acctid): + trans.update(api_trans) - # update position table with latest ledger from all - # gathered transactions: ledger file + api records. - trans: dict[str, Transaction] = norm_trade_records(ledger) + # update account (and thus pps) from all gathered transactions table.update_from_trans(trans) # process pp value reported from ib's system. we only @@ -765,8 +754,11 @@ async def open_trade_dialog( tables, ) + # write account and ledger files immediately! # TODO: make this thread-async! - table.write_config() + for acctid, table in tables.items(): + table.write_config() + ledgers[acctid].write_config() # block until cancelled await trio.sleep_forever() @@ -784,10 +776,12 @@ async def emit_pp_update( ) -> None: - # compute and relay incrementally updated piker pp accounts_def_inv: bidict[str, str] = accounts_def.inverse - fq_acctid = accounts_def_inv[trade_entry['execution']['acctNumber']] - proxy = proxies[fq_acctid] + accnum: str = trade_entry['execution']['acctNumber'] + fq_acctid: str = accounts_def_inv[accnum] + proxy: MethodProxy = proxies[fq_acctid] + + # compute and relay incrementally updated piker pp ( records_by_acct, api_to_ledger_entries, @@ -796,8 +790,8 @@ async def emit_pp_update( proxy, accounts_def_inv, ) - trans = records_by_acct[fq_acctid] - r = list(trans.values())[0] + trans: dict[str, Transaction] = records_by_acct[fq_acctid] + tx: Transaction = list(trans.values())[0] acctid = fq_acctid.strip('ib.') table = tables[acctid] @@ -818,7 +812,7 @@ async def emit_pp_update( # re-formatted pps as msgs to the ems. for pos in filter( bool, - [active.get(r.bs_mktid), closed.get(r.bs_mktid)] + [active.get(tx.bs_mktid), closed.get(tx.bs_mktid)] ): msgs = await update_and_audit_msgs( acctid, @@ -1150,215 +1144,3 @@ async def deliver_trade_events( case _: log.error(f'WTF: {event_name}: {item}') - - -def norm_trade_records( - ledger: dict[str, Any], - -) -> dict[str, Transaction]: - ''' - Normalize a flex report or API retrieved executions - ledger into our standard record format. - - ''' - records: list[Transaction] = [] - - for tid, record in ledger.items(): - conid = record.get('conId') or record['conid'] - comms = record.get('commission') - if comms is None: - comms = -1*record['ibCommission'] - - price = record.get('price') or record['tradePrice'] - - # the api doesn't do the -/+ on the quantity for you but flex - # records do.. are you fucking serious ib...!? - size = record.get('quantity') or record['shares'] * { - 'BOT': 1, - 'SLD': -1, - }[record['side']] - - exch = record['exchange'] - lexch = record.get('listingExchange') - - # NOTE: remove null values since `tomlkit` can't serialize - # them to file. 
- dnc = record.pop('deltaNeutralContract', False) - if dnc is not None: - record['deltaNeutralContract'] = dnc - - suffix = lexch or exch - symbol = record['symbol'] - - # likely an opts contract record from a flex report.. - # TODO: no idea how to parse ^ the strike part from flex.. - # (00010000 any, or 00007500 tsla, ..) - # we probably must do the contract lookup for this? - if ' ' in symbol or '--' in exch: - underlying, _, tail = symbol.partition(' ') - suffix = exch = 'opt' - expiry = tail[:6] - # otype = tail[6] - # strike = tail[7:] - - print(f'skipping opts contract {symbol}') - continue - - # timestamping is way different in API records - dtstr = record.get('datetime') - date = record.get('date') - flex_dtstr = record.get('dateTime') - - if dtstr or date: - dt = pendulum.parse(dtstr or date) - - elif flex_dtstr: - # probably a flex record with a wonky non-std timestamp.. - dt = parse_flex_dt(record['dateTime']) - - # special handling of symbol extraction from - # flex records using some ad-hoc schema parsing. - asset_type: str = record.get( - 'assetCategory' - ) or record.get('secType', 'STK') - - # TODO: XXX: WOA this is kinda hacky.. probably - # should figure out the correct future pair key more - # explicitly and consistently? - if asset_type == 'FUT': - # (flex) ledger entries don't have any simple 3-char key? - symbol = record['symbol'][:3] - asset_type: str = 'future' - - elif asset_type == 'STK': - asset_type: str = 'stock' - - # try to build out piker fqme from record. - expiry = ( - record.get('lastTradeDateOrContractMonth') - or record.get('expiry') - ) - - if expiry: - expiry = str(expiry).strip(' ') - suffix = f'{exch}.{expiry}' - expiry = pendulum.parse(expiry) - - # src: str = record['currency'] - price_tick: Decimal = digits_to_dec(dec_digits(price)) - - pair = MktPair.from_fqme( - fqme=f'{symbol}.{suffix}.ib', - bs_mktid=str(conid), - _atype=str(asset_type), # XXX: can't serlialize `tomlkit.String` - - price_tick=price_tick, - # NOTE: for "legacy" assets, volume is normally discreet, not - # a float, but we keep a digit in case the suitz decide - # to get crazy and change it; we'll be kinda ready - # schema-wise.. - size_tick='1', - ) - - fqme = pair.fqme - - # NOTE: for flex records the normal fields for defining an fqme - # sometimes won't be available so we rely on two approaches for - # the "reverse lookup" of piker style fqme keys: - # - when dealing with API trade records received from - # `IB.trades()` we do a contract lookup at he time of processing - # - when dealing with flex records, it is assumed the record - # is at least a day old and thus the TWS position reporting system - # should already have entries if the pps are still open, in - # which case, we can pull the fqme from that table (see - # `trades_dialogue()` above). - insort( - records, - Transaction( - fqme=fqme, - sym=pair, - tid=tid, - size=size, - price=price, - cost=comms, - dt=dt, - expiry=expiry, - bs_mktid=str(conid), - ), - key=lambda t: t.dt - ) - - return {r.tid: r for r in records} - - -def api_trades_to_ledger_entries( - accounts: bidict[str, str], - - # TODO: maybe we should just be passing through the - # ``ib_insync.order.Trade`` instance directly here - # instead of pre-casting to dicts? - trade_entries: list[dict], - -) -> dict: - ''' - Convert API execution objects entry objects into ``dict`` form, - pretty much straight up without modification except add - a `pydatetime` field from the parsed timestamp. 
- - ''' - trades_by_account = {} - for t in trade_entries: - # NOTE: example of schema we pull from the API client. - # { - # 'commissionReport': CommissionReport(... - # 'contract': {... - # 'execution': Execution(... - # 'time': 1654801166.0 - # } - - # flatten all sub-dicts and values into one top level entry. - entry = {} - for section, val in t.items(): - match section: - case 'contract' | 'execution' | 'commissionReport': - # sub-dict cases - entry.update(val) - - case 'time': - # ib has wack ns timestamps, or is that us? - continue - - case _: - entry[section] = val - - tid = str(entry['execId']) - dt = pendulum.from_timestamp(entry['time']) - # TODO: why isn't this showing seconds in the str? - entry['pydatetime'] = dt - entry['datetime'] = str(dt) - acctid = accounts[entry['acctNumber']] - - if not tid: - # this is likely some kind of internal adjustment - # transaction, likely one of the following: - # - an expiry event that will show a "book trade" indicating - # some adjustment to cash balances: zeroing or itm settle. - # - a manual cash balance position adjustment likely done by - # the user from the accounts window in TWS where they can - # manually set the avg price and size: - # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST - log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') - continue - - trades_by_account.setdefault( - acctid, {} - )[tid] = entry - - # sort entries in output by python based datetime - for acctid in trades_by_account: - trades_by_account[acctid] = dict(sorted( - trades_by_account[acctid].items(), - key=lambda entry: entry[1].pop('pydatetime'), - )) - - return trades_by_account diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py new file mode 100644 index 000000000..2d1c1003b --- /dev/null +++ b/piker/brokers/ib/ledger.py @@ -0,0 +1,250 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Trade transaction accounting and normalization. + +''' +from bisect import insort +from decimal import Decimal +from pprint import pformat +from typing import ( + Any, +) + +from bidict import bidict +import pendulum + +from piker.accounting import ( + dec_digits, + digits_to_dec, + Transaction, + MktPair, +) +from ._flex_reports import parse_flex_dt +from ._util import log + + +def norm_trade_records( + ledger: dict[str, Any], + +) -> dict[str, Transaction]: + ''' + Normalize a flex report or API retrieved executions + ledger into our standard record format. + + ''' + records: list[Transaction] = [] + + for tid, record in ledger.items(): + conid = record.get('conId') or record['conid'] + comms = record.get('commission') + if comms is None: + comms = -1*record['ibCommission'] + + price = record.get('price') or record['tradePrice'] + + # the api doesn't do the -/+ on the quantity for you but flex + # records do.. 
are you fucking serious ib...!?
+        size = record.get('quantity') or record['shares'] * {
+            'BOT': 1,
+            'SLD': -1,
+        }[record['side']]
+
+        exch = record['exchange']
+        lexch = record.get('listingExchange')
+
+        # NOTE: remove null values since `tomlkit` can't serialize
+        # them to file.
+        dnc = record.pop('deltaNeutralContract', False)
+        if dnc is not None:
+            record['deltaNeutralContract'] = dnc
+
+        suffix = lexch or exch
+        symbol = record['symbol']
+
+        # likely an opts contract record from a flex report..
+        # TODO: no idea how to parse ^ the strike part from flex..
+        # (00010000 any, or 00007500 tsla, ..)
+        # we probably must do the contract lookup for this?
+        if ' ' in symbol or '--' in exch:
+            underlying, _, tail = symbol.partition(' ')
+            suffix = exch = 'opt'
+            expiry = tail[:6]
+            # otype = tail[6]
+            # strike = tail[7:]
+
+            print(f'skipping opts contract {symbol}')
+            continue
+
+        # timestamping is way different in API records
+        dtstr = record.get('datetime')
+        date = record.get('date')
+        flex_dtstr = record.get('dateTime')
+
+        if dtstr or date:
+            dt = pendulum.parse(dtstr or date)
+
+        elif flex_dtstr:
+            # probably a flex record with a wonky non-std timestamp..
+            dt = parse_flex_dt(record['dateTime'])
+
+        # special handling of symbol extraction from
+        # flex records using some ad-hoc schema parsing.
+        asset_type: str = record.get(
+            'assetCategory'
+        ) or record.get('secType', 'STK')
+
+        # TODO: XXX: WOA this is kinda hacky.. probably
+        # should figure out the correct future pair key more
+        # explicitly and consistently?
+        if asset_type == 'FUT':
+            # (flex) ledger entries don't have any simple 3-char key?
+            symbol = record['symbol'][:3]
+            asset_type: str = 'future'
+
+        elif asset_type == 'STK':
+            asset_type: str = 'stock'
+
+        # try to build out piker fqme from record.
+        expiry = (
+            record.get('lastTradeDateOrContractMonth')
+            or record.get('expiry')
+        )
+
+        if expiry:
+            expiry = str(expiry).strip(' ')
+            suffix = f'{exch}.{expiry}'
+            expiry = pendulum.parse(expiry)
+
+        # src: str = record['currency']
+        price_tick: Decimal = digits_to_dec(dec_digits(price))
+
+        pair = MktPair.from_fqme(
+            fqme=f'{symbol}.{suffix}.ib',
+            bs_mktid=str(conid),
+            _atype=str(asset_type),  # XXX: can't serialize `tomlkit.String`
+
+            price_tick=price_tick,
+            # NOTE: for "legacy" assets, volume is normally discrete, not
+            # a float, but we keep a digit in case the suitz decide
+            # to get crazy and change it; we'll be kinda ready
+            # schema-wise..
+            size_tick='1',
+        )
+
+        fqme = pair.fqme
+
+        # NOTE: for flex records the normal fields for defining an fqme
+        # sometimes won't be available so we rely on two approaches for
+        # the "reverse lookup" of piker style fqme keys:
+        # - when dealing with API trade records received from
+        #   `IB.trades()` we do a contract lookup at the time of processing
+        # - when dealing with flex records, it is assumed the record
+        #   is at least a day old and thus the TWS position reporting system
+        #   should already have entries if the pps are still open, in
+        #   which case, we can pull the fqme from that table (see
+        #   `trades_dialogue()` above).
+        insort(
+            records,
+            Transaction(
+                fqme=fqme,
+                sym=pair,
+                tid=tid,
+                size=size,
+                price=price,
+                cost=comms,
+                dt=dt,
+                expiry=expiry,
+                bs_mktid=str(conid),
+            ),
+            key=lambda t: t.dt
+        )
+
+    return {r.tid: r for r in records}
+
+
+def api_trades_to_ledger_entries(
+    accounts: bidict[str, str],
+
+    # TODO: maybe we should just be passing through the
+    # ``ib_insync.order.Trade`` instance directly here
+    # instead of pre-casting to dicts?
+ trade_entries: list[dict], + +) -> dict: + ''' + Convert API execution objects entry objects into ``dict`` form, + pretty much straight up without modification except add + a `pydatetime` field from the parsed timestamp. + + ''' + trades_by_account = {} + for t in trade_entries: + # NOTE: example of schema we pull from the API client. + # { + # 'commissionReport': CommissionReport(... + # 'contract': {... + # 'execution': Execution(... + # 'time': 1654801166.0 + # } + + # flatten all sub-dicts and values into one top level entry. + entry = {} + for section, val in t.items(): + match section: + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) + + case 'time': + # ib has wack ns timestamps, or is that us? + continue + + case _: + entry[section] = val + + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? + entry['pydatetime'] = dt + entry['datetime'] = str(dt) + acctid = accounts[entry['acctNumber']] + + if not tid: + # this is likely some kind of internal adjustment + # transaction, likely one of the following: + # - an expiry event that will show a "book trade" indicating + # some adjustment to cash balances: zeroing or itm settle. + # - a manual cash balance position adjustment likely done by + # the user from the accounts window in TWS where they can + # manually set the avg price and size: + # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST + log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') + continue + + trades_by_account.setdefault( + acctid, {} + )[tid] = entry + + # sort entries in output by python based datetime + for acctid in trades_by_account: + trades_by_account[acctid] = dict(sorted( + trades_by_account[acctid].items(), + key=lambda entry: entry[1].pop('pydatetime'), + )) + + return trades_by_account
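
To smoke test the relocated tick-framing helpers end-to-end, a minimal
usage sketch against the final `.ticktools` API from patch 4 follows;
the quote payload (and the kraken symbol) is made up purely for
illustration:

    from piker.data.ticktools import frame_ticks

    quote = {
        'symbol': 'xbtusdt.kraken',
        'ticks': [
            {'type': 'bid', 'price': 29_000.0, 'size': 1.0},
            {'type': 'trade', 'price': 29_001.0, 'size': 0.5},
            {'type': 'trade', 'price': 29_002.0, 'size': 0.25},
        ],
    }

    # a (non-empty) buffer standing in for `first_quote['ticks']`
    # as passed at the `uniform_rate_send()` call sites.
    in_order: list[dict] = [
        {'type': 'last', 'price': 28_999.0, 'size': 0.1},
    ]
    tbt = frame_ticks(quote, ticks_in_order=in_order)

    # one sub-list per tick type with the latest event last..
    assert [t['price'] for t in tbt['trade']] == [29_001.0, 29_002.0]

    # ..while the in-order buffer accumulates arrivals; note that an
    # *empty* buffer is left untouched since the guard is truthiness
    # based (`if ticks_in_order:`) rather than an `is not None` check.
    assert len(in_order) == 4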