Skip to content

Commit

Permalink
fix http server exception handle bug
Browse files Browse the repository at this point in the history
  • Loading branch information
gj authored and gj committed Sep 8, 2018
1 parent 62a3c48 commit 8a428c1
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 63 deletions.
4 changes: 3 additions & 1 deletion botflow/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def call_wrap_r(self,func, bdata):
else:
result = r_or_c
except Exception as e:

logger.error(e)
if config.exception_policy == config.Exception_raise:
raise e
elif config.exception_policy == config.Exception_ignore:
Expand Down Expand Up @@ -153,6 +153,8 @@ def create_coro(self,bdata):
coro = self.append_q(self.merge_list,self.func, bdata, self.output_q)

return coro
elif isinstance(bdata.data,Exception):
return self.output_q.put(bdata)


else:
Expand Down
254 changes: 229 additions & 25 deletions botflow/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .config import config
import logging
import datetime
import collections
logger = logging.getLogger(__name__)

class QueueManager(object,metaclass=Singleton):
Expand Down Expand Up @@ -51,28 +52,7 @@ def __init__(self,maxsize=None,loop=None):
self.put_callback=None


async def readable(self):
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
await getter
except:
getter.cancel() # Just in case getter is not done yet.

try:
self._getters.remove(getter)
except ValueError:
pass

if not self.empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise

pass
#TODO

async def writable(self):
pass
Expand Down Expand Up @@ -120,8 +100,8 @@ async def put(self, item):
if self.qsize()>self.high_water:
self.high_water=self.qsize()

if self.put_callback is not None:
asyncio.ensure_future(self.put_callback(item))
# if self.put_callback is not None:
# asyncio.ensure_future(self.put_callback(item))
return r

async def get(self):
Expand All @@ -130,6 +110,226 @@ async def get(self):
#r.destroy()
return r

async def get_by(self,ori):
while True:
await self.readable()
item=self._queue[-1]
if item.ori == ori:
return self._queue.popleft()


async def readable(self):
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
await getter
except:
getter.cancel() # Just in case getter is not done yet.

try:
self._getters.remove(getter)
except ValueError:
pass

if not self.empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise

return


class ConditionalQueue:


def __init__(self, maxsize=0, *, loop=None):

self._loop = asyncio.get_event_loop()

self._maxsize = maxsize

# Futures.
self._getters = {} #collections.deque()
# Futures.
self._putters = collections.deque()
# self._unfinished_tasks = 0
# self._finished = asyncio.Lock.Event(loop=self._loop)
# self._finished.set()
self._init(maxsize)
self.qm=QueueManager()
self.debug = True
self.high_water = 0
self.qm.add(self)
self.put_count=0

# These three are overridable in subclasses.

def _init(self, maxsize):
self._queue = {} #collections.deque()
def _init_dict(self,ori):
if ori not in self._getters:
self._getters[ori]=collections.deque()

if ori not in self._queue:
self._queue[ori]=collections.deque()

def clean(self,ori):
del self._getters[ori]
del self._queue[ori]


def _get(self,ori):
return self._queue[ori].popleft()



def _put(self, item):

self._queue[item.ori].append(item)

# End of the overridable methods.

def _wakeup_next(self, waiters):
# Wake up the next waiter (if any) that isn't cancelled.
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)
break

def __repr__(self):
return '<{} at {:#x} {}>'.format(
type(self).__name__, id(self), self._format())

def __str__(self):
return '<{} {}>'.format(type(self).__name__, self._format())

def _format(self):
result = 'maxsize={!r}'.format(self._maxsize)
if getattr(self, '_queue', None):
result += ' _queue={!r}'.format(list(self._queue))
if self._getters:
result += ' _getters[{}]'.format(len(self._getters))
if self._putters:
result += ' _putters[{}]'.format(len(self._putters))
if self._unfinished_tasks:
result += ' tasks={}'.format(self._unfinished_tasks)
return result

def qsize(self):
"""Number of items in the queue."""
size=0
for k,v in self._queue.items():
size+=len(v)
return size

@property
def maxsize(self):
"""Number of items allowed in the queue."""
return self._maxsize

def empty(self,ori):
"""Return True if the queue is empty, False otherwise."""
return not self._queue[ori]

def full(self):
"""Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default),
then full() is never True.
"""
if self._maxsize <= 0:
return False
else:
return self.qsize() >= self._maxsize


async def put(self, item):
"""Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
This method is a coroutine.
"""
self._init_dict(item.ori)
while self.full():
putter = self._loop.create_future()
self._putters.append(putter)
try:
await putter
except:
putter.cancel() # Just in case putter is not done yet.
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters[item.ori])
raise
return self.put_nowait(item)

def put_nowait(self, item):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
"""
if self.full():
raise asyncio.QueueFull
self._put(item)
# self._unfinished_tasks += 1
# self._finished.clear()
self._wakeup_next(self._getters[item.ori])

async def readable(self,ori):

while self.empty(ori):
getter = self._loop.create_future()
self._getters[ori].append(getter)
try:
await getter
except:
getter.cancel() # Just in case getter is not done yet.

try:
self._getters[ori].remove(getter)
except ValueError:
pass

if not self.empty(ori) and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._getters[ori])
raise


async def get_by(self,ori):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
This method is a coroutine.
"""
self._init_dict(ori)
await self.readable(ori)
return self.get_nowait(ori)

def get_nowait(self,ori):
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
"""
if self.empty(ori):
raise asyncio.QueueEmpty
item = self._get(ori)
self._wakeup_next(self._putters)
return item






class NullQueue(asyncio.Queue):

# |
Expand Down Expand Up @@ -202,9 +402,9 @@ class ProxyQueue(asyncio.Queue):

# |
# X
def __init__(self, maxsize=0, loop=None):
def __init__(self, q,maxsize=0, loop=None):
super().__init__(maxsize=maxsize,loop=loop)
self._q=DataQueue(maxsize=maxsize,loop=loop)
self._q=q


def set_q(self,q):
Expand All @@ -225,5 +425,9 @@ async def put(self, item):
async def get(self):
return await self._q.get()

async def get_by(self,ori):
return await self._q.get_by(ori)
def clean(self,ori):
return self._q.clean(ori)
def get_nowait(self):
return self._q.get_nowait()
22 changes: 15 additions & 7 deletions botflow/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .botframe import BotFrame
from .config import config

from .queue import DataQueue,NullQueue,CachedQueue,ProxyQueue
from .queue import DataQueue,NullQueue,CachedQueue,ProxyQueue,ConditionalQueue
from botflow.bdata import Bdata,Databoard
from .botbase import BotManager
from .routebase import Route
Expand All @@ -29,7 +29,7 @@ def __init__(self, *args):
for idx,func in enumerate(args):
q_i = q_o
if idx == len(args)-1:
q_o = ProxyQueue()
q_o = ProxyQueue(ConditionalQueue())

else:
if config.replay_mode:
Expand Down Expand Up @@ -225,9 +225,14 @@ async def read(self):


async def __call__(self, data):

await self.start_q.put(Bdata.make_Bdata_zori(data))
r=await self.output_q.get()
ori=Bdata.make_Bdata_zori(data)
await self.start_q.put(Bdata(data,ori))
r=await self.output_q.get_by(ori)
self.output_q.clean(ori)
if isinstance(r.data,list):
for i,v in enumerate(r.data):
if isinstance(v,Exception):
r.data[i]=None
return r.data


Expand Down Expand Up @@ -462,8 +467,11 @@ def make_route_bot(self,iq,oq):

async def route_in(self,bdata):
if self.merge_node is None:
new_data=Bdata(bdata.data, bdata)
await super().route_in(new_data)
if bdata.ori ==0:
new_data=Bdata(bdata.data, bdata)
await super().route_in(new_data)
else:
await super().route_in(bdata)
else:
await self.route_in_joinmerge(bdata)

Expand Down
Loading

0 comments on commit 8a428c1

Please sign in to comment.