diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index cee5b428f6db6..f440261dafc86 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -308,8 +308,10 @@
         ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
         ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
         ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
+        ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
         ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
         ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
+        ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
         ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
         ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
@@ -322,6 +324,8 @@
                 "host": ELASTICSEARCH_HOST,
                 "frontend": ELASTICSEARCH_FRONTEND,
                 "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
+                "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
+                "target_index": ELASTICSEARCH_TARGET_INDEX,
                 "json_format": ELASTICSEARCH_JSON_FORMAT,
                 "json_fields": ELASTICSEARCH_JSON_FIELDS,
                 "host_field": ELASTICSEARCH_HOST_FIELD,
diff --git a/airflow/config_templates/provider_config_fallback_defaults.cfg b/airflow/config_templates/provider_config_fallback_defaults.cfg
index ba92feaef473c..b49c633c5af1d 100644
--- a/airflow/config_templates/provider_config_fallback_defaults.cfg
+++ b/airflow/config_templates/provider_config_fallback_defaults.cfg
@@ -82,6 +82,8 @@ json_fields = asctime, filename, lineno, levelname, message
 host_field = host
 offset_field = offset
 index_patterns = _all
+write_to_es = False
+target_index = airflow-logs
 
 [elasticsearch_configs]
 use_ssl = False
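Reviewer note: a minimal sketch (not part of the patch, and assuming a configured Airflow environment) of how the two new options can be read back with the same `conf` API the template uses; the fallback values here mirror the defaults added above:

from airflow.configuration import conf

# Hypothetical standalone check; the section and key names come from the hunks above.
write_to_es = conf.getboolean("elasticsearch", "WRITE_TO_ES", fallback=False)
target_index = conf.get("elasticsearch", "TARGET_INDEX", fallback="airflow-logs")
print(f"write_to_es={write_to_es}, target_index={target_index}")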
diff --git a/docs/apache-airflow-providers-elasticsearch/logging/index.rst b/docs/apache-airflow-providers-elasticsearch/logging/index.rst
index 7b56958cafe3a..eaa46def53b96 100644
--- a/docs/apache-airflow-providers-elasticsearch/logging/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/logging/index.rst
@@ -22,6 +22,8 @@ Writing logs to Elasticsearch
 Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like fluentd, logstash or others.
 
+Airflow also supports writing logs to Elasticsearch directly, without requiring additional software like filebeat or logstash. To enable this feature, set ``write_to_es`` and ``json_format`` to ``True`` and ``write_stdout`` to ``False`` in ``airflow.cfg``. Please be aware that if you set both ``write_to_es`` and ``delete_local_logs`` (in the ``logging`` section) to ``True``, Airflow will delete the local copy of the task logs once they have been successfully written to Elasticsearch.
+
 You can choose to have all task logs from workers output to the highest parent level process, instead of the standard file locations. This allows for some additional flexibility in container environments like Kubernetes, where container stdout is already being logged to the host nodes. From there a log shipping tool can be used to forward them along to Elasticsearch. To use this feature, set the ``write_stdout`` option in ``airflow.cfg``.
 
 You can also choose to have the logs output in a JSON format, using the ``json_format`` option. Airflow uses the standard Python logging module and JSON fields are directly extracted from the LogRecord object. To use this feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields to the comma-delimited string that you want collected for the logs. These fields are from the LogRecord object in the ``logging`` module. `Documentation on different attributes can be found here <https://docs.python.org/3/library/logging.html#logrecord-objects>`_.
@@ -47,6 +49,21 @@ To output task logs to stdout in JSON format, the following config could be used
     write_stdout = True
     json_format = True
 
+To write task logs to Elasticsearch, the following config could be used (set ``delete_local_logs`` to ``True`` if you do not want to retain a local copy of the task logs):
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+    delete_local_logs = False
+
+    [elasticsearch]
+    host = <host>:<port>
+    write_stdout = False
+    json_format = True
+    write_to_es = True
+    target_index = [name of the index to store logs]
+
 .. _write-logs-elasticsearch-tls:
 
 Writing logs to Elasticsearch over TLS
@@ -55,6 +72,8 @@ Writing logs to Elasticsearch over TLS
 To add custom configurations to ElasticSearch (e.g. turning on ``ssl_verify``, adding a custom self-signed cert, etc.) use the ``elasticsearch_configs`` setting in your ``airflow.cfg``
 
+Note that these configurations also apply when you enable writing logs to Elasticsearch.
+
 .. code-block:: ini
 
     [logging]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index dafe706a1c3a2..d1a1e62d521fe 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -633,6 +633,7 @@ fetchmany
 fetchone
 FieldMask
 Filebeat
+filebeat
 filehandle
 fileloc
 filelocs
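Reviewer note: with ``json_format = True`` the local log file contains one JSON object per line, keyed by the configured ``json_fields`` plus the extras the handler injects; this line-per-document shape is what the new write path parses and ships. A hedged illustration with invented values:

import json

# One line of a json_format log file (all field values are made up for illustration;
# the field set follows the default json_fields plus the handler's log_id extra).
raw_line = (
    '{"asctime": "2024-01-01 00:00:00,000", "filename": "taskinstance.py",'
    ' "lineno": 100, "levelname": "INFO", "message": "Task started",'
    ' "log_id": "example_dag-example_task-2024-01-01T00:00:00-1"}'
)
record = json.loads(raw_line)
print(record["levelname"], record["message"])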
diff --git a/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 241fb14aa23d6..9881f1ac5dca7 100644
--- a/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -19,7 +19,11 @@
 
 import contextlib
 import inspect
+import json
 import logging
+import os
+import pathlib
+import shutil
 import sys
 import time
 from collections import defaultdict
@@ -30,6 +34,7 @@
 # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
 import elasticsearch
 import pendulum
+from elasticsearch import helpers
 from elasticsearch.exceptions import NotFoundError
 
 from airflow.configuration import conf
@@ -106,10 +111,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
     """
     ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch.
 
-    Note that Airflow does not handle the indexing of logs into Elasticsearch. Instead,
+    Note that Airflow by default does not handle the indexing of logs into Elasticsearch. Instead,
     Airflow flushes logs into local files. Additional software setup is required to index
     the logs into Elasticsearch, such as using Filebeat and Logstash.
 
+    Airflow can be configured to write logs directly to Elasticsearch. To enable this feature,
+    set `json_format` and `write_to_es` to `True`.
+
     To efficiently query and sort Elasticsearch results, this handler assumes each
     log message has a field `log_id` consists of ti primary keys:
     `log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
@@ -136,6 +144,8 @@ def __init__(
         write_stdout: bool,
         json_format: bool,
         json_fields: str,
+        write_to_es: bool = False,
+        target_index: str = "airflow-logs",
         host_field: str = "host",
         offset_field: str = "offset",
         host: str = "http://localhost:9200",
@@ -166,6 +176,11 @@
         self.index_patterns = index_patterns
         self.index_patterns_callable = index_patterns_callable
         self.context_set = False
+        self.write_to_es = write_to_es
+        self.target_index = target_index
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
+        )
 
         self.formatter: logging.Formatter
         self.handler: logging.FileHandler | logging.StreamHandler  # type: ignore[assignment]
@@ -428,9 +443,11 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
                 extras={
                     "dag_id": str(ti.dag_id),
                     "task_id": str(ti.task_id),
-                    date_key: self._clean_date(ti.logical_date)
-                    if AIRFLOW_V_3_0_PLUS
-                    else self._clean_date(ti.execution_date),
+                    date_key: (
+                        self._clean_date(ti.logical_date)
+                        if AIRFLOW_V_3_0_PLUS
+                        else self._clean_date(ti.execution_date)
+                    ),
                     "try_number": str(ti.try_number),
                     "log_id": self._render_log_id(ti, ti.try_number),
                 },
@@ -480,6 +497,18 @@ def close(self) -> None:
             self.handler.close()
             sys.stdout = sys.__stdout__
 
+        if self.write_to_es and not self.write_stdout:
+            full_path = self.handler.baseFilename  # type: ignore[union-attr]
+            log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
+            local_loc = os.path.join(self.local_base, log_relative_path)
+            if os.path.exists(local_loc):
+                # read log and remove old logs to get just the latest additions
+                log = pathlib.Path(local_loc).read_text()
+                log_lines = self._parse_raw_log(log)
+                success = self._write_to_es(log_lines)
+                if success and self.delete_local_copy:
+                    shutil.rmtree(os.path.dirname(local_loc))
+
         super().close()
 
         self.closed = True
@@ -599,6 +628,31 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
         return callback(hit)
 
+    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+        logs = log.split("\n")
+        parsed_logs = []
+        for line in logs:
+            # Make sure line is not empty
+            if line.strip():
+                parsed_logs.append(json.loads(line))
+
+        return parsed_logs
+
+    def _write_to_es(self, log_lines: list[dict[str, Any]]) -> bool:
+        """
+        Write the logs to Elasticsearch; return `True` on success, or fail silently and return `False`.
+
+        :param log_lines: the log lines to write to Elasticsearch.
+        """
+        # Prepare the bulk request for Elasticsearch
+        bulk_actions = [{"_index": self.target_index, "_source": log} for log in log_lines]
+        try:
+            _ = helpers.bulk(self.client, bulk_actions)
+            return True
+        except Exception as e:
+            self.log.exception("Unable to insert logs into Elasticsearch. Reason: %s", str(e))
+            return False
+
 
 def getattr_nested(obj, item, default):
     """
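Reviewer note: a minimal standalone sketch of the same bulk pattern `_write_to_es` uses, one action per parsed log line, all targeting the configured index. The cluster URL and index name are assumptions:

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")  # assumed local cluster
log_lines = [
    {"levelname": "INFO", "message": "task started"},
    {"levelname": "INFO", "message": "task finished"},
]
# One bulk action per log line, mirroring the handler's bulk_actions list.
actions = [{"_index": "airflow-logs", "_source": line} for line in log_lines]
success_count, errors = helpers.bulk(client, actions)
print(f"indexed {success_count} documents, errors: {errors}")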
Reason: %s", str(e)) + return False + def getattr_nested(obj, item, default): """ diff --git a/providers/src/airflow/providers/elasticsearch/provider.yaml b/providers/src/airflow/providers/elasticsearch/provider.yaml index a1ddbae845c0f..88ebba2a510c4 100644 --- a/providers/src/airflow/providers/elasticsearch/provider.yaml +++ b/providers/src/airflow/providers/elasticsearch/provider.yaml @@ -136,6 +136,20 @@ config: type: string example: ~ default: "False" + write_to_es: + description: | + Write the task logs to the ElasticSearch + version_added: 5.5.4 + type: string + example: ~ + default: "False" + target_index: + description: | + Name of the index to write to, when enabling writing the task logs to the ElasticSearch + version_added: 5.5.4 + type: string + example: ~ + default: "airflow-logs" json_format: description: | Instead of the default log formatter, write the log lines as JSON diff --git a/providers/tests/elasticsearch/log/test_es_task_handler.py b/providers/tests/elasticsearch/log/test_es_task_handler.py index f87d4ffb14342..f6b3f79395009 100644 --- a/providers/tests/elasticsearch/log/test_es_task_handler.py +++ b/providers/tests/elasticsearch/log/test_es_task_handler.py @@ -81,9 +81,11 @@ class TestElasticsearchTaskHandler: def ti(self, create_task_instance, create_log_template): create_log_template( self.FILENAME_TEMPLATE, - "{dag_id}-{task_id}-{logical_date}-{try_number}" - if AIRFLOW_V_3_0_PLUS - else "{dag_id}-{task_id}-{execution_date}-{try_number}", + ( + "{dag_id}-{task_id}-{logical_date}-{try_number}" + if AIRFLOW_V_3_0_PLUS + else "{dag_id}-{task_id}-{execution_date}-{try_number}" + ), ) yield get_ti( dag_id=self.DAG_ID, @@ -673,6 +675,24 @@ def test_filename_template_for_backward_compatibility(self): filename_template=None, ) + def test_write_to_es(self, ti): + self.es_task_handler.write_to_es = True + self.es_task_handler.json_format = True + self.es_task_handler.write_stdout = False + self.es_task_handler.local_base = Path(os.getcwd()) / "local" / "log" / "location" + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + self.es_task_handler.formatter = formatter + + self.es_task_handler.set_context(ti) + with patch( + "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler._write_to_es" + ) as mock_write_to_es: + mock_write = Mock(return_value=True) + mock_write_to_es.return_value = mock_write + self.es_task_handler._write_to_es = mock_write_to_es + self.es_task_handler.close() + mock_write_to_es.assert_called_once() + def test_safe_attrgetter(): class A: ...