Skip to content

Commit

Permalink
Fruther generalize json_rpc hook mechanic to allow for multi hook, Ad…
Browse files Browse the repository at this point in the history
…d new maybe_open_ticker_feed to stream greeks, iv, open interest of an instrument
  • Loading branch information
guilledk committed Mar 10, 2023
1 parent fef8073 commit 77fbc7e
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 83 deletions.
146 changes: 73 additions & 73 deletions piker/brokers/deribit/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class Client:
def __init__(
self,
json_rpc: Callable,
update_hooks: Callable,
append_hooks: Callable,
update_types: Callable,
) -> None:

Expand All @@ -169,7 +169,7 @@ def __init__(
self._key_secret = None

self.json_rpc = json_rpc
self.update_hooks = update_hooks
self.append_hooks = append_hooks
self.update_types = update_types

@property
Expand Down Expand Up @@ -490,6 +490,7 @@ async def sub_hook(msg):
}]
}
))
return True

elif chan == book_chan:
bid, bsize = data['bids'][0]
Expand All @@ -504,11 +505,14 @@ async def sub_hook(msg):
{'type': 'asize', 'price': ask, 'size': asize}
]}
))
return True

return False

async with open_cached_client('deribit') as client:

client.update_hooks({
'request': sub_hook
client.append_hooks({
'request': [sub_hook]
})
client.update_types({
'request': JSONRPCSubRequest
Expand All @@ -517,12 +521,15 @@ async def sub_hook(msg):
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})

assert resp.result == channels
assert not resp.error

log.info(f'Subscribed to {channels}')

yield recv_chann

resp = await client.json_rpc('private/unsubscribe', {'channels': channels})

assert not resp.error

@acm
async def maybe_open_price_feed(
Expand All @@ -543,71 +550,64 @@ async def maybe_open_price_feed(
yield feed


# TODO: order broker support: this is all draft code from @guilledk B)

# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,

# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()

# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()

# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })

# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)

# # sync with trio
# to_trio.send_nowait(None)

# await asyncio.sleep(float('inf'))


# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan


# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:

# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument,
# 'fh': fh
# },
# key=f'{instrument}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed
@acm
async def open_ticker_feed(
instrument: str
) -> trio.abc.ReceiveStream:

instrument_db = sym_fmt_piker_to_deribit(instrument)

ticker_chan = f'incremental_ticker.{instrument_db}'

channels = [ticker_chan]

send_chann, recv_chann = trio.open_memory_channel(0)
async def sub_hook(msg):
chann = msg.params['channel']
if chann == ticker_chan:
data = msg.params['data']
await send_chann.send((
'ticker', {
'symbol': instrument,
'data': data
}
))
return True

return False

async with open_cached_client('deribit') as client:

client.append_hooks({
'request': [sub_hook]
})

resp = await client.json_rpc(
'private/subscribe', {'channels': channels})

assert not resp.error

log.info(f'Subscribed to {channels}')

yield recv_chann

resp = await client.json_rpc('private/unsubscribe', {'channels': channels})

assert not resp.error

@acm
async def maybe_open_ticker_feed(
instrument: str
) -> trio.abc.ReceiveStream:

async with maybe_open_context(
acm_func=open_ticker_feed,
kwargs={
'instrument': instrument
},
key=f'{instrument}-ticker',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
35 changes: 25 additions & 10 deletions piker/data/_web_bs.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,28 @@ async def open_jsonrpc_session(
raise ValueError(
'Need to path both a request_type and request_hook')

req_hooks = []
if request_hook:
req_hooks.append(request_hook)

err_hooks = []
if error_hook:
err_hooks.append(error_hook)

hook_table = {
'request': request_hook,
'error': error_hook
'request': req_hooks,
'error': err_hooks
}

types_table = {
'response': response_type,
'request': request_type
}

def update_hooks(new_hooks: dict):
def append_hooks(new_hooks: dict):
nonlocal hook_table
hook_table.update(new_hooks)
for htype, hooks in new_hooks.items():
hook_table[htype] += hooks

def update_types(new_types: dict):
nonlocal types_table
Expand All @@ -234,7 +243,7 @@ def update_types(new_types: dict):
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}

async def json_rpc(method: str, params: dict) -> dict:
async def json_rpc(method: str, params: dict = {}) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
Expand Down Expand Up @@ -303,19 +312,25 @@ async def recv_task():
'params': _,
}:
log.info(f'Recieved\n{msg}')
if hook_table['request']:
await hook_table['request'](types_table['request'](**msg))
if len(hook_table['request']) > 0:
for hook in hook_table['request']:
result = await hook(types_table['request'](**msg))
if result:
break

case {
'error': error,
}:
log.warning(f'Recieved\n{error}')
if hook_table['error']:
await hook_table['error'](types_table['response'](**msg))
if len(hook_table['error']) > 0:
for hook in hook_table['error']:
result = await hook(types_table['response'](**msg))
if result:
break

case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

n.start_soon(recv_task)
yield json_rpc, update_hooks, update_types
yield json_rpc, append_hooks, update_types
n.cancel_scope.cancel()

0 comments on commit 77fbc7e

Please sign in to comment.