Add write feature to ESTaskHandler #44973

Merged
4 changes: 4 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
@@ -308,8 +308,10 @@
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

@@ -322,6 +324,8 @@
"host": ELASTICSEARCH_HOST,
"frontend": ELASTICSEARCH_FRONTEND,
"write_stdout": ELASTICSEARCH_WRITE_STDOUT,
"write_to_es": ELASTICSEARCH_WRITE_TO_ES,
"target_index": ELASTICSEARCH_TARGET_INDEX,
"json_format": ELASTICSEARCH_JSON_FORMAT,
"json_fields": ELASTICSEARCH_JSON_FIELDS,
"host_field": ELASTICSEARCH_HOST_FIELD,
@@ -82,6 +82,8 @@ json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset
index_patterns = _all
write_to_es = False
target_index = airflow-logs

[elasticsearch_configs]
use_ssl = False
19 changes: 19 additions & 0 deletions docs/apache-airflow-providers-elasticsearch/logging/index.rst
@@ -22,6 +22,8 @@ Writing logs to Elasticsearch

Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or JSON format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like Fluentd, Logstash, or others.

Airflow also supports writing logs to Elasticsearch directly, without requiring additional software like Filebeat or Logstash. To enable this feature, set ``write_to_es`` and ``json_format`` to ``True`` and ``write_stdout`` to ``False`` in ``airflow.cfg``. Please be aware that if you set both ``write_to_es`` and ``delete_local_logs`` (in the ``logging`` section) to ``True``, Airflow will delete the local copy of the task logs after successfully writing them to Elasticsearch.

You can choose to have all task logs from workers output to the highest parent level process, instead of the standard file locations. This allows for some additional flexibility in container environments like Kubernetes, where container stdout is already being logged to the host nodes. From there a log shipping tool can be used to forward them along to Elasticsearch. To use this feature, set the ``write_stdout`` option in ``airflow.cfg``.
You can also choose to have the logs output in a JSON format, using the ``json_format`` option. Airflow uses the standard Python logging module, and JSON fields are directly extracted from the LogRecord object. To use this feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields you want collected for the logs to the comma-delimited string. These fields come from the LogRecord object in the ``logging`` module. `Documentation on different attributes can be found here <https://docs.python.org/3/library/logging.html#logrecord-objects/>`_.
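
As a rough illustration (a sketch, not the handler's actual formatter code), the configured JSON fields are read straight off each ``logging.LogRecord``; the sample values below are made up:

.. code-block:: python

    import json
    import logging

    json_fields = ["asctime", "filename", "lineno", "levelname", "message"]

    record = logging.LogRecord(
        name="airflow.task", level=logging.INFO, pathname="my_dag.py",
        lineno=42, msg="starting task", args=None, exc_info=None,
    )
    record.asctime = "2024-01-01 00:00:00,000"  # normally set by the formatter

    # Collect the requested attributes into a single JSON document per log line.
    doc = {field: getattr(record, field, None) for field in json_fields}
    doc["message"] = record.getMessage()  # resolve the final message string
    print(json.dumps(doc))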

@@ -47,6 +49,21 @@ To output task logs to stdout in JSON format, the following config could be used
write_stdout = True
json_format = True

To output task logs to Elasticsearch, the following config could be used (set ``delete_local_logs`` to ``True`` if you do not want to retain a local copy of the task logs):

.. code-block:: ini

[logging]
remote_logging = True
delete_local_logs = False

[elasticsearch]
host = <host>:<port>
write_stdout = False
json_format = True
write_to_es = True
target_index = [name of the index to store logs]

.. _write-logs-elasticsearch-tls:

Writing logs to Elasticsearch over TLS
@@ -55,6 +72,8 @@
To add custom configurations to ElasticSearch (e.g. turning on ``ssl_verify``, adding a custom self-signed
cert, etc.) use the ``elasticsearch_configs`` setting in your ``airflow.cfg``

Note that these configurations also apply when you enable writing logs directly to Elasticsearch.

.. code-block:: ini

[logging]
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -633,6 +633,7 @@ fetchmany
fetchone
FieldMask
Filebeat
filebeat
filehandle
fileloc
filelocs
@@ -19,7 +19,11 @@

import contextlib
import inspect
import json
import logging
import os
import pathlib
import shutil
import sys
import time
from collections import defaultdict
@@ -30,6 +34,7 @@
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
from elasticsearch import helpers
from elasticsearch.exceptions import NotFoundError

from airflow.configuration import conf
@@ -106,10 +111,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
"""
ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch.

Note that Airflow does not handle the indexing of logs into Elasticsearch. Instead,
Note that Airflow by default does not handle the indexing of logs into Elasticsearch. Instead,
Airflow flushes logs into local files. Additional software setup is required to index
the logs into Elasticsearch, such as using Filebeat and Logstash.

Airflow can also be configured to write logs directly to Elasticsearch. To enable this feature,
set `json_format` and `write_to_es` to `True`.

To efficiently query and sort Elasticsearch results, this handler assumes each
log message has a `log_id` field that consists of the ti primary keys:
`log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
@@ -136,6 +144,8 @@ def __init__(
write_stdout: bool,
json_format: bool,
json_fields: str,
write_to_es: bool = False,
target_index: str = "airflow-logs",
host_field: str = "host",
offset_field: str = "offset",
host: str = "http://localhost:9200",
@@ -166,6 +176,11 @@
self.index_patterns = index_patterns
self.index_patterns_callable = index_patterns_callable
self.context_set = False
self.write_to_es = write_to_es
self.target_index = target_index
self.delete_local_copy = kwargs.get(
"delete_local_copy", conf.getboolean("logging", "delete_local_logs")
)

self.formatter: logging.Formatter
self.handler: logging.FileHandler | logging.StreamHandler # type: ignore[assignment]
@@ -428,9 +443,11 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
extras={
"dag_id": str(ti.dag_id),
"task_id": str(ti.task_id),
date_key: self._clean_date(ti.logical_date)
if AIRFLOW_V_3_0_PLUS
else self._clean_date(ti.execution_date),
date_key: (
self._clean_date(ti.logical_date)
if AIRFLOW_V_3_0_PLUS
else self._clean_date(ti.execution_date)
),
"try_number": str(ti.try_number),
"log_id": self._render_log_id(ti, ti.try_number),
},
@@ -480,6 +497,18 @@ def close(self) -> None:
self.handler.close()
sys.stdout = sys.__stdout__

if self.write_to_es and not self.write_stdout:
full_path = self.handler.baseFilename # type: ignore[union-attr]
log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
local_loc = os.path.join(self.local_base, log_relative_path)
if os.path.exists(local_loc):
# read log and remove old logs to get just the latest additions
log = pathlib.Path(local_loc).read_text()
log_lines = self._parse_raw_log(log)
success = self._write_to_es(log_lines)
if success and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

super().close()

self.closed = True
@@ -599,6 +628,31 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
return callback(hit)

def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
logs = log.split("\n")
parsed_logs = []
for line in logs:
# Make sure line is not empty
if line.strip():
parsed_logs.append(json.loads(line))

return parsed_logs

def _write_to_es(self, log_lines: list[dict[str, Any]]) -> bool:
"""
Write the logs to Elasticsearch; return `True` on success, otherwise fail silently and return `False`.

:param log_lines: the log lines to write to Elasticsearch.
"""
# Prepare the bulk request for Elasticsearch
bulk_actions = [{"_index": self.target_index, "_source": log} for log in log_lines]
try:
_ = helpers.bulk(self.client, bulk_actions)
return True
except Exception as e:
self.log.exception("Unable to insert logs into Elasticsearch. Reason: %s", str(e))
return False


def getattr_nested(obj, item, default):
"""
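
As a minimal sketch of what the new write path boils down to (illustrative only; the host, index name, and sample log lines below are assumptions, not values taken from this change):

    import json

    from elasticsearch import Elasticsearch, helpers

    # Two JSON-formatted log lines, as produced when json_format is enabled.
    raw_log = (
        '{"levelname": "INFO", "message": "starting task", "offset": 1}\n'
        '{"levelname": "INFO", "message": "task finished", "offset": 2}\n'
    )

    # Equivalent of _parse_raw_log(): one document per non-empty line.
    log_lines = [json.loads(line) for line in raw_log.split("\n") if line.strip()]

    # Equivalent of _write_to_es(): one bulk action per document, indexed via helpers.bulk.
    client = Elasticsearch("http://localhost:9200")
    bulk_actions = [{"_index": "airflow-logs", "_source": doc} for doc in log_lines]
    helpers.bulk(client, bulk_actions)
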
14 changes: 14 additions & 0 deletions providers/src/airflow/providers/elasticsearch/provider.yaml
@@ -136,6 +136,20 @@ config:
type: string
example: ~
default: "False"
write_to_es:
description: |
Write the task logs to Elasticsearch
version_added: 5.5.4
type: string
example: ~
default: "False"
target_index:
description: |
Name of the index to write to, used when writing task logs to Elasticsearch is enabled
version_added: 5.5.4
type: string
example: ~
default: "airflow-logs"
json_format:
description: |
Instead of the default log formatter, write the log lines as JSON
26 changes: 23 additions & 3 deletions providers/tests/elasticsearch/log/test_es_task_handler.py
@@ -81,9 +81,11 @@ class TestElasticsearchTaskHandler:
def ti(self, create_task_instance, create_log_template):
create_log_template(
self.FILENAME_TEMPLATE,
"{dag_id}-{task_id}-{logical_date}-{try_number}"
if AIRFLOW_V_3_0_PLUS
else "{dag_id}-{task_id}-{execution_date}-{try_number}",
(
"{dag_id}-{task_id}-{logical_date}-{try_number}"
if AIRFLOW_V_3_0_PLUS
else "{dag_id}-{task_id}-{execution_date}-{try_number}"
),
)
yield get_ti(
dag_id=self.DAG_ID,
@@ -673,6 +675,24 @@ def test_filename_template_for_backward_compatibility(self):
filename_template=None,
)

def test_write_to_es(self, ti):
self.es_task_handler.write_to_es = True
self.es_task_handler.json_format = True
self.es_task_handler.write_stdout = False
self.es_task_handler.local_base = Path(os.getcwd()) / "local" / "log" / "location"
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
self.es_task_handler.formatter = formatter

self.es_task_handler.set_context(ti)
with patch(
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler._write_to_es"
) as mock_write_to_es:
mock_write = Mock(return_value=True)
mock_write_to_es.return_value = mock_write
self.es_task_handler._write_to_es = mock_write_to_es
self.es_task_handler.close()
mock_write_to_es.assert_called_once()


def test_safe_attrgetter():
class A: ...