Skip to content

Commit 671d403

Browse files
committed
Properly handle late HTTPExceptions
The code as suggested worked fine only if HTTPExceptions occurred before the 201 response had been sent from an Action. The new code uses a lock to robustly determine whether the exception should be propagated in the main thread, or in the Action. This means that late-raised HTTPExceptions should now show up in the logs properly.
1 parent 75c75b9 commit 671d403

File tree

5 files changed

+120
-26
lines changed

5 files changed

+120
-26
lines changed

src/labthings/actions/pool.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,21 @@ def start(self, thread: ActionThread):
2929
self.add(thread)
3030
thread.start()
3131

32-
def spawn(self, action: str, function, *args, **kwargs):
32+
def spawn(self, action: str, function, http_error_lock=None, *args, **kwargs):
3333
"""
3434
3535
:param function:
3636
:param *args:
3737
:param **kwargs:
3838
3939
"""
40-
thread = ActionThread(action, target=function, args=args, kwargs=kwargs)
40+
thread = ActionThread(
41+
action,
42+
target=function,
43+
http_error_lock=http_error_lock,
44+
args=args,
45+
kwargs=kwargs
46+
)
4147
self.start(thread)
4248
return thread
4349

src/labthings/actions/thread.py

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from typing import Any, Callable, Dict, Iterable, Optional
88

99
from flask import copy_current_request_context, has_request_context, request
10-
from werkzeug.exceptions import BadRequest
10+
from werkzeug.exceptions import BadRequest, HTTPException
1111

1212
from ..deque import LockableDeque
1313
from ..utilities import TimeoutTracker
@@ -22,6 +22,42 @@ class ActionKilledException(SystemExit):
2222
class ActionThread(threading.Thread):
2323
"""
2424
A native thread with extra functionality for tracking progress and thread termination.
25+
26+
Arguments:
27+
* `action` is the name of the action that's running
28+
* `target`, `name`, `args`, `kwargs` and `daemon` are passed to `threading.Thread`
29+
(though the defualt for `daemon` is changed to `True`)
30+
* `default_stop_timeout` specifies how long we wait for the `target` function to
31+
stop nicely (e.g. by checking the `stopping` Event )
32+
* `log_len` gives the number of log entries before we start dumping them
33+
* `http_error_lock` allows the calling thread to handle some
34+
errors initially. See below.
35+
36+
## Error propagation
37+
If the `target` function throws an Exception, by default this will result in:
38+
* The thread terminating
39+
* The Action's status being set to `error`
40+
* The exception appearing in the logs with a traceback
41+
* The exception being raised in the background thread.
42+
However, `HTTPException` subclasses are used in Flask/Werkzeug web apps to
43+
return HTTP status codes indicating specific errors, and so merit being
44+
handled differently.
45+
46+
Normally, when an Action is initiated, the thread handling the HTTP request
47+
does not return immediately - it waits for a short period to check whether
48+
the Action has completed or returned an error. If an HTTPError is raised
49+
in the Action thread before the initiating thread has sent an HTTP response,
50+
we **don't** want to propagate the error here, but instead want to re-raise
51+
it in the calling thread. This will then mean that the HTTP request is
52+
answered with the appropriate error code, rather than returning a `201`
53+
code, along with a description of the task (showing that it was successfully
54+
started, but also showing that it subsequently failed with an error).
55+
56+
In order to activate this behaviour, we must pass in a `threading.Lock`
57+
object. This lock should already be acquired by the request-handling
58+
thread. If an error occurs, and this lock is acquired, the exception
59+
should not be re-raised until the calling thread has had the chance to deal
60+
with it.
2561
"""
2662

2763
def __init__(
@@ -34,6 +70,7 @@ def __init__(
3470
daemon: bool = True,
3571
default_stop_timeout: int = 5,
3672
log_len: int = 100,
73+
http_error_lock: Optional[threading.Lock] = None
3774
):
3875
threading.Thread.__init__(
3976
self,
@@ -56,6 +93,8 @@ def __init__(
5693
# Event to track if the user has requested stop
5794
self.stopping: threading.Event = threading.Event()
5895
self.default_stop_timeout: int = default_stop_timeout
96+
# Allow the calling thread to handle HTTP errors for a short time at the start
97+
self.http_error_lock = http_error_lock or threading.Lock()
5998

6099
# Make _target, _args, and _kwargs available to the subclass
61100
self._target: Optional[Callable] = target
@@ -220,16 +259,32 @@ def wrapped(*args, **kwargs):
220259
# Set state to stopped
221260
self._status = "cancelled"
222261
self.progress = None
262+
except HTTPException as e:
263+
self._exception = e
264+
# If the lock is acquired elsewhere, assume the error
265+
# will be handled there.
266+
if self.http_error_lock.acquire(blocking=False):
267+
self.http_error_lock.release()
268+
logging.error(
269+
"An HTTPException occurred in an action thread, but "
270+
"the parent request was no longer waiting for it."
271+
)
272+
logging.error(traceback.format_exc())
273+
raise e
274+
else:
275+
logging.info(f"Propagating {e} back to request handler")
223276
except Exception as e: # skipcq: PYL-W0703
224-
logging.error(traceback.format_exc())
225-
self._return_value = str(e)
226-
self._status = "error"
227277
self._exception = e
278+
logging.error(traceback.format_exc())
228279
raise e
229280
finally:
230281
self._end_time = datetime.datetime.now()
231282
logging.getLogger().removeHandler(handler) # Stop logging this thread
232283
# If we don't remove the handler, it's a memory leak.
284+
if self._exception:
285+
self._return_value = str(self._exception)
286+
self._status = "error"
287+
233288

234289
return wrapped
235290

src/labthings/views/__init__.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime
2+
import threading
23
from collections import OrderedDict
34
from typing import Callable, Dict, List, Optional, Set, cast
45

@@ -216,20 +217,23 @@ def dispatch_request(self, *args, **kwargs):
216217
pool = (
217218
current_labthing().actions if current_labthing() else self._emergency_pool
218219
)
219-
# Make a task out of the views `post` method
220-
task = pool.spawn(self.endpoint, meth, *args, **kwargs)
221-
# Optionally override the threads default_stop_timeout
222-
if self.default_stop_timeout is not None:
223-
task.default_stop_timeout = self.default_stop_timeout
224-
225-
# Wait up to 2 second for the action to complete or error
226-
try:
227-
task.get(block=True, timeout=self.wait_for)
228-
except TimeoutError:
229-
pass
230-
231-
# Log the action to the view's deque
232-
self._deque.append(task)
220+
error_lock = threading.RLock() # This indicates that we'll handle errors for now
221+
with error_lock:
222+
# Make a task out of the views `post` method
223+
task = pool.spawn(self.endpoint, meth, *args, http_error_lock=error_lock, **kwargs)
224+
# Optionally override the threads default_stop_timeout
225+
if self.default_stop_timeout is not None:
226+
task.default_stop_timeout = self.default_stop_timeout
227+
228+
# Log the action to the view's deque
229+
self._deque.append(task)
230+
231+
# Wait up to 2 second for the action to complete or error
232+
try:
233+
task.get(block=True, timeout=self.wait_for)
234+
except TimeoutError:
235+
pass
236+
233237

234238
# If the action returns quickly, and returns a valid Response, return it as-is
235239
if task.output and isinstance(task.output, ResponseBase):

tests/conftest.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import os
3+
import time
34

45
import jsonschema
56
import pytest
@@ -213,26 +214,29 @@ def post(self, args):
213214
thing.add_view(TestFieldProperty, "/TestFieldProperty")
214215

215216
class FailAction(ActionView):
216-
wait_for = 1.0
217+
wait_for = 0.1
217218

218219
def post(self):
219220
raise Exception("This action is meant to fail with an Exception")
220221

221222
thing.add_view(FailAction, "/FailAction")
222223

223224
class AbortAction(ActionView):
224-
wait_for = 1.0
225+
wait_for = 0.1
226+
args = {"abort_after": fields.Number()}
225227

226-
def post(self):
228+
def post(self, args):
229+
if args.get("abort_after", 0) > 0:
230+
time.sleep(args["abort_after"])
227231
abort(418, "I'm a teapot! This action should abort with an HTTP code 418")
228232

229233
thing.add_view(AbortAction, "/AbortAction")
230234

231235
class ActionWithValidation(ActionView):
232-
wait_for = 1.0
236+
wait_for = 0.1
233237
args = {"test_arg": fields.String(validate=validate.OneOf(["one", "two"]))}
234238

235-
def post(self):
239+
def post(self, args):
236240
return True
237241

238242
thing.add_view(ActionWithValidation, "/ActionWithValidation")

tests/test_action_api.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import logging
23
import time
34

45
import pytest
@@ -31,7 +32,7 @@ def test_action_exception_handling(thing_with_some_views, client):
3132
assert action["status"] == "error"
3233

3334

34-
def test_action_abort_and_validation(thing_with_some_views, client):
35+
def test_action_abort(thing_with_some_views, client):
3536
"""Check HTTPExceptions result in error codes.
3637
3738
Subclasses of HTTPError should result in a non-200 return code, not
@@ -42,11 +43,35 @@ def test_action_abort_and_validation(thing_with_some_views, client):
4243
r = client.post("/AbortAction")
4344
assert r.status_code == 418
4445

46+
@pytest.mark.filterwarnings("ignore:Exception in thread")
47+
def test_action_abort_late(thing_with_some_views, client, caplog):
48+
"""Check HTTPExceptions raised late are just regular errors."""
49+
caplog.set_level(logging.ERROR)
50+
caplog.clear()
51+
r = client.post("/AbortAction", data=json.dumps({"abort_after": 0.2}))
52+
assert r.status_code == 201 # Should have started OK
53+
time.sleep(0.3)
54+
# Now check the status - should be error
55+
r2 = client.get(r.get_json()["links"]["self"]["href"])
56+
assert r2.get_json()["status"] == "error"
57+
# Check it was logged as well
58+
error_was_raised = False
59+
for r in caplog.records:
60+
if (
61+
r.levelname == "ERROR"
62+
and "HTTPException" in r.message
63+
):
64+
error_was_raised = True
65+
assert error_was_raised
66+
67+
4568

4669
def test_action_validate(thing_with_some_views, client):
70+
"""Validation errors should result in 422 return codes."""
4771
# `/ActionWithValidation` should fail with a 400 error
4872
# if `test_arg` is not either `one` or `two`
4973
r = client.post("/ActionWithValidation", data=json.dumps({"test_arg": "one"}))
5074
assert r.status_code in [200, 201]
75+
assert r.get_json()["status"] == "completed"
5176
r = client.post("/ActionWithValidation", data=json.dumps({"test_arg": "three"}))
5277
assert r.status_code in [422]

0 commit comments

Comments
 (0)