From fe148c5f8b0e775005ad535ba7326502484bc7d3 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 29 May 2026 18:27:34 +0800 Subject: [PATCH] fix: include task execution errors in task logs Executor failures only reached the task's error field, never the task log stream users read via `flowmesh logs`: the failure handler logged a bare "Task failed" line and TaskLogEmitter dropped record exc_info. Emit the formatted traceback for unexpected exceptions and a clean message for controlled ExecutionError, so the reason is visible in the stream. Signed-off-by: Noppanat Wadlom --- src/worker/runner.py | 7 ++- src/worker/utils/logging.py | 9 ++++ tests/worker/test_task_log_emitter.py | 73 +++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 tests/worker/test_task_log_emitter.py diff --git a/src/worker/runner.py b/src/worker/runner.py index 7b7086de..e68b5b61 100644 --- a/src/worker/runner.py +++ b/src/worker/runner.py @@ -21,7 +21,7 @@ from shared.utils.manifest import prepare_output_dir, sync_manifest from shared.utils.time import now_iso -from .executors.base_executor import Executor, TaskCancelledError +from .executors.base_executor import ExecutionError, Executor, TaskCancelledError from .executors.utils.checkpoints import ( get_http_destination, write_executor_result, @@ -561,7 +561,10 @@ def start(self) -> None: shard_total=shard_total, ) self.lifecycle.set_failed(task_id, str(e), metadata=metadata) - self.logger.exception("Task %s failed", task_id) + if isinstance(e, ExecutionError): + self.logger.error("Task %s failed: %s", task_id, e) + else: + self.logger.exception("Task %s failed", task_id) finally: self._current_task_id = None with self._active_executor_lock: diff --git a/src/worker/utils/logging.py b/src/worker/utils/logging.py index d9ea4782..c8e46406 100644 --- a/src/worker/utils/logging.py +++ b/src/worker/utils/logging.py @@ -258,6 +258,8 @@ def _run(self) -> None: class TaskLogEmitter(logging.Handler): """Per-task log handler that emits Python logging records to the server.""" + _traceback_formatter = logging.Formatter() + def __init__( self, stub: supervisor_pb2_grpc.SupervisorStub, @@ -304,6 +306,13 @@ def emit(self, record: logging.LogRecord) -> None: message = record.getMessage() except Exception: message = str(getattr(record, "msg", "")) + if record.exc_info: + if not record.exc_text: + record.exc_text = self._traceback_formatter.formatException( + record.exc_info + ) + if exc_text := record.exc_text: + message = f"{message}\n{exc_text}" if message else exc_text if not message: return diff --git a/tests/worker/test_task_log_emitter.py b/tests/worker/test_task_log_emitter.py new file mode 100644 index 00000000..2414afca --- /dev/null +++ b/tests/worker/test_task_log_emitter.py @@ -0,0 +1,73 @@ +"""Tests for TaskLogEmitter payload construction.""" + +import logging +import sys +from typing import Any, cast +from unittest import mock + +from worker.utils.logging import TaskLogEmitter + + +class _CapturingStream: + def __init__(self) -> None: + self.payloads: list[dict[str, Any]] = [] + + def send(self, payload: dict[str, Any]) -> None: + self.payloads.append(payload) + + def close(self) -> None: + pass + + +def _make_emitter() -> tuple[TaskLogEmitter, _CapturingStream]: + with mock.patch("worker.utils.logging._GrpcLogStream"): + emitter = TaskLogEmitter( + stub=mock.Mock(), + metadata=(), + struct_from_payload=lambda payload: payload, + logger=logging.getLogger("test_task_log_emitter"), + task_id="tsk-1", + workflow_id="wfl-1", + owner_id="own-1", + worker_id="wrk-1", + ) + capture = _CapturingStream() + emitter._stream = cast(Any, capture) + return emitter, capture + + +def _record(msg: str, args: tuple[Any, ...], exc_info: Any = None) -> logging.LogRecord: + return logging.LogRecord( + name="task", + level=logging.ERROR, + pathname=__file__, + lineno=1, + msg=msg, + args=args, + exc_info=exc_info, + ) + + +def test_emit_includes_traceback_when_exc_info_present() -> None: + emitter, capture = _make_emitter() + try: + raise ValueError("spec.api.url is required") + except ValueError: + record = _record("Task %s failed", ("tsk-1",), exc_info=sys.exc_info()) + + emitter.emit(record) + + assert len(capture.payloads) == 1 + message = capture.payloads[0]["message"] + assert "Task tsk-1 failed" in message + assert "Traceback (most recent call last)" in message + assert "ValueError: spec.api.url is required" in message + + +def test_emit_plain_message_without_exc_info() -> None: + emitter, capture = _make_emitter() + + emitter.emit(_record("Task %s failed: %s", ("tsk-1", "bad spec"))) + + assert len(capture.payloads) == 1 + assert capture.payloads[0]["message"] == "Task tsk-1 failed: bad spec"