Skip to content

Commit

Permalink
Implement plugins infrastructure
Browse files Browse the repository at this point in the history
Plugins are python classes supposed to serve as workers to perform any additional
operations which are not performed from strategies. Example use-cases:

* Download trading history
* Analyze trading history
* Check for updates
* Report dexbot statistic

Current implementation uses separate thread and asyncio event loop
inside. This is a temporary solution before refactoring
WorkerInfrastructure to asyncio.

Closes: #500
  • Loading branch information
bitphage committed Feb 28, 2019
1 parent 85fce2f commit 43092dc
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 0 deletions.
10 changes: 10 additions & 0 deletions dexbot/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import errno
import logging
import pkgutil
from appdirs import user_data_dir

from dexbot import APP_NAME, AUTHOR
Expand Down Expand Up @@ -77,6 +78,15 @@ def initialize_orders_log():
logger.info("worker_name;ID;operation_type;base_asset;base_amount;quote_asset;quote_amount;timestamp")


def iter_namespace(ns_pkg):
# https://packaging.python.org/guides/creating-and-discovering-plugins/
# Specifying the second argument (prefix) to iter_modules makes the
# returned name an absolute name instead of a relative one. This allows
# import_module to work without having to do additional modification to
# the name.
return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".")


try:
# Unfortunately setuptools is only "kinda-sorta" a standard module
# it's available on pretty much any modern Python system, but some embedded Pythons may not have it
Expand Down
81 changes: 81 additions & 0 deletions dexbot/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
import threading
import importlib
import logging

import dexbot.plugins
from dexbot.helper import iter_namespace

from bitshares import BitShares

log = logging.getLogger(__name__)

class PluginInfrastructure(threading.Thread):
""" Run plugins as asyncio tasks
:param dict config: dexbot config
PluginInfrastructure class is needed to be able to run asyncio plugins while having synchronous core. After
switching to asyncio-aware main thread we may continue to use all plugins without refactoring them.
"""

def __init__(self, config):
super().__init__()

self.bitshares = BitShares(node=config['node'], num_retries=-1)
self.config = config
self.loop = None
self.need_stop = False
self.plugins = []

def run(self):
log.debug('Starting PluginInfrastructure thread')
self.init_plugins()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.create_task(self.run_plugins())
self.loop.create_task(self.stop_handler())
self.loop.run_forever()

def init_plugins(self):
""" Initialize plugin instances
"""
plugins = {name: importlib.import_module(name) for finder, name, ispkg in iter_namespace(dexbot.plugins)}

for name, plugin in plugins.items():
self.plugins.append(plugin.Plugin(config=self.config, bitshares_instance=self.bitshares))

async def run_plugins(self):
""" Run each discovered plugin by calling Plugin.main()
"""
# Schedule every plugin as asyncio Task; use ensure_future() for python3.6 compatibility
tasks = [asyncio.ensure_future(plugin.main()) for plugin in self.plugins]
try:
# Wait until all plugins are finished, but catch exceptions immediately as they occure
await asyncio.gather(*tasks, return_exceptions=False)
except asyncio.CancelledError:
# Note: task.cancel() will not propagate this exception here, so it will appear only on current task cancel
log.debug('Stopping run_plugins()')
except Exception:
log.exception('Task finished with exception:')

async def stop_handler(self):
""" Watch for self.need_stop flag to cancel tasks and stop the thread
With this solution it's easier to achieve correct tasks stopping. self.loop.call_soon_threadsafe() requires
additional wrapping to stop tasks or catch exceptions.
"""
while True:
if self.need_stop:
log.debug('Stopping event loop')
tasks = [task for task in asyncio.Task.all_tasks() if task is not asyncio.tasks.Task.current_task()]
# Cancel all tasks
list(map(lambda task: task.cancel(), tasks))
# Wait for tasks finish
results = await asyncio.gather(*tasks, return_exceptions=True)
log.debug('Finished awaiting cancelled tasks, results: {0}'.format(results))
# Stop the event loop
self.loop.stop()
return
else:
await asyncio.sleep(1)
Empty file added dexbot/plugins/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions dexbot/plugins/dummy.py.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import logging

log = logging.getLogger(__name__)


class Plugin:
""" Example plugin class

Plugin must have main() method to run. main() is expected to be an asyncio coroutine
"""

def __init__(self, config=None, bitshares_instance=None):
pass

async def do_stuff(self):
log.info('Doing some stuff')
await asyncio.sleep(10)
log.info('Stuff done')

async def boom(self):
raise Exception('Boom!')

async def main(self):
try:
while True:
await self.do_stuff()
await asyncio.sleep(5)
await self.boom()
except asyncio.CancelledError:
log.info('Stopping correctly')
6 changes: 6 additions & 0 deletions dexbot/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import copy

import dexbot.errors as errors
from dexbot.plugin import PluginInfrastructure
from dexbot.strategies.base import StrategyBase

from bitshares import BitShares
Expand Down Expand Up @@ -173,6 +174,8 @@ def add_worker(self, worker_name, config):
self.update_notify()

def run(self):
self.plugins_thread = PluginInfrastructure(self.config)
self.plugins_thread.start()
self.init_workers(self.config)
self.update_notify()
self.notify.listen()
Expand Down Expand Up @@ -206,6 +209,9 @@ def stop(self, worker_name=None, pause=False):
self.workers[worker].pause()
self.workers = []

# Notify plugins to stop
self.plugins_thread.need_stop = True

# Update other workers
if len(self.workers) > 0:
self.update_notify()
Expand Down

0 comments on commit 43092dc

Please sign in to comment.