Skip to content

Commit fe87ee3

Browse files
committed
Fix cancellation of pending tasks
Formerly, all uncancelled tasks have been treated as if they are done. This resulted in exceptions when trying to fetch the result (or exception) from the task for logging purposes. Now, all tasks are cancelled if they are not already cancelled or not done. Tests have been added for cancelling active and queued tasks as well as coroutines since this wasn't adequately tested previously.
1 parent 0d2600d commit fe87ee3

File tree

2 files changed

+115
-5
lines changed

2 files changed

+115
-5
lines changed

Diff for: saltyrtc/server/protocol.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,11 @@ def _cancel_coroutine_or_task(self, coroutine_or_task, mark_as_done=False):
602602
else:
603603
if mark_as_done:
604604
coroutine_or_task.add_done_callback(self.task_done)
605-
if not coroutine_or_task.cancelled():
605+
# Note: We need to check for .cancelled first since a task is also marked
606+
# .done when it is cancelled.
607+
if coroutine_or_task.cancelled():
608+
self.log.debug('Already cancelled task {}', coroutine_or_task)
609+
elif coroutine_or_task.done():
606610
exc = coroutine_or_task.exception()
607611
if exc is not None:
608612
message = 'Ignoring exception of queued task {}: {}'

Diff for: tests/test_server.py

+110-4
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,83 @@ def initiator_receive_loop(self):
343343
assert len([record for record in log_handler.records
344344
if 'closed while waiting for pong' in record.message]) == 1
345345

346+
@pytest.mark.asyncio
347+
def test_misbehaving_coroutine(
348+
self, mocker, event_loop, sleep, log_ignore_filter, log_handler,
349+
initiator_key, server, client_factory
350+
):
351+
"""
352+
Check that the server handles a misbehaving coroutine
353+
correctly.
354+
"""
355+
log_ignore_filter(lambda record: 'queue did not close' in record.message)
356+
357+
# Initiator handshake
358+
initiator, _ = yield from client_factory(initiator_handshake=True)
359+
connection_closed_future = server.wait_connection_closed_marker()
360+
361+
# Mock the task queue join timeout
362+
mocker.patch('saltyrtc.server.server._TASK_QUEUE_JOIN_TIMEOUT', 0.1)
363+
364+
# Get path instance of server and initiator's PathClient instance
365+
path = server.paths.get(initiator_key.pk)
366+
path_client = path.get_initiator()
367+
368+
@asyncio.coroutine
369+
def bad_coroutine(cancelled_future):
370+
try:
371+
yield from sleep(60.0)
372+
except asyncio.CancelledError:
373+
cancelled_future.set_result(None)
374+
yield from sleep(60.0)
375+
raise
376+
377+
@asyncio.coroutine
378+
def enqueue_bad_coroutine():
379+
cancelled_future = asyncio.Future(loop=event_loop)
380+
yield from path_client.enqueue_task(bad_coroutine(cancelled_future))
381+
return cancelled_future
382+
383+
# Enqueue misbehaving coroutine
384+
# Note: We need to add two of these since one of them will be dequeued
385+
# immediately and waited for which runs in a different code
386+
# section.
387+
active_coroutine_cancelled_future = yield from enqueue_bad_coroutine()
388+
queued_coroutine_cancelled_future = yield from enqueue_bad_coroutine()
389+
390+
# Close and wait
391+
yield from initiator.ws_client.close()
392+
393+
# Expect a normal closure (seen on the server side)
394+
close_code = yield from connection_closed_future()
395+
assert close_code == 1000
396+
yield from server.wait_connections_closed()
397+
398+
# The active coroutine was activated and thus will be cancelled
399+
assert active_coroutine_cancelled_future.result() is None
400+
# Since the active coroutine does not re-raise the cancellation, it should
401+
# never be marked as cancelled by the task loop.
402+
assert len([record for record in log_handler.records
403+
if 'Cancelling active task' in record.message]) == 0
404+
405+
# The queued coroutine was never waited for and it has not been added as a task
406+
# to the event loop either. Thus, it will not be cancelled.
407+
assert not queued_coroutine_cancelled_future.done()
408+
# The queued task will be cancelled.
409+
assert len([record for record in log_handler.records
410+
if 'Cancelling 1 queued tasks' in record.message]) == 1
411+
# Ensure it has been picked up as a coroutine
412+
assert len([record for record in log_handler.records
413+
if 'Closing queued coroutine' in record.message]) == 1
414+
415+
# Check log messages
416+
assert len([record for record in log_handler.records
417+
if 'queue did not close' in record.message]) == 1
418+
346419
@pytest.mark.asyncio
347420
def test_misbehaving_task(
348-
self, mocker, sleep, log_ignore_filter, log_handler, initiator_key, server,
349-
client_factory
421+
self, mocker, event_loop, sleep, log_ignore_filter, log_handler,
422+
initiator_key, server, client_factory
350423
):
351424
"""
352425
Check that the server handles a misbehaving task correctly.
@@ -365,13 +438,27 @@ def test_misbehaving_task(
365438
path_client = path.get_initiator()
366439

367440
@asyncio.coroutine
368-
def bad_task():
441+
def bad_coroutine(cancelled_future):
369442
try:
370443
yield from sleep(60.0)
371444
except asyncio.CancelledError:
445+
cancelled_future.set_result(None)
372446
yield from sleep(60.0)
447+
raise
373448

374-
yield from path_client.enqueue_task(bad_task())
449+
@asyncio.coroutine
450+
def enqueue_bad_task():
451+
cancelled_future = asyncio.Future(loop=event_loop)
452+
yield from path_client.enqueue_task(
453+
asyncio.ensure_future(bad_coroutine(cancelled_future)))
454+
return cancelled_future
455+
456+
# Enqueue misbehaving task
457+
# Note: We need to add two of these since one of them will be dequeued
458+
# immediately and waited for which runs in a different code
459+
# section.
460+
active_task_cancelled_future = yield from enqueue_bad_task()
461+
queued_task_cancelled_future = yield from enqueue_bad_task()
375462

376463
# Close and wait
377464
yield from initiator.ws_client.close()
@@ -380,6 +467,25 @@ def bad_task():
380467
close_code = yield from connection_closed_future()
381468
assert close_code == 1000
382469
yield from server.wait_connections_closed()
470+
471+
# The active task will be implicitly cancelled by cancellation of the task loop
472+
assert active_task_cancelled_future.result() is None
473+
# Since the active task does not re-raise the cancellation, it should never be
474+
# marked as cancelled by the task loop.
475+
assert len([record for record in log_handler.records
476+
if 'Cancelling active task' in record.message]) == 0
477+
478+
# The queued task has been scheduled on the event loop and thus will be
479+
# cancelled by the task queue cancellation.
480+
assert queued_task_cancelled_future.result() is None
481+
# The queued task will be cancelled.
482+
assert len([record for record in log_handler.records
483+
if 'Cancelling 1 queued tasks' in record.message]) == 1
484+
# Ensure it has been picked up as a task
485+
assert len([record for record in log_handler.records
486+
if 'Cancelling queued task' in record.message]) == 1
487+
488+
# Check log messages
383489
assert len([record for record in log_handler.records
384490
if 'queue did not close' in record.message]) == 1
385491

0 commit comments

Comments
 (0)