ib: don't bother with recursive not-enough-bars queries for now, causes more problems than it solves..
goodboy committed Dec 15, 2023
1 parent 97e2403 commit ba154ef
Showing 2 changed files with 85 additions and 37 deletions.
piker/brokers/ib/api.py: 14 changes (13 additions & 1 deletion)
@@ -401,7 +401,15 @@ async def bars(
         # => we recursively call this method until we get at least
         # as many bars such that they sum in aggregate to the the
         # desired total time (duration) at most.
-        if end_dt:
+        # XXX XXX XXX
+        # WHY DID WE EVEN NEED THIS ORIGINALLY!?
+        # XXX XXX XXX
+        # - if you query over a gap and get no data
+        #   that may short circuit the history
+        if (
+            end_dt
+            and False
+        ):
             nparr: np.ndarray = bars_to_np(bars)
             times: np.ndarray = nparr['time']
             first: float = times[0]
@@ -410,6 +418,7 @@
             if (
                 # len(bars) * sample_period_s) < dt_duration.in_seconds()
                 tdiff < dt_duration.in_seconds()
+                # and False
             ):
                 end_dt: DateTime = from_timestamp(first)
                 log.warning(
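
For reference, the branch disabled above keys off how much wall-clock time the returned bars actually span versus the requested duration. A minimal sketch of that comparison (the `frame_is_short` helper is our name for it, not something in the diff):

```python
import numpy as np
from pendulum import Duration, duration

def frame_is_short(
    times: np.ndarray,
    dt_duration: Duration,
) -> bool:
    # wall-clock span actually covered by the returned bars
    tdiff: float = float(times[-1]) - float(times[0])
    # a "short" frame covers less than the requested duration
    return tdiff < dt_duration.in_seconds()

# eg. 60 one-second stamps against a requested 5m window
assert frame_is_short(
    np.arange(60, dtype=float),
    duration(minutes=5),
)
```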
@@ -859,6 +868,9 @@ async def get_quote(
                 timeout=timeout,
             )
         except TimeoutError:
+            import pdbp
+            pdbp.set_trace()
+
             if raise_on_timeout:
                 raise
             return None
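
The `raise_on_timeout` flag above lets callers pick between an exception and a `None` quote when the wait expires. A standalone trio-flavoured sketch of that shape (with `trio.TooSlowError` standing in for the `TimeoutError` raised by the real quote wait):

```python
import trio

async def get_quote_sketch(
    raise_on_timeout: bool = False,
    timeout: float = 0.1,
):
    # an event that never fires simulates a dead/closed feed
    quote_arrived = trio.Event()
    try:
        with trio.fail_after(timeout):
            await quote_arrived.wait()
    except trio.TooSlowError:
        if raise_on_timeout:
            raise
        return None

# returns None instead of raising
trio.run(get_quote_sketch)
```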
piker/brokers/ib/feed.py: 108 changes (72 additions & 36 deletions)
@@ -174,8 +174,15 @@ async def get_hist(
         start_dt: datetime | None = None,

     ) -> tuple[np.ndarray, str]:
+
         nonlocal max_timeout, mean, count

+        if (
+            start_dt
+            and start_dt.timestamp() == 0
+        ):
+            await tractor.pause()
+
         query_start = time.time()
         out, timedout = await get_bars(
             proxy,
@@ -403,34 +410,54 @@ async def query():

         bars, bars_array, dt_duration = out

+        if bars_array is None:
+            raise SymbolNotFound(fqme)
+
         # not enough bars signal, likely due to venue
         # operational gaps.
-        too_little: bool = False
-        if (
-            end_dt
-            and (
-                not bars
-                or (too_little :=
-                    start_dt
-                    and (len(bars) * timeframe)
-                    < dt_duration.in_seconds()
-                )
-            )
-        ):
-            if (
-                end_dt
-                or too_little
-            ):
+        # too_little: bool = False
+        if end_dt:
+            if not bars:
+                # no data returned?
                 log.warning(
-                    f'History is blank for {dt_duration} from {end_dt}'
+                    'History frame is blank?\n'
+                    f'start_dt: {start_dt}\n'
+                    f'end_dt: {end_dt}\n'
+                    f'duration: {dt_duration}\n'
                 )
-                end_dt -= dt_duration
-                continue
+                raise NoData(f'{end_dt}')

-            raise NoData(f'{end_dt}')
-
-        if bars_array is None:
-            raise SymbolNotFound(fqme)
+            else:
+                dur_s: float = len(bars) * timeframe
+                bars_dur = pendulum.Duration(seconds=dur_s)
+                dt_dur_s: float = dt_duration.in_seconds()
+                if dur_s < dt_dur_s:
+                    log.warning(
+                        'History frame is shorter then expected?\n'
+                        f'start_dt: {start_dt}\n'
+                        f'end_dt: {end_dt}\n'
+                        f'duration: {dt_dur_s}\n'
+                        f'frame duration seconds: {dur_s}\n'
+                        f'dur diff: {dt_duration - bars_dur}\n'
+                    )
+                    # NOTE: we used to try to get a minimal
+                    # set of bars by recursing but this ran
+                    # into possible infinite query loops
+                    # when logic in the `Client.bars()` dt
+                    # diffing went bad. So instead for now
+                    # we just return the
+                    # shorter-then-expected history with
+                    # a warning.
+                    # TODO: in the future it prolly makes
+                    # the most send to do venue operating
+                    # hours lookup and
+                    # timestamp-in-operating-range set
+                    # checking to know for sure if we can
+                    # safely and quickly ignore non-uniform history
+                    # frame timestamp gaps..
+                    # end_dt -= dt_duration
+                    # continue
+                    # await tractor.pause()

         first_dt = pendulum.from_timestamp(
             bars[0].date.timestamp())
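
The shortfall warning added above is plain duration arithmetic; a worked example with made-up numbers (60 one-second bars returned against a requested 5 minute window):

```python
import pendulum

timeframe: int = 1  # seconds per bar
bars_returned: int = 60

dur_s: float = bars_returned * timeframe
bars_dur = pendulum.duration(seconds=dur_s)
dt_duration = pendulum.duration(minutes=5)

# the frame covers only 60s of the requested 300s..
assert dur_s < dt_duration.in_seconds()
# ..so the new code just logs the diff instead of re-querying:
print(dt_duration - bars_dur)
```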
@@ -854,26 +881,20 @@ async def stream_quotes(
         init_msgs.append(init_msg)

         con: Contract = details.contract
-        first_ticker: Ticker = await proxy.get_quote(contract=con)
+        first_ticker: Ticker | None = None
+        with trio.move_on_after(1):
+            first_ticker: Ticker = await proxy.get_quote(
+                contract=con,
+                raise_on_timeout=False,
+            )

+        if first_ticker:
             first_quote: dict = normalize(first_ticker)
             log.info(
                 'Rxed init quote:\n'
                 f'{pformat(first_quote)}'
             )

-        # TODO: we should instead spawn a task that waits on a feed
-        # to start and let it wait indefinitely..instead of this
-        # hard coded stuff.
-        # async def wait_for_first_quote():
-        #     with trio.CancelScope() as cs:
-
-        with trio.move_on_after(1):
-            first_ticker = await proxy.get_quote(
-                contract=con,
-                raise_on_timeout=True,
-            )
-
         # NOTE: it might be outside regular trading hours for
         # assets with "standard venue operating hours" so we
         # only "pretend the feed is live" when the dst asset
@@ -884,6 +905,8 @@
         # (equitiies, futes, bonds etc.) we at least try to
         # grab the OHLC history.
         if (
+            first_ticker
+            and
             isnan(first_ticker.last)
             # SO, if the last quote price value is NaN we ONLY
             # "pretend to do" `feed_is_live.set()` if it's a known
@@ -907,6 +930,19 @@
                 await trio.sleep_forever()
                 return  # we never expect feed to come up?

+        # TODO: we should instead spawn a task that waits on a feed
+        # to start and let it wait indefinitely..instead of this
+        # hard coded stuff.
+        # async def wait_for_first_quote():
+        #     with trio.CancelScope() as cs:
+
+        # XXX: MUST acquire a ticker + first quote before starting
+        # the live quotes loop!
+        # with trio.move_on_after(1):
+        first_ticker = await proxy.get_quote(
+            contract=con,
+            raise_on_timeout=True,
+        )
         cs: trio.CancelScope | None = None
         startup: bool = True
         while (
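
Taken together, the feed.py changes split first-quote acquisition into a soft phase (1s bounded, non-raising, so history backfill can proceed when a venue is closed) and a hard phase right before the live quote loop. A condensed sketch of that flow, where `proxy` and `con` stand in for the real method-proxy and contract (the actual code runs the venue/NaN checks between the two calls):

```python
import trio

async def acquire_first_quote(proxy, con):
    # soft attempt: bounded wait where a timeout yields None
    first_ticker = None
    with trio.move_on_after(1):
        first_ticker = await proxy.get_quote(
            contract=con,
            raise_on_timeout=False,
        )

    if first_ticker is None:
        # hard phase: MUST have a ticker + first quote before the
        # live quotes loop starts, so let any timeout propagate
        first_ticker = await proxy.get_quote(
            contract=con,
            raise_on_timeout=True,
        )
    return first_ticker
```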
