Skip to content

Commit dc54b1a

Browse files
Owen-CH-LeungHariGS-DB
authored and committed
Add write feature to ESTaskHandler (apache#44973)
* Add write feature to ESTaskHandler
1 parent 812030c commit dc54b1a

File tree

7 files changed

+121
-7
lines changed

7 files changed

+121
-7
lines changed

airflow/config_templates/airflow_local_settings.py

+4
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,10 @@
308308
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
309309
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
310310
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
311+
ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
311312
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
312313
ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
314+
ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
313315
ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
314316
ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
315317

@@ -322,6 +324,8 @@
322324
"host": ELASTICSEARCH_HOST,
323325
"frontend": ELASTICSEARCH_FRONTEND,
324326
"write_stdout": ELASTICSEARCH_WRITE_STDOUT,
327+
"write_to_es": ELASTICSEARCH_WRITE_TO_ES,
328+
"target_index": ELASTICSEARCH_TARGET_INDEX,
325329
"json_format": ELASTICSEARCH_JSON_FORMAT,
326330
"json_fields": ELASTICSEARCH_JSON_FIELDS,
327331
"host_field": ELASTICSEARCH_HOST_FIELD,

airflow/config_templates/provider_config_fallback_defaults.cfg

+2
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ json_fields = asctime, filename, lineno, levelname, message
8282
host_field = host
8383
offset_field = offset
8484
index_patterns = _all
85+
write_to_es = False
86+
target_index = airflow-logs
8587

8688
[elasticsearch_configs]
8789
use_ssl = False

docs/apache-airflow-providers-elasticsearch/logging/index.rst

+19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ Writing logs to Elasticsearch
2222

2323
Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like fluentd, logstash or others.
2424

25+
Airflow also supports writing logs to Elasticsearch directly, without requiring additional software such as Filebeat or Logstash. To enable this feature, set ``write_to_es`` and ``json_format`` to ``True`` and ``write_stdout`` to ``False`` in ``airflow.cfg``. Please be aware that if you set both ``write_to_es`` and ``delete_local_logs`` (in the logging section) to true, Airflow will delete the local copy of task logs upon successfully writing them to Elasticsearch.
26+
2527
You can choose to have all task logs from workers output to the highest parent level process, instead of the standard file locations. This allows for some additional flexibility in container environments like Kubernetes, where container stdout is already being logged to the host nodes. From there a log shipping tool can be used to forward them along to Elasticsearch. To use this feature, set the ``write_stdout`` option in ``airflow.cfg``.
2628
You can also choose to have the logs output in a JSON format, using the ``json_format`` option. Airflow uses the standard Python logging module and JSON fields are directly extracted from the LogRecord object. To use this feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields to the comma-delimited string that you want collected for the logs. These fields are from the LogRecord object in the ``logging`` module. `Documentation on different attributes can be found here <https://docs.python.org/3/library/logging.html#logrecord-objects/>`_.
2729

@@ -47,6 +49,21 @@ To output task logs to stdout in JSON format, the following config could be used
4749
write_stdout = True
4850
json_format = True
4951
52+
To output task logs to Elasticsearch, the following config could be used: (set ``delete_local_logs`` to true if you don't want to retain a local copy of task logs)
53+
54+
.. code-block:: ini
55+
56+
[logging]
57+
remote_logging = True
58+
delete_local_logs = False
59+
60+
[elasticsearch]
61+
host = <host>:<port>
62+
write_stdout = False
63+
json_format = True
64+
write_to_es = True
65+
target_index = [name of the index to store logs]
66+
5067
.. _write-logs-elasticsearch-tls:
5168

5269
Writing logs to Elasticsearch over TLS
@@ -55,6 +72,8 @@ Writing logs to Elasticsearch over TLS
5572
To add custom configurations to ElasticSearch (e.g. turning on ``ssl_verify``, adding a custom self-signed
5673
cert, etc.) use the ``elasticsearch_configs`` setting in your ``airflow.cfg``
5774

75+
Note that these configurations also apply when you enable writing logs directly to Elasticsearch.
76+
5877
.. code-block:: ini
5978
6079
[logging]

docs/spelling_wordlist.txt

+1
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ fetchmany
633633
fetchone
634634
FieldMask
635635
Filebeat
636+
filebeat
636637
filehandle
637638
fileloc
638639
filelocs

providers/src/airflow/providers/elasticsearch/log/es_task_handler.py

+58-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919

2020
import contextlib
2121
import inspect
22+
import json
2223
import logging
24+
import os
25+
import pathlib
26+
import shutil
2327
import sys
2428
import time
2529
from collections import defaultdict
@@ -30,6 +34,7 @@
3034
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
3135
import elasticsearch
3236
import pendulum
37+
from elasticsearch import helpers
3338
from elasticsearch.exceptions import NotFoundError
3439

3540
from airflow.configuration import conf
@@ -106,10 +111,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
106111
"""
107112
ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch.
108113
109-
Note that Airflow does not handle the indexing of logs into Elasticsearch. Instead,
114+
Note that Airflow by default does not handle the indexing of logs into Elasticsearch. Instead,
110115
Airflow flushes logs into local files. Additional software setup is required to index
111116
the logs into Elasticsearch, such as using Filebeat and Logstash.
112117
118+
Airflow can be configured to support directly writing logging to Elasticsearch. To enable this feature,
119+
set `json_format` and `write_to_es` to `True`.
120+
113121
To efficiently query and sort Elasticsearch results, this handler assumes each
114122
log message has a field `log_id` consists of ti primary keys:
115123
`log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
@@ -136,6 +144,8 @@ def __init__(
136144
write_stdout: bool,
137145
json_format: bool,
138146
json_fields: str,
147+
write_to_es: bool = False,
148+
target_index: str = "airflow-logs",
139149
host_field: str = "host",
140150
offset_field: str = "offset",
141151
host: str = "http://localhost:9200",
@@ -166,6 +176,11 @@ def __init__(
166176
self.index_patterns = index_patterns
167177
self.index_patterns_callable = index_patterns_callable
168178
self.context_set = False
179+
self.write_to_es = write_to_es
180+
self.target_index = target_index
181+
self.delete_local_copy = kwargs.get(
182+
"delete_local_copy", conf.getboolean("logging", "delete_local_logs")
183+
)
169184

170185
self.formatter: logging.Formatter
171186
self.handler: logging.FileHandler | logging.StreamHandler # type: ignore[assignment]
@@ -428,9 +443,11 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
428443
extras={
429444
"dag_id": str(ti.dag_id),
430445
"task_id": str(ti.task_id),
431-
date_key: self._clean_date(ti.logical_date)
432-
if AIRFLOW_V_3_0_PLUS
433-
else self._clean_date(ti.execution_date),
446+
date_key: (
447+
self._clean_date(ti.logical_date)
448+
if AIRFLOW_V_3_0_PLUS
449+
else self._clean_date(ti.execution_date)
450+
),
434451
"try_number": str(ti.try_number),
435452
"log_id": self._render_log_id(ti, ti.try_number),
436453
},
@@ -480,6 +497,18 @@ def close(self) -> None:
480497
self.handler.close()
481498
sys.stdout = sys.__stdout__
482499

500+
if self.write_to_es and not self.write_stdout:
501+
full_path = self.handler.baseFilename # type: ignore[union-attr]
502+
log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
503+
local_loc = os.path.join(self.local_base, log_relative_path)
504+
if os.path.exists(local_loc):
505+
# read log and remove old logs to get just the latest additions
506+
log = pathlib.Path(local_loc).read_text()
507+
log_lines = self._parse_raw_log(log)
508+
success = self._write_to_es(log_lines)
509+
if success and self.delete_local_copy:
510+
shutil.rmtree(os.path.dirname(local_loc))
511+
483512
super().close()
484513

485514
self.closed = True
@@ -599,6 +628,31 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
599628
callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
600629
return callback(hit)
601630

631+
def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
632+
logs = log.split("\n")
633+
parsed_logs = []
634+
for line in logs:
635+
# Make sure line is not empty
636+
if line.strip():
637+
parsed_logs.append(json.loads(line))
638+
639+
return parsed_logs
640+
641+
def _write_to_es(self, log_lines: list[dict[str, Any]]) -> bool:
    """
    Bulk-index the given log records into Elasticsearch.

    Returns ``True`` on success. Any failure is logged (with traceback)
    and swallowed, returning ``False`` rather than propagating.

    :param log_lines: parsed log records to index into the target index.
    """
    # Build one bulk action per record, all routed to the configured index.
    actions = []
    for record in log_lines:
        actions.append({"_index": self.target_index, "_source": record})
    try:
        helpers.bulk(self.client, actions)
    except Exception as e:
        self.log.exception("Unable to insert logs into Elasticsearch. Reason: %s", str(e))
        return False
    return True
655+
602656

603657
def getattr_nested(obj, item, default):
604658
"""

providers/src/airflow/providers/elasticsearch/provider.yaml

+14
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,20 @@ config:
136136
type: string
137137
example: ~
138138
default: "False"
139+
write_to_es:
140+
description: |
141+
Write the task logs to Elasticsearch
142+
version_added: 5.5.4
143+
type: string
144+
example: ~
145+
default: "False"
146+
target_index:
147+
description: |
148+
Name of the index to write task logs to, when writing them directly to Elasticsearch
149+
version_added: 5.5.4
150+
type: string
151+
example: ~
152+
default: "airflow-logs"
139153
json_format:
140154
description: |
141155
Instead of the default log formatter, write the log lines as JSON

providers/tests/elasticsearch/log/test_es_task_handler.py

+23-3
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,11 @@ class TestElasticsearchTaskHandler:
8181
def ti(self, create_task_instance, create_log_template):
8282
create_log_template(
8383
self.FILENAME_TEMPLATE,
84-
"{dag_id}-{task_id}-{logical_date}-{try_number}"
85-
if AIRFLOW_V_3_0_PLUS
86-
else "{dag_id}-{task_id}-{execution_date}-{try_number}",
84+
(
85+
"{dag_id}-{task_id}-{logical_date}-{try_number}"
86+
if AIRFLOW_V_3_0_PLUS
87+
else "{dag_id}-{task_id}-{execution_date}-{try_number}"
88+
),
8789
)
8890
yield get_ti(
8991
dag_id=self.DAG_ID,
@@ -673,6 +675,24 @@ def test_filename_template_for_backward_compatibility(self):
673675
filename_template=None,
674676
)
675677

678+
def test_write_to_es(self, ti):
    """close() flushes the local log file to Elasticsearch when write_to_es is enabled.

    The original test layered a second Mock as the patched method's
    return_value and manually re-assigned the instance attribute (which
    also leaked past the patch context); a single ``patch`` with
    ``return_value=True`` exercises the same code path.
    """
    self.es_task_handler.write_to_es = True
    self.es_task_handler.json_format = True
    self.es_task_handler.write_stdout = False
    self.es_task_handler.local_base = Path(os.getcwd()) / "local" / "log" / "location"
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    self.es_task_handler.formatter = formatter

    self.es_task_handler.set_context(ti)
    with patch(
        "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler._write_to_es",
        return_value=True,
    ) as mock_write_to_es:
        self.es_task_handler.close()
        mock_write_to_es.assert_called_once()
695+
676696

677697
def test_safe_attrgetter():
678698
class A: ...

0 commit comments

Comments
 (0)