Skip to content

Commit dd40f8b

Browse files
committed
Refactor dropping a client, resolves #81
Formerly, it was possible that a client, who is currently in the process of being dropped, could still forward messages to other clients. In order to fix this, we have to store the running task loops on the client, so the receive (and forward) loop is being cancelled once the client is being dropped.
1 parent 0f6eb6a commit dd40f8b

File tree

2 files changed

+187
-50
lines changed

2 files changed

+187
-50
lines changed

saltyrtc/server/protocol.py

Lines changed: 123 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
__all__ = (
3333
'Path',
34+
'PathClientTasks',
3435
'PathClient',
3536
'Protocol',
3637
)
@@ -60,6 +61,21 @@ def empty(self):
6061
"""
6162
return all((client is None for client in self._slots.values()))
6263

64+
def has_client(self, client):
65+
"""
66+
Return whether a client's :class:`PathClient` instance is still
67+
available on the path.
68+
69+
Arguments:
70+
- `client`: The :class:`PathClient` instance to look for.
71+
72+
Raises :exc:`KeyError` in case the client has not been assigned
73+
an ID yet.
74+
"""
75+
# Note: No need to check for an unassigned ID since the server's ID will never
76+
# be available in the slots.
77+
return self._slots[client.id] == client
78+
6379
def get_initiator(self):
6480
"""
6581
Return the initiator's :class:`PathClient` instance.
@@ -146,9 +162,9 @@ def remove_client(self, client):
146162
Remove a client (initiator or responder) from the
147163
:class:`Path`.
148164
149-
.. warning:: Shall only be called from the client's
150-
own :class:`Protocol` instance. Other client's SHALL NOT
151-
be removed.
165+
.. important:: Shall only be called from the client's
166+
own :class:`Protocol` instance or from another client's
167+
:class.`Protocol` instance in case it is dropping a client.
152168
153169
Arguments:
154170
- `client`: The :class:`PathClient` instance.
@@ -172,6 +188,60 @@ def remove_client(self, client):
172188
self.log.debug('Removed {}', 'initiator' if is_initiator_id(id_) else 'responder')
173189

174190

191+
# TODO: We should be able to use a NamedTuple for this once we drop Python 3.4 support
192+
class PathClientTasks:
193+
__slots__ = (
194+
'task_loop',
195+
'receive_loop',
196+
'keep_alive_loop',
197+
)
198+
199+
def __init__(
200+
self,
201+
task_loop=None, receive_loop=None, keep_alive_loop=None,
202+
loop=None
203+
):
204+
if loop is None:
205+
asyncio.get_event_loop()
206+
self.task_loop = self._ensure_future_or_none(task_loop, loop)
207+
self.receive_loop = self._ensure_future_or_none(receive_loop, loop=loop)
208+
self.keep_alive_loop = self._ensure_future_or_none(keep_alive_loop, loop=loop)
209+
210+
@property
211+
def tasks(self):
212+
"""
213+
Return all tasks (including those who are set to `None`) as a
214+
tuple.
215+
"""
216+
return (
217+
self.task_loop,
218+
self.receive_loop,
219+
self.keep_alive_loop,
220+
)
221+
222+
@property
223+
def valid(self):
224+
"""
225+
Return all valid tasks (i.e. those who are not set to `None`)
226+
as an iterable.
227+
"""
228+
return (task for task in self.tasks if task is not None)
229+
230+
def cancel_all_but_task_loop(self):
231+
"""
232+
Cancel all valid tasks but the task queue.
233+
"""
234+
for task in self.valid:
235+
if task != self.task_loop:
236+
task.cancel()
237+
238+
@staticmethod
239+
def _ensure_future_or_none(coroutine_or_task, loop):
240+
if coroutine_or_task is None:
241+
return None
242+
return asyncio.ensure_future(coroutine_or_task, loop=loop)
243+
244+
175245
class PathClient:
176246
__slots__ = (
177247
'_loop',
@@ -195,6 +265,7 @@ class PathClient:
195265
'authenticated',
196266
'keep_alive_timeout',
197267
'keep_alive_pings',
268+
'tasks',
198269
'_task_queue',
199270
'_task_queue_state',
200271
)
@@ -223,6 +294,7 @@ def __init__(
223294
self.authenticated = False
224295
self.keep_alive_timeout = KEEP_ALIVE_TIMEOUT
225296
self.keep_alive_pings = 0
297+
self.tasks = None
226298

227299
# Schedule connection closed future
228300
def _connection_closed(_):
@@ -499,6 +571,12 @@ def enqueue_task(self, coroutine_or_task, ignore_closed=False):
499571
Enqueue a coroutine or task into the task queue of the
500572
client.
501573
574+
.. important:: Only the following tasks shall be enqueued:
575+
- Messages from the server towards this client.
576+
- Messages from other clients **towards** this
577+
client (i.e. relayed messages).
578+
- Delayed close operations towards this client.
579+
502580
.. note:: Coroutines will be closed and :class:`asyncio.Task`s
503581
will be cancelled when the task queue has been closed
504582
(unless `ignore_closed` has been set to `True`) or
@@ -557,6 +635,7 @@ def close_task_queue(self):
557635

558636
# Update state
559637
self._task_queue_state = _TaskQueueState.closed
638+
self.log.debug('Closed task queue')
560639

561640
def cancel_task_queue(self):
562641
"""
@@ -698,7 +777,7 @@ def _wait_pong(self, pong_future):
698777
def close(self, code=1000):
699778
"""
700779
Initiate the closing procedure and wait for the connection to
701-
be closed.
780+
become closed.
702781
703782
Arguments:
704783
- `close`: The close code.
@@ -710,6 +789,46 @@ def close(self, code=1000):
710789
# Note: We are not sending a reason for security reasons.
711790
yield from self._connection.close(code=code)
712791

792+
def drop(self, code):
793+
"""
794+
Drop this client. Will enqueue the closing procedure and cancel
795+
the receive loop as well as the keep alive loop of the client.
796+
797+
Return the enqueue operation in form of a
798+
:class:`asyncio.Task`.
799+
800+
.. important:: This should only be called by other client's
801+
protocols dropping this client.
802+
803+
Arguments:
804+
- `close`: The close code.
805+
"""
806+
# Enqueue the close procedure on our own task queue.
807+
# Note: The closing procedure would interrupt further send operations, thus we
808+
# MUST enqueue it as a coroutine and NOT wrap in a Future. That way, it
809+
# will not initiate the closing procedure before this client has executed
810+
# all other pending tasks.
811+
self.log.debug('Scheduling delayed closing procedure', code)
812+
close_coroutine = self.close(code=code)
813+
enqueue_task = asyncio.ensure_future(
814+
self.enqueue_task(close_coroutine, ignore_closed=True), loop=self._loop)
815+
816+
# Close the task queue to ensure no further tasks can be
817+
# enqueued while the client is in the closing process.
818+
self.close_task_queue()
819+
820+
# Cancel all loops for the client but the task queue.
821+
# Note: This will ensure that all messages forwarded towards the client to be
822+
# dropped will still be forwarded. But the to be dropped client will not be
823+
# able to send any more messages towards the server or relay messages
824+
# towards other clients.
825+
self.log.debug('Cancelling all running tasks but the task loop')
826+
self.tasks.cancel_all_but_task_loop()
827+
828+
# Dropped
829+
self.log.debug('Client dropped, close code: {}', code)
830+
return enqueue_task
831+
713832

714833
class Protocol:
715834
PATH_LENGTH = KEY_LENGTH * 2

0 commit comments

Comments
 (0)