Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66395.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Suppressed task instance listener exceptions now include the name of the failing hook in the logged error message.
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,25 @@ def _validate_patch_task_instance_body(
def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInstanceState) -> None:
    """
    Fire listener hooks for the given TIs based on their new state.

    Each hook invocation is wrapped in its own ``try``/``except`` so that the
    name of the hook that raised can be included in the log record. Listener
    errors are logged and never propagated to the caller.

    :param updated_tis: Task instances whose state was just changed.
    :param new_state: The state the task instances were set to. Only
        ``SUCCESS``, ``FAILED`` and ``SKIPPED`` trigger a hook; any other
        value is a no-op.
    """
    for ti in updated_tis:
        if new_state == TaskInstanceState.SUCCESS:
            try:
                get_listener_manager().hook.on_task_instance_success(previous_state=None, task_instance=ti)
            except Exception:
                log.exception("error calling listener for hook %r", "on_task_instance_success")
        elif new_state == TaskInstanceState.FAILED:
            try:
                get_listener_manager().hook.on_task_instance_failed(
                    previous_state=None,
                    task_instance=ti,
                    error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
                )
            except Exception:
                log.exception("error calling listener for hook %r", "on_task_instance_failed")
        elif new_state == TaskInstanceState.SKIPPED:
            try:
                get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti)
            except Exception:
                log.exception("error calling listener for hook %r", "on_task_instance_skipped")


def _reload_tis_with_rendered_fields(tis: list[TI], session: Session) -> list[TI]:
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ def fetch_handle_failure_context(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_task_instance_failed")

return ti

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/listeners/test_listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc

ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
ti.run()
assert "error calling listener" in cap_structlog
assert "error calling listener for hook 'on_task_instance_success'" in cap_structlog


@provide_session
Expand Down
10 changes: 5 additions & 5 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv
previous_state=TaskInstanceState.QUEUED, task_instance=ti
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_task_instance_running")

# No error, carry on and execute the task
return None
Expand Down Expand Up @@ -1903,23 +1903,23 @@ def finalize(
previous_state=TaskInstanceState.RUNNING, task_instance=ti
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_task_instance_success")
elif state == TaskInstanceState.SKIPPED:
_run_task_state_change_callbacks(task, "on_skipped_callback", context, log)
try:
get_listener_manager().hook.on_task_instance_skipped(
previous_state=TaskInstanceState.RUNNING, task_instance=ti
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_task_instance_skipped")
elif state == TaskInstanceState.UP_FOR_RETRY:
_run_task_state_change_callbacks(task, "on_retry_callback", context, log)
try:
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_task_instance_failed")
if error and task.email_on_retry and task.email:
_send_error_email_notification(task, ti, context, error, log)
elif state == TaskInstanceState.FAILED:
Expand All @@ -1929,7 +1929,7 @@ def finalize(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_task_instance_failed")
if error and task.email_on_failure and task.email:
_send_error_email_notification(task, ti, context, error, log)

Expand Down
37 changes: 37 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3941,6 +3941,43 @@ def execute(self, context):

assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS]

def test_listener_error_log_includes_hook_name(
    self, mocked_parse, mock_supervisor_comms, listener_manager
):
    """
    A raising listener hook must be logged together with the hook's name.

    Registers a listener whose ``on_task_instance_success`` implementation
    raises, drives a no-op operator through ``run`` and ``finalize`` with a
    mock logger, and checks that ``log.exception`` received the hook name as
    its format argument.
    """

    class ThrowingListener:
        @hookimpl
        def on_task_instance_success(self, previous_state, task_instance):
            raise RuntimeError("listener boom")

    listener_manager(ThrowingListener())

    class CustomOperator(BaseOperator):
        def execute(self, context):
            pass

    operator = CustomOperator(task_id="test_listener_error_log_includes_hook_name")
    inline_dag = get_inline_dag(dag_id="test_dag", task=operator)
    ti_model = TaskInstance(
        id=uuid7(),
        task_id=operator.task_id,
        dag_id=inline_dag.dag_id,
        run_id="test_run",
        try_number=1,
        dag_version_id=uuid7(),
    )
    ti_fields = ti_model.model_dump(exclude_unset=True)
    runtime_ti = RuntimeTaskInstance.model_construct(
        **ti_fields, task=operator, start_date=timezone.utcnow()
    )

    # A MagicMock logger records every call so the exact arguments can be asserted.
    mock_log = mock.MagicMock()
    template_context = runtime_ti.get_template_context()
    final_state, _, _ = run(runtime_ti, template_context, mock_log)
    finalize(runtime_ti, final_state, template_context, mock_log)

    mock_log.exception.assert_any_call("error calling listener for hook %r", "on_task_instance_success")

@pytest.mark.parametrize(
"exception",
[
Expand Down
Loading