Skip to content

Commit

Permalink
Unregister on_event callbacks.
Browse files Browse the repository at this point in the history
  • Loading branch information
leedm777 committed Oct 27, 2013
1 parent 58eb44d commit 0220c8f
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 23 deletions.
3 changes: 0 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ TODO
====

* Create asynchronous bindings that can be used with Twisted, Tornado, etc.
* Client needs a ``close`` method, which can properly close the HTTP client
and any current WebSocket connections.
* Need a way to unregister an ``on_event`` callback.

License
-------
Expand Down
69 changes: 50 additions & 19 deletions ari/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def __init__(self, base_url, http_client):
else:
self.event_models = {}

self.websockets = set()
self.event_listeners = {}
self.global_listeners = []
self.exception_handler = \
lambda ex: log.exception("Event listener threw exception")

Expand All @@ -56,6 +56,16 @@ def __getattr__(self, item):
"'%r' object has no attribute '%s'" % (self, item))
return repo

def close(self):
"""Close this ARI client.
This method will close any currently open WebSockets, and close the
underlying Swaggerclient.
"""
for ws in self.websockets:
ws.send_close()
self.swagger.close()

def get_repo(self, name):
"""Get a specific repo by name.
Expand All @@ -65,18 +75,12 @@ def get_repo(self, name):
"""
return self.repositories.get(name)

def run(self, apps):
"""Connect to the WebSocket and begin processing messages.
This method will block until all messages have been received from the
WebSocket.
def __run(self, ws):
"""Drains all messages from a WebSocket, sending them to the client's
listeners.
:param apps: Application (or list of applications) to connect for
:type apps: str or list of str
:param ws: WebSocket to drain.
"""
if isinstance(apps, list):
apps = ','.join(apps)
ws = self.swagger.events.eventWebsocket(app=apps)
# TypeChecker false positive on iter(callable, sentinel) -> iterator
# Fixed in plugin v3.0.1
# noinspection PyTypeChecker
Expand All @@ -86,27 +90,54 @@ def run(self, apps):
log.error("Invalid event: %s" % msg_str)
continue

listeners = self.global_listeners + self.event_listeners.get(
msg_json['type'], [])
listeners = list(self.event_listeners.get(msg_json['type'], []))
for listener in listeners:
# noinspection PyBroadException
try:
listener(msg_json)
except Exception as e:
self.exception_handler(e)

def run(self, apps):
"""Connect to the WebSocket and begin processing messages.
This method will block until all messages have been received from the
WebSocket, or until this client has been closed.
:param apps: Application (or list of applications) to connect for
:type apps: str or list of str
"""
if isinstance(apps, list):
apps = ','.join(apps)
ws = self.swagger.events.eventWebsocket(app=apps)
self.websockets.add(ws)
try:
self.__run(ws)
finally:
ws.close()
self.websockets.remove(ws)

def on_event(self, event_type, event_cb):
"""Register callback for events with given type.
:param event_type: String name of the event to register for.
:param event_cb: Callback function
:type event_cb: (dict) -> None
"""
listeners = self.event_listeners.get(event_type)
if listeners is None:
listeners = []
self.event_listeners[event_type] = listeners
listeners.append(event_cb)
listeners = self.event_listeners.setdefault(event_type, set())
listeners.add(event_cb)
client = self

class EventUnsubscriber(object):
"""Class to allow events to be unsubscribed.
"""

def close(self):
"""Unsubscribe the associated event callback.
"""
client.event_listeners[event_type].discard(event_cb)

return EventUnsubscriber()

def on_object_event(self, event_type, event_cb, factory_fn, model_id):
"""Register callback for events with the given type. Event fields of
Expand Down Expand Up @@ -150,7 +181,7 @@ def extract_objects(event):
obj = None
event_cb(obj, event)

self.on_event(event_type, extract_objects)
return self.on_event(event_type, extract_objects)

def on_channel_event(self, event_type, fn):
"""Register callback for Channel related events
Expand Down
2 changes: 1 addition & 1 deletion ari/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def fn_filter(objects, event):
if self.id == objects.id:
fn(objects, event)

self.event_reg(event_type, fn_filter)
return self.event_reg(event_type, fn_filter)


class Channel(BaseObject):
Expand Down
46 changes: 46 additions & 0 deletions ari_test/websocket_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,33 @@ def test_series(self):
]
self.assertEqual(expected, self.actual)

def test_unsubscribe(self):
messages = [
'{"type": "ev", "data": 1}',
'{"type": "ev", "data": 2}'
]
uut = connect(BASE_URL, messages)
self.once_ran = 0

def only_once(event):
self.once_ran += 1
self.assertEqual(1, event['data'])
self.once.close()

def both_events(event):
self.record_event(event)

self.once = uut.on_event("ev", only_once)
self.both = uut.on_event("ev", both_events)
uut.run('test')

expected = [
{"type": "ev", "data": 1},
{"type": "ev", "data": 2}
]
self.assertEqual(expected, self.actual)
self.assertEqual(1, self.once_ran)

def test_on_channel(self):
self.serve(DELETE, 'channels', 'test-channel')
messages = [
Expand All @@ -70,6 +97,25 @@ def cb(channel, event):
]
self.assertEqual(expected, self.actual)

def test_on_channel_unsubscribe(self):
messages = [
'{ "type": "StasisStart", "channel": { "id": "test-channel1" } }',
'{ "type": "StasisStart", "channel": { "id": "test-channel2" } }'
]
uut = connect(BASE_URL, messages)

def only_once(channel, event):
self.record_event(event)
self.once.close()

self.once = uut.on_channel_event('StasisStart', only_once)
uut.run('test')

expected = [
{"type": "StasisStart", "channel": {"id": "test-channel1"}}
]
self.assertEqual(expected, self.actual)

def test_channel_on_event(self):
self.serve(GET, 'channels', 'test-channel',
body='{"id": "test-channel"}')
Expand Down

0 comments on commit 0220c8f

Please sign in to comment.