From 8a428c129911d2ff42da90572e4a02937da3f543 Mon Sep 17 00:00:00 2001 From: gj Date: Sat, 8 Sep 2018 23:36:04 +0800 Subject: [PATCH] fix http server exception handle bug --- botflow/bot.py | 4 +- botflow/queue.py | 254 ++++++++++++++++-- botflow/route.py | 22 +- docs/change/0.1.9.rst | 54 ++-- ...server_rest.py => aiohttpserver_search.py} | 6 +- requirements.txt | 3 +- tests/test_queue.py | 40 +++ 7 files changed, 320 insertions(+), 63 deletions(-) rename examples/{aiohttpserver_rest.py => aiohttpserver_search.py} (90%) create mode 100644 tests/test_queue.py diff --git a/botflow/bot.py b/botflow/bot.py index c2bf249..e32c46f 100644 --- a/botflow/bot.py +++ b/botflow/bot.py @@ -84,7 +84,7 @@ async def call_wrap_r(self,func, bdata): else: result = r_or_c except Exception as e: - + logger.error(e) if config.exception_policy == config.Exception_raise: raise e elif config.exception_policy == config.Exception_ignore: @@ -153,6 +153,8 @@ def create_coro(self,bdata): coro = self.append_q(self.merge_list,self.func, bdata, self.output_q) return coro + elif isinstance(bdata.data,Exception): + return self.output_q.put(bdata) else: diff --git a/botflow/queue.py b/botflow/queue.py index 7f6115a..b3b5e45 100644 --- a/botflow/queue.py +++ b/botflow/queue.py @@ -4,6 +4,7 @@ from .config import config import logging import datetime +import collections logger = logging.getLogger(__name__) class QueueManager(object,metaclass=Singleton): @@ -51,28 +52,7 @@ def __init__(self,maxsize=None,loop=None): self.put_callback=None - async def readable(self): - while self.empty(): - getter = self._loop.create_future() - self._getters.append(getter) - try: - await getter - except: - getter.cancel() # Just in case getter is not done yet. - - try: - self._getters.remove(getter) - except ValueError: - pass - - if not self.empty() and not getter.cancelled(): - # We were woken up by put_nowait(), but can't take - # the call. Wake up the next in line. - self._wakeup_next(self._getters) - raise - pass - #TODO async def writable(self): pass @@ -120,8 +100,8 @@ async def put(self, item): if self.qsize()>self.high_water: self.high_water=self.qsize() - if self.put_callback is not None: - asyncio.ensure_future(self.put_callback(item)) + # if self.put_callback is not None: + # asyncio.ensure_future(self.put_callback(item)) return r async def get(self): @@ -130,6 +110,226 @@ async def get(self): #r.destroy() return r + async def get_by(self,ori): + while True: + await self.readable() + item=self._queue[-1] + if item.ori == ori: + return self._queue.popleft() + + + async def readable(self): + while self.empty(): + getter = self._loop.create_future() + self._getters.append(getter) + try: + await getter + except: + getter.cancel() # Just in case getter is not done yet. + + try: + self._getters.remove(getter) + except ValueError: + pass + + if not self.empty() and not getter.cancelled(): + # We were woken up by put_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._getters) + raise + + return + + +class ConditionalQueue: + + + def __init__(self, maxsize=0, *, loop=None): + + self._loop = asyncio.get_event_loop() + + self._maxsize = maxsize + + # Futures. + self._getters = {} #collections.deque() + # Futures. + self._putters = collections.deque() +# self._unfinished_tasks = 0 +# self._finished = asyncio.Lock.Event(loop=self._loop) +# self._finished.set() + self._init(maxsize) + self.qm=QueueManager() + self.debug = True + self.high_water = 0 + self.qm.add(self) + self.put_count=0 + + # These three are overridable in subclasses. + + def _init(self, maxsize): + self._queue = {} #collections.deque() + def _init_dict(self,ori): + if ori not in self._getters: + self._getters[ori]=collections.deque() + + if ori not in self._queue: + self._queue[ori]=collections.deque() + + def clean(self,ori): + del self._getters[ori] + del self._queue[ori] + + + def _get(self,ori): + return self._queue[ori].popleft() + + + + def _put(self, item): + + self._queue[item.ori].append(item) + + # End of the overridable methods. + + def _wakeup_next(self, waiters): + # Wake up the next waiter (if any) that isn't cancelled. + while waiters: + waiter = waiters.popleft() + if not waiter.done(): + waiter.set_result(None) + break + + def __repr__(self): + return '<{} at {:#x} {}>'.format( + type(self).__name__, id(self), self._format()) + + def __str__(self): + return '<{} {}>'.format(type(self).__name__, self._format()) + + def _format(self): + result = 'maxsize={!r}'.format(self._maxsize) + if getattr(self, '_queue', None): + result += ' _queue={!r}'.format(list(self._queue)) + if self._getters: + result += ' _getters[{}]'.format(len(self._getters)) + if self._putters: + result += ' _putters[{}]'.format(len(self._putters)) + if self._unfinished_tasks: + result += ' tasks={}'.format(self._unfinished_tasks) + return result + + def qsize(self): + """Number of items in the queue.""" + size=0 + for k,v in self._queue.items(): + size+=len(v) + return size + + @property + def maxsize(self): + """Number of items allowed in the queue.""" + return self._maxsize + + def empty(self,ori): + """Return True if the queue is empty, False otherwise.""" + return not self._queue[ori] + + def full(self): + """Return True if there are maxsize items in the queue. + + Note: if the Queue was initialized with maxsize=0 (the default), + then full() is never True. + """ + if self._maxsize <= 0: + return False + else: + return self.qsize() >= self._maxsize + + + async def put(self, item): + """Put an item into the queue. + + Put an item into the queue. If the queue is full, wait until a free + slot is available before adding item. + + This method is a coroutine. + """ + self._init_dict(item.ori) + while self.full(): + putter = self._loop.create_future() + self._putters.append(putter) + try: + await putter + except: + putter.cancel() # Just in case putter is not done yet. + if not self.full() and not putter.cancelled(): + # We were woken up by get_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._putters[item.ori]) + raise + return self.put_nowait(item) + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + If no free slot is immediately available, raise QueueFull. + """ + if self.full(): + raise asyncio.QueueFull + self._put(item) + # self._unfinished_tasks += 1 + # self._finished.clear() + self._wakeup_next(self._getters[item.ori]) + + async def readable(self,ori): + + while self.empty(ori): + getter = self._loop.create_future() + self._getters[ori].append(getter) + try: + await getter + except: + getter.cancel() # Just in case getter is not done yet. + + try: + self._getters[ori].remove(getter) + except ValueError: + pass + + if not self.empty(ori) and not getter.cancelled(): + # We were woken up by put_nowait(), but can't take + # the call. Wake up the next in line. + self._wakeup_next(self._getters[ori]) + raise + + + async def get_by(self,ori): + """Remove and return an item from the queue. + + If queue is empty, wait until an item is available. + + This method is a coroutine. + """ + self._init_dict(ori) + await self.readable(ori) + return self.get_nowait(ori) + + def get_nowait(self,ori): + """Remove and return an item from the queue. + + Return an item if one is immediately available, else raise QueueEmpty. + """ + if self.empty(ori): + raise asyncio.QueueEmpty + item = self._get(ori) + self._wakeup_next(self._putters) + return item + + + + + + class NullQueue(asyncio.Queue): # | @@ -202,9 +402,9 @@ class ProxyQueue(asyncio.Queue): # | # X - def __init__(self, maxsize=0, loop=None): + def __init__(self, q,maxsize=0, loop=None): super().__init__(maxsize=maxsize,loop=loop) - self._q=DataQueue(maxsize=maxsize,loop=loop) + self._q=q def set_q(self,q): @@ -225,5 +425,9 @@ async def put(self, item): async def get(self): return await self._q.get() + async def get_by(self,ori): + return await self._q.get_by(ori) + def clean(self,ori): + return self._q.clean(ori) def get_nowait(self): return self._q.get_nowait() \ No newline at end of file diff --git a/botflow/route.py b/botflow/route.py index 5c67cbc..c93869a 100644 --- a/botflow/route.py +++ b/botflow/route.py @@ -3,7 +3,7 @@ from .botframe import BotFrame from .config import config -from .queue import DataQueue,NullQueue,CachedQueue,ProxyQueue +from .queue import DataQueue,NullQueue,CachedQueue,ProxyQueue,ConditionalQueue from botflow.bdata import Bdata,Databoard from .botbase import BotManager from .routebase import Route @@ -29,7 +29,7 @@ def __init__(self, *args): for idx,func in enumerate(args): q_i = q_o if idx == len(args)-1: - q_o = ProxyQueue() + q_o = ProxyQueue(ConditionalQueue()) else: if config.replay_mode: @@ -225,9 +225,14 @@ async def read(self): async def __call__(self, data): - - await self.start_q.put(Bdata.make_Bdata_zori(data)) - r=await self.output_q.get() + ori=Bdata.make_Bdata_zori(data) + await self.start_q.put(Bdata(data,ori)) + r=await self.output_q.get_by(ori) + self.output_q.clean(ori) + if isinstance(r.data,list): + for i,v in enumerate(r.data): + if isinstance(v,Exception): + r.data[i]=None return r.data @@ -462,8 +467,11 @@ def make_route_bot(self,iq,oq): async def route_in(self,bdata): if self.merge_node is None: - new_data=Bdata(bdata.data, bdata) - await super().route_in(new_data) + if bdata.ori ==0: + new_data=Bdata(bdata.data, bdata) + await super().route_in(new_data) + else: + await super().route_in(bdata) else: await self.route_in_joinmerge(bdata) diff --git a/docs/change/0.1.9.rst b/docs/change/0.1.9.rst index dda2591..45e2c24 100644 --- a/docs/change/0.1.9.rst +++ b/docs/change/0.1.9.rst @@ -1,10 +1,10 @@ -Version 0.1.9 -============= +Botflow release v0.1.9 with Http Server support +=============================================== #. Officially rename project to Botflow. -#. Enable Http Server support.Pipe can be work as Coroutine for intergate other asyncio framework. +#. Enable Http Server support.Pipe can be work as Coroutine for intergate other Asyncio framework. .. code-block:: python @@ -27,38 +27,37 @@ Version 0.1.9 .. code-block:: python + def filter_out(url): + global count + if 'http' not in url: + url = "http://127.0.0.1:8080{}".format(url) - def filter(url): - global count - if 'http' not in url: - url = "http://127.0.0.1:8080{}".format(url) + if url in seen : #filter out processed links + return None + seen.add(url) + return url - if url in seen : #filter out processed links - return None - seen.add(url) - return url + def find_all_links(r): + for a in r.soup.find_all('a', href=True): + yield a.get('href') - def find_all_links(r): - for a in r.soup.find_all('a', href=True): - yield a.get('href') + b = Return( - b = Return( - - filter, - HttpLoader(), - find_all_links, - ) + filter_out, #filter out processed links + HttpLoader(), + find_all_links, #find all links in new page. + ) - Pipe( - "http://127.0.0.1:8080/", - b, - SendTo(b), + Pipe( + "http://127.0.0.1:8080/", + b, + SendTo(b), #send new url to process again . it will make a loop - ) + ) #. Add new Node type "SpeedLimit" "Delay" .For speed control @@ -66,4 +65,7 @@ Version 0.1.9 #. Rewrite whole project for code more readable. -#. import flow graph performance by reduce the node . \ No newline at end of file +#. Import flow performance by optimize graph generate algorithm . + +#. Import crawler case performance. For local server broad crawler bench test. + it can reach 760 pages per second. 10x faster than Scrapy. \ No newline at end of file diff --git a/examples/aiohttpserver_rest.py b/examples/aiohttpserver_search.py similarity index 90% rename from examples/aiohttpserver_rest.py rename to examples/aiohttpserver_search.py index ce442c4..bc00cb0 100644 --- a/examples/aiohttpserver_rest.py +++ b/examples/aiohttpserver_search.py @@ -7,7 +7,7 @@ logging.basicConfig(level=logging.DEBUG) -config.exception_policy=config.Exception_ignore +config.exception_policy=config.Exception_pipein def parse_search(response): # raise Exception() @@ -23,7 +23,7 @@ def parse_search(response): p=Pipe( - lambda r:r.query['q'], + lambda r:r.query.get('q',''), Join( lambda q:"https://www.bing.com/search?q={}".format(q), lambda q:"https://www.google.com/search?q={}".format(q), @@ -31,7 +31,7 @@ def parse_search(response): ), Zip(n_stream=3), - HttpLoader(), + HttpLoader(timeout=3), parse_search, ) diff --git a/requirements.txt b/requirements.txt index 8d29d40..6ca1974 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ aiohttp -aiomysql +beautifulsoup4 +lxml graphviz \ No newline at end of file diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..2d58e6d --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,40 @@ +import asyncio +from botflow.queue import DataQueue ,ConditionalQueue +from botflow.bdata import Bdata + +import logging +logger=logging.getLogger("botflow.queue") +logger.setLevel(logging.DEBUG) +sum=0 + + +import asyncio +import aiomysql + +loop = asyncio.get_event_loop() + + + + + + + +async def fa(): + ori1=Bdata.make_Bdata_zori(1) + ori2 = Bdata.make_Bdata_zori(2) + q = ConditionalQueue() + for i in range(100): + await q.put(Bdata(4, ori2)) + await q.put(Bdata(3,ori1)) + + + o1= asyncio.ensure_future(q.get(ori1)) + + result=await asyncio.gather(o1) + assert result[0].data==3 + + + + +def test_get_by(): + loop.run_until_complete(fa())