From ec38ba9594395de04ec932481212a86fbe9ae107 Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Fri, 13 Apr 2018 11:09:50 +0200 Subject: [PATCH] [AIRFLOW-1325] Add ElasticSearch log handler and reader Closes #3214 from yrqls21/kevin_yang_add_es_task_handler --- .gitignore | 3 + LICENSE | 1 + airflow/bin/cli.py | 4 +- .../airflow_local_settings.py | 24 +- airflow/config_templates/default_airflow.cfg | 7 +- airflow/models.py | 4 +- airflow/utils/helpers.py | 9 + airflow/utils/log/es_task_handler.py | 183 +++++++++ airflow/utils/log/file_processor_handler.py | 10 +- airflow/utils/log/file_task_handler.py | 29 +- airflow/utils/log/gcs_task_handler.py | 8 +- airflow/utils/log/s3_task_handler.py | 8 +- airflow/utils/log/wasb_task_handler.py | 352 +++++++++--------- airflow/www/api/experimental/endpoints.py | 26 +- airflow/www/templates/airflow/ti_log.html | 159 ++++++-- airflow/www/views.py | 87 +++-- licenses/LICENSE-elasticmock.txt | 21 ++ setup.py | 7 +- tests/utils/log/elasticmock/__init__.py | 56 +++ .../log/elasticmock/fake_elasticsearch.py | 295 +++++++++++++++ .../log/elasticmock/utilities/__init__.py | 33 ++ tests/utils/log/test_es_task_handler.py | 255 +++++++++++++ tests/utils/log/test_s3_task_handler.py | 6 +- tests/utils/test_log_handlers.py | 15 +- tests/www/test_views.py | 19 +- 25 files changed, 1339 insertions(+), 282 deletions(-) create mode 100644 airflow/utils/log/es_task_handler.py create mode 100644 licenses/LICENSE-elasticmock.txt create mode 100644 tests/utils/log/elasticmock/__init__.py create mode 100644 tests/utils/log/elasticmock/fake_elasticsearch.py create mode 100644 tests/utils/log/elasticmock/utilities/__init__.py create mode 100644 tests/utils/log/test_es_task_handler.py diff --git a/.gitignore b/.gitignore index f5ed5adaa071c..5e7b24d8ba0b4 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ ENV/ # Spark rat-results.txt + +# Git stuff +.gitattributes diff --git a/LICENSE b/LICENSE index 540289ba3e530..405540c64fb87 100644 --- a/LICENSE +++ b/LICENSE @@ -225,6 +225,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. 
(MIT License) Underscorejs (http://underscorejs.org) (MIT License) Bootstrap Toggle (http://www.bootstraptoggle.com) (MIT License) normalize.css (http://necolas.github.io/normalize.css/) + (MIT License) ElasticMock (https://github.com/vrcmarcos/elasticmock) ======================================================================== BSD 2-Clause licenses diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index eb96e77537645..52d5957ca321d 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -457,10 +457,8 @@ def run(args, dag=None): if args.interactive: _run(args, dag, ti) else: - with redirect_stdout(ti.log, logging.INFO),\ - redirect_stderr(ti.log, logging.WARN): + with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN): _run(args, dag, ti) - logging.shutdown() @cli_utils.action_logging diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 3086cf016589f..2ab343e4c5093 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -37,12 +37,19 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log' +LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}' + # Storage bucket url for remote logging # s3 buckets should start with "s3://" # gcs buckets should start with "gs://" -# wasb buckets should start with "wasb" just to help Airflow select correct handler +# wasb buckets should start with "wasb" +# just to help Airflow select correct handler REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') +ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST') + +END_OF_LOG_MARK = 'end_of_log' + DEFAULT_LOGGING_CONFIG = { 'version': 1, 'disable_existing_loggers': False, @@ -145,7 +152,18 @@ 'filename_template': PROCESSOR_FILENAME_TEMPLATE, 'delete_local_copy': False, }, - } + }, + 'elasticsearch': { + 'task': { + 'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'log_id_template': LOG_ID_TEMPLATE, + 'filename_template': FILENAME_TEMPLATE, + 'end_of_log_mark': END_OF_LOG_MARK, + 'host': ELASTICSEARCH_HOST, + }, + }, } REMOTE_LOGGING = conf.get('core', 'remote_logging') @@ -156,3 +174,5 @@ DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs']) elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'): DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb']) +elif REMOTE_LOGGING and ELASTICSEARCH_HOST: + DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch']) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e8da82fc66353..1748c42a31bdb 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -35,8 +35,8 @@ dags_folder = {AIRFLOW_HOME}/dags # This path must be absolute base_log_folder = {AIRFLOW_HOME}/logs -# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users -# must supply an Airflow connection id that provides access to the storage +# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search. +# Users must supply an Airflow connection id that provides access to the storage # location. If remote_logging is set to true, see UPDATING.md for additional # configuration requirements. 
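# An illustrative pairing (example values, not shipped defaults): with remote
# logging switched on and no S3/GCS/wasb base folder configured, filling in the
# new [elasticsearch] section below is what routes task logs through the
# Elasticsearch handler wired up in airflow_local_settings.py, assuming
# task_log_reader names that handler.
#
#   remote_logging = True
#
#   [elasticsearch]
#   elasticsearch_host = localhost:9200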
remote_logging = False @@ -486,3 +486,6 @@ api_rev = v3 [admin] # UI to hide sensitive variable fields when set to True hide_sensitive_variable_fields = True + +[elasticsearch] +elasticsearch_host = diff --git a/airflow/models.py b/airflow/models.py index afcacd126b822..94061505304a8 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -857,7 +857,7 @@ def __init__(self, task, execution_date, state=None): self.init_on_load() # Is this TaskInstance being currently running within `airflow run --raw`. # Not persisted to the database so only valid for the current process - self.is_raw = False + self.raw = False @reconstructor def init_on_load(self): @@ -1956,8 +1956,8 @@ def init_run_context(self, raw=False): """ Sets the log context. """ - self._set_context(self) self.raw = raw + self._set_context(self) class TaskFail(Base): diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 9e6a43990eab5..c28851b4627f8 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -31,6 +31,8 @@ import sys import warnings +from jinja2 import Template + from airflow import configuration from airflow.exceptions import AirflowException @@ -223,6 +225,13 @@ def on_terminate(p): log.error("Process %s (%s) could not be killed. Giving up.", p, p.pid) +def parse_template_string(template_string): + if "{{" in template_string: # jinja mode + return None, Template(template_string) + else: + return template_string, None + + class AirflowImporter(object): """ Importer that dynamically loads a class and module from its parent. This diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py new file mode 100644 index 0000000000000..76dacfe351692 --- /dev/null +++ b/airflow/utils/log/es_task_handler.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Using `from elasticsearch import *` would break elasticseach mocking used in unit test. +import elasticsearch +import pendulum +from elasticsearch_dsl import Search + +from airflow.utils import timezone +from airflow.utils.helpers import parse_template_string +from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.log.logging_mixin import LoggingMixin + + +class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): + PAGE = 0 + MAX_LINE_PER_PAGE = 1000 + + """ + ElasticsearchTaskHandler is a python log handler that + reads logs from Elasticsearch. Note logs are not directly + indexed into Elasticsearch. Instead, it flushes logs + into local files. Additional software setup is required + to index the log into Elasticsearch, such as using + Filebeat and Logstash. + To efficiently query and sort Elasticsearch results, we assume each + log message has a field `log_id` consists of ti primary keys: + `log_id = {dag_id}-{task_id}-{execution_date}-{try_number}` + Log messages with specific log_id are sorted based on `offset`, + which is a unique integer indicates log message's order. 
+ Timestamp here are unreliable because multiple log messages + might have the same timestamp. + """ + + def __init__(self, base_log_folder, filename_template, + log_id_template, end_of_log_mark, + host='localhost:9200'): + """ + :param base_log_folder: base folder to store logs locally + :param log_id_template: log id template + :param host: Elasticsearch host name + """ + super(ElasticsearchTaskHandler, self).__init__( + base_log_folder, filename_template) + self.closed = False + + self.log_id_template, self.log_id_jinja_template = \ + parse_template_string(log_id_template) + + self.client = elasticsearch.Elasticsearch([host]) + + self.mark_end_on_close = True + self.end_of_log_mark = end_of_log_mark + + def _render_log_id(self, ti, try_number): + if self.log_id_jinja_template: + jinja_context = ti.get_template_context() + jinja_context['try_number'] = try_number + return self.log_id_jinja_template.render(**jinja_context) + + return self.log_id_template.format(dag_id=ti.dag_id, + task_id=ti.task_id, + execution_date=ti + .execution_date.isoformat(), + try_number=try_number) + + def _read(self, ti, try_number, metadata=None): + """ + Endpoint for streaming log. + :param ti: task instance object + :param try_number: try_number of the task instance + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. + :return a list of log documents and metadata. + """ + if not metadata: + metadata = {'offset': 0} + if 'offset' not in metadata: + metadata['offset'] = 0 + + offset = metadata['offset'] + log_id = self._render_log_id(ti, try_number) + + logs = self.es_read(log_id, offset) + + next_offset = offset if not logs else logs[-1].offset + + metadata['offset'] = next_offset + # end_of_log_mark may contain characters like '\n' which is needed to + # have the log uploaded but will not be stored in elasticsearch. + metadata['end_of_log'] = False if not logs \ + else logs[-1].message == self.end_of_log_mark.strip() + + cur_ts = pendulum.now() + # Assume end of log after not receiving new log for 5 min, + # as executor heartbeat is 1 min and there might be some + # delay before Elasticsearch makes the log available. + if 'last_log_timestamp' in metadata: + last_log_ts = timezone.parse(metadata['last_log_timestamp']) + if cur_ts.diff(last_log_ts).in_minutes() >= 5: + metadata['end_of_log'] = True + + if offset != next_offset or 'last_log_timestamp' not in metadata: + metadata['last_log_timestamp'] = str(cur_ts) + + message = '\n'.join([log.message for log in logs]) + + return message, metadata + + def es_read(self, log_id, offset): + """ + Returns the logs matching log_id in Elasticsearch and next offset. + Returns '' if no log is found or there was an error. + :param log_id: the log_id of the log to read. + :type log_id: str + :param offset: the offset start to read log from. + :type offset: str + """ + + # Offset is the unique key for sorting logs given log_id. 
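# Illustration only -- the field names below follow this handler, the values
# are invented: each shipped log line is expected to be indexed as a document
# shaped roughly like
#
#     {'log_id': 'example_dag-example_task-2018-01-01T00:00:00+00:00-1',
#      'offset': 7,
#      'message': 'INFO - some log line'}
#
# so a match on log_id plus a range filter on offset pages through a single
# attempt's log in order.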
+ s = Search(using=self.client) \ + .query('match', log_id=log_id) \ + .sort('offset') + + s = s.filter('range', offset={'gt': offset}) + + logs = [] + if s.count() != 0: + try: + + logs = s[self.MAX_LINE_PER_PAGE * self.PAGE:self.MAX_LINE_PER_PAGE] \ + .execute() + except Exception as e: + msg = 'Could not read log with log_id: {}, ' \ + 'error: {}'.format(log_id, str(e)) + self.log.exception(msg) + + return logs + + def set_context(self, ti): + super(ElasticsearchTaskHandler, self).set_context(ti) + self.mark_end_on_close = not ti.raw + + def close(self): + # When application exit, system shuts down all handlers by + # calling close method. Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + if not self.mark_end_on_close: + self.closed = True + return + + # Case which context of the handler was not set. + if self.handler is None: + self.closed = True + return + + # Reopen the file stream, because FileHandler.close() would be called + # first in logging.shutdown() and the stream in it would be set to None. + if self.handler.stream is None or self.handler.stream.closed: + self.handler.stream = self.handler._open() + + # Mark the end of file using end of log mark, + # so we know where to stop while auto-tailing. + self.handler.stream.write(self.end_of_log_mark) + + super(ElasticsearchTaskHandler, self).close() + + self.closed = True diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index f6d8d9360ec14..9897afc85b431 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -16,9 +16,8 @@ import logging import os -from jinja2 import Template - from airflow import configuration as conf +from airflow.utils.helpers import parse_template_string from datetime import datetime @@ -38,11 +37,8 @@ def __init__(self, base_log_folder, filename_template): self.handler = None self.base_log_folder = base_log_folder self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) - self.filename_template = filename_template - self.filename_jinja_template = None - - if "{{" in self.filename_template: #jinja mode - self.filename_jinja_template = Template(self.filename_template) + self.filename_template, self.filename_jinja_template = \ + parse_template_string(filename_template) self._cur_date = datetime.today() if not os.path.exists(self._get_log_directory()): diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index a67f4454239e4..adec5ab9b23c2 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -16,11 +16,10 @@ import os import requests -from jinja2 import Template - from airflow import configuration as conf from airflow.configuration import AirflowConfigException from airflow.utils.file import mkdirs +from airflow.utils.helpers import parse_template_string class FileTaskHandler(logging.Handler): @@ -39,11 +38,8 @@ def __init__(self, base_log_folder, filename_template): super(FileTaskHandler, self).__init__() self.handler = None self.local_base = base_log_folder - self.filename_template = filename_template - self.filename_jinja_template = None - - if "{{" in self.filename_template: #jinja mode - self.filename_jinja_template = Template(self.filename_template) + self.filename_template, self.filename_jinja_template = \ + parse_template_string(filename_template) def set_context(self, ti): """ 
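# A sketch of the streaming contract the new metadata argument introduces (not
# part of the patch; `es_handler` and `ti` are assumed to be an already
# configured ElasticsearchTaskHandler and TaskInstance). Each read() call
# returns only the lines past the previous offset, and the caller feeds the
# returned metadata back in until the handler reports end_of_log -- either
# because it saw the end-of-log mark or because no new lines arrived for five
# minutes. The new /get_logs_with_metadata view drives the same loop for the UI.
import time

metadata = None
while True:
    logs, metadatas = es_handler.read(ti, try_number=1, metadata=metadata)
    print(logs[0])           # only lines newer than metadata['offset']
    metadata = metadatas[0]  # carries 'offset', 'last_log_timestamp', 'end_of_log'
    if metadata['end_of_log']:
        break
    time.sleep(5)            # assumed poll interval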
@@ -78,13 +74,15 @@ def _render_filename(self, ti, try_number): execution_date=ti.execution_date.isoformat(), try_number=try_number) - def _read(self, ti, try_number): + def _read(self, ti, try_number, metadata=None): """ Template method that contains custom logic of reading logs given the try_number. :param ti: task instance record :param try_number: current try_number to read log from - :return: log message as a string + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. + :return: log message as a string and metadata. """ # Task instance here might be different from task instance when # initializing the handler. Thus explicitly getting log location @@ -127,14 +125,16 @@ def _read(self, ti, try_number): except Exception as e: log += "*** Failed to fetch log file from worker. {}\n".format(str(e)) - return log + return log, {'end_of_log': True} - def read(self, task_instance, try_number=None): + def read(self, task_instance, try_number=None, metadata=None): """ Read logs of given task instance from local machine. :param task_instance: task instance object :param try_number: task instance try_number to read logs from. If None it returns all logs separated by try_number + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. :return: a list of logs """ # Task instance increments its try number when it starts to run. @@ -154,10 +154,13 @@ def read(self, task_instance, try_number=None): try_numbers = [try_number] logs = [''] * len(try_numbers) + metadatas = [{}] * len(try_numbers) for i, try_number in enumerate(try_numbers): - logs[i] += self._read(task_instance, try_number) + log, metadata = self._read(task_instance, try_number, metadata) + logs[i] += log + metadatas[i] = metadata - return logs + return logs, metadatas def _init_file(self, ti): """ diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index 3b83c8cd1f0ea..e69d739252c78 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -60,7 +60,7 @@ def set_context(self, ti): # log path to upload log files into GCS and read from the # remote location. self.log_relative_path = self._render_filename(ti, ti.try_number) - self.upload_on_close = not ti.is_raw + self.upload_on_close = not ti.raw def close(self): """ @@ -89,12 +89,14 @@ def close(self): # Mark closed so we don't double write if close is called twice self.closed = True - def _read(self, ti, try_number): + def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from GCS. If failed, read the log from task instance host machine. :param ti: task instance object :param try_number: task instance try_number to read logs from + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. 
""" # Explicitly getting log relative path is necessary as the given # task instance might be different than task instance passed in @@ -112,7 +114,7 @@ def _read(self, ti, try_number): self.log.error(log) log += super(GCSTaskHandler, self)._read(ti, try_number) - return log + return log, {'end_of_log': True} def gcs_read(self, remote_log_location): """ diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index b3acf3a62ecb6..31e8ef8f04aa0 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -55,7 +55,7 @@ def set_context(self, ti): # Local location and remote location is needed to open and # upload local log file to S3 remote storage. self.log_relative_path = self._render_filename(ti, ti.try_number) - self.upload_on_close = not ti.is_raw + self.upload_on_close = not ti.raw def close(self): """ @@ -84,12 +84,14 @@ def close(self): # Mark closed so we don't double write if close is called twice self.closed = True - def _read(self, ti, try_number): + def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from S3 remote storage. If failed, read the log from task instance host machine. :param ti: task instance object :param try_number: task instance try_number to read logs from + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. """ # Explicitly getting log relative path is necessary as the given # task instance might be different than task instance passed in @@ -107,7 +109,7 @@ def _read(self, ti, try_number): else: log = super(S3TaskHandler, self)._read(ti, try_number) - return log + return log, {'end_of_log': True} def s3_log_exists(self, remote_log_location): """ diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py index 4784b10232474..dfb261c6d285b 100644 --- a/airflow/utils/log/wasb_task_handler.py +++ b/airflow/utils/log/wasb_task_handler.py @@ -1,175 +1,177 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import shutil - -from airflow import configuration -from airflow.contrib.hooks.wasb_hook import WasbHook -from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.log.file_task_handler import FileTaskHandler -from azure.common import AzureHttpError - - -class WasbTaskHandler(FileTaskHandler, LoggingMixin): - """ - WasbTaskHandler is a python log handler that handles and reads - task instance logs. It extends airflow FileTaskHandler and - uploads to and reads from Wasb remote storage. 
- """ - - def __init__(self, base_log_folder, wasb_log_folder, wasb_container, - filename_template, delete_local_copy): - super(WasbTaskHandler, self).__init__(base_log_folder, filename_template) - self.wasb_container = wasb_container - self.remote_base = wasb_log_folder - self.log_relative_path = '' - self._hook = None - self.closed = False - self.upload_on_close = True - self.delete_local_copy = delete_local_copy - - def _build_hook(self): - remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') - try: - return WasbHook(remote_conn_id) - except AzureHttpError: - self.log.error( - 'Could not create an WasbHook with connection id "%s". ' - 'Please make sure that airflow[azure] is installed and ' - 'the Wasb connection exists.', remote_conn_id - ) - - @property - def hook(self): - if self._hook is None: - self._hook = self._build_hook() - return self._hook - - def set_context(self, ti): - super(WasbTaskHandler, self).set_context(ti) - # Local location and remote location is needed to open and - # upload local log file to Wasb remote storage. - self.log_relative_path = self._render_filename(ti, ti.try_number) - self.upload_on_close = not ti.is_raw - - def close(self): - """ - Close and upload local log file to remote storage Wasb. - """ - # When application exit, system shuts down all handlers by - # calling close method. Here we check if logger is already - # closed to prevent uploading the log to remote storage multiple - # times when `logging.shutdown` is called. - if self.closed: - return - - super(WasbTaskHandler, self).close() - - if not self.upload_on_close: - return - - local_loc = os.path.join(self.local_base, self.log_relative_path) - remote_loc = os.path.join(self.remote_base, self.log_relative_path) - if os.path.exists(local_loc): - # read log and remove old logs to get just the latest additions - with open(local_loc, 'r') as logfile: - log = logfile.read() - self.wasb_write(log, remote_loc, append=True) - - if self.delete_local_copy: - shutil.rmtree(os.path.dirname(local_loc)) - # Mark closed so we don't double write if close is called twice - self.closed = True - - def _read(self, ti, try_number): - """ - Read logs of given task instance and try_number from Wasb remote storage. - If failed, read the log from task instance host machine. - :param ti: task instance object - :param try_number: task instance try_number to read logs from - """ - # Explicitly getting log relative path is necessary as the given - # task instance might be different than task instance passed in - # in set_context method. - log_relative_path = self._render_filename(ti, try_number) - remote_loc = os.path.join(self.remote_base, log_relative_path) - - if self.wasb_log_exists(remote_loc): - # If Wasb remote file exists, we do not fetch logs from task instance - # local machine even if there are errors reading remote logs, as - # returned remote_log will contain error messages. 
- remote_log = self.wasb_read(remote_loc, return_error=True) - log = '*** Reading remote log from {}.\n{}\n'.format( - remote_loc, remote_log) - else: - log = super(WasbTaskHandler, self)._read(ti, try_number) - - return log - - def wasb_log_exists(self, remote_log_location): - """ - Check if remote_log_location exists in remote storage - :param remote_log_location: log's location in remote storage - :return: True if location exists else False - """ - try: - return self.hook.check_for_blob(self.wasb_container, remote_log_location) - except Exception: - pass - return False - - def wasb_read(self, remote_log_location, return_error=False): - """ - Returns the log found at the remote_log_location. Returns '' if no - logs are found or there is an error. - :param remote_log_location: the log's location in remote storage - :type remote_log_location: string (path) - :param return_error: if True, returns a string error message if an - error occurs. Otherwise returns '' when an error occurs. - :type return_error: bool - """ - try: - return self.hook.read_file(self.wasb_container, remote_log_location) - except AzureHttpError: - msg = 'Could not read logs from {}'.format(remote_log_location) - self.log.exception(msg) - # return error if needed - if return_error: - return msg - - def wasb_write(self, log, remote_log_location, append=True): - """ - Writes the log to the remote_log_location. Fails silently if no hook - was created. - :param log: the log to write to the remote_log_location - :type log: string - :param remote_log_location: the log's location in remote storage - :type remote_log_location: string (path) - :param append: if False, any existing log file is overwritten. If True, - the new log is appended to any existing logs. - :type append: bool - """ - if append and self.wasb_log_exists(remote_log_location): - old_log = self.wasb_read(remote_log_location) - log = '\n'.join([old_log, log]) if old_log else log - - try: - self.hook.load_string( - log, - self.wasb_container, - remote_log_location, - ) - except AzureHttpError: - self.log.exception('Could not write logs to %s', - remote_log_location) +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import shutil + +from airflow import configuration +from airflow.contrib.hooks.wasb_hook import WasbHook +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.file_task_handler import FileTaskHandler +from azure.common import AzureHttpError + + +class WasbTaskHandler(FileTaskHandler, LoggingMixin): + """ + WasbTaskHandler is a python log handler that handles and reads + task instance logs. It extends airflow FileTaskHandler and + uploads to and reads from Wasb remote storage. 
+ """ + + def __init__(self, base_log_folder, wasb_log_folder, wasb_container, + filename_template, delete_local_copy): + super(WasbTaskHandler, self).__init__(base_log_folder, filename_template) + self.wasb_container = wasb_container + self.remote_base = wasb_log_folder + self.log_relative_path = '' + self._hook = None + self.closed = False + self.upload_on_close = True + self.delete_local_copy = delete_local_copy + + def _build_hook(self): + remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') + try: + return WasbHook(remote_conn_id) + except AzureHttpError: + self.log.error( + 'Could not create an WasbHook with connection id "%s". ' + 'Please make sure that airflow[azure] is installed and ' + 'the Wasb connection exists.', remote_conn_id + ) + + @property + def hook(self): + if self._hook is None: + self._hook = self._build_hook() + return self._hook + + def set_context(self, ti): + super(WasbTaskHandler, self).set_context(ti) + # Local location and remote location is needed to open and + # upload local log file to Wasb remote storage. + self.log_relative_path = self._render_filename(ti, ti.try_number) + self.upload_on_close = not ti.raw + + def close(self): + """ + Close and upload local log file to remote storage Wasb. + """ + # When application exit, system shuts down all handlers by + # calling close method. Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + super(WasbTaskHandler, self).close() + + if not self.upload_on_close: + return + + local_loc = os.path.join(self.local_base, self.log_relative_path) + remote_loc = os.path.join(self.remote_base, self.log_relative_path) + if os.path.exists(local_loc): + # read log and remove old logs to get just the latest additions + with open(local_loc, 'r') as logfile: + log = logfile.read() + self.wasb_write(log, remote_loc, append=True) + + if self.delete_local_copy: + shutil.rmtree(os.path.dirname(local_loc)) + # Mark closed so we don't double write if close is called twice + self.closed = True + + def _read(self, ti, try_number, metadata=None): + """ + Read logs of given task instance and try_number from Wasb remote storage. + If failed, read the log from task instance host machine. + :param ti: task instance object + :param try_number: task instance try_number to read logs from + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. + """ + # Explicitly getting log relative path is necessary as the given + # task instance might be different than task instance passed in + # in set_context method. + log_relative_path = self._render_filename(ti, try_number) + remote_loc = os.path.join(self.remote_base, log_relative_path) + + if self.wasb_log_exists(remote_loc): + # If Wasb remote file exists, we do not fetch logs from task instance + # local machine even if there are errors reading remote logs, as + # returned remote_log will contain error messages. 
+ remote_log = self.wasb_read(remote_loc, return_error=True) + log = '*** Reading remote log from {}.\n{}\n'.format( + remote_loc, remote_log) + else: + log = super(WasbTaskHandler, self)._read(ti, try_number) + + return log, {'end_of_log': True} + + def wasb_log_exists(self, remote_log_location): + """ + Check if remote_log_location exists in remote storage + :param remote_log_location: log's location in remote storage + :return: True if location exists else False + """ + try: + return self.hook.check_for_blob(self.wasb_container, remote_log_location) + except Exception: + pass + return False + + def wasb_read(self, remote_log_location, return_error=False): + """ + Returns the log found at the remote_log_location. Returns '' if no + logs are found or there is an error. + :param remote_log_location: the log's location in remote storage + :type remote_log_location: string (path) + :param return_error: if True, returns a string error message if an + error occurs. Otherwise returns '' when an error occurs. + :type return_error: bool + """ + try: + return self.hook.read_file(self.wasb_container, remote_log_location) + except AzureHttpError: + msg = 'Could not read logs from {}'.format(remote_log_location) + self.log.exception(msg) + # return error if needed + if return_error: + return msg + + def wasb_write(self, log, remote_log_location, append=True): + """ + Writes the log to the remote_log_location. Fails silently if no hook + was created. + :param log: the log to write to the remote_log_location + :type log: string + :param remote_log_location: the log's location in remote storage + :type remote_log_location: string (path) + :param append: if False, any existing log file is overwritten. If True, + the new log is appended to any existing logs. + :type append: bool + """ + if append and self.wasb_log_exists(remote_log_location): + old_log = self.wasb_read(remote_log_location) + log = '\n'.join([old_log, log]) if old_log else log + + try: + self.hook.load_string( + log, + self.wasb_container, + remote_log_location, + ) + except AzureHttpError: + self.log.exception('Could not write logs to %s', + remote_log_location) diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index ec1ac5b4799e3..f72b5ecb16ba7 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -11,23 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import airflow.api +from flask import ( + g, Blueprint, jsonify, request, url_for +) +import airflow.api +from airflow.api.common.experimental import delete_dag as delete from airflow.api.common.experimental import pool as pool_api from airflow.api.common.experimental import trigger_dag as trigger -from airflow.api.common.experimental import delete_dag as delete from airflow.api.common.experimental.get_task import get_task from airflow.api.common.experimental.get_task_instance import get_task_instance from airflow.exceptions import AirflowException -from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils import timezone +from airflow.utils.log.logging_mixin import LoggingMixin from airflow.www.app import csrf -from flask import ( - g, Markup, Blueprint, redirect, jsonify, abort, - request, current_app, send_file, url_for -) - _log = LoggingMixin().log requires_authentication = airflow.api.api_auth.requires_authentication @@ -63,8 +61,8 @@ def trigger_dag(dag_id): except ValueError: error_message = ( 'Given execution date, {}, could not be identified ' - 'as a date. Example date format: 2015-11-16T14:34:15+00:00' - .format(execution_date)) + 'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format( + execution_date)) _log.info(error_message) response = jsonify({'error': error_message}) response.status_code = 400 @@ -128,7 +126,9 @@ def task_info(dag_id, task_id): return jsonify(fields) -@api_experimental.route('/dags//dag_runs//tasks/', methods=['GET']) +@api_experimental.route( + '/dags//dag_runs//tasks/', + methods=['GET']) @requires_authentication def task_instance_info(dag_id, execution_date, task_id): """ @@ -144,8 +144,8 @@ def task_instance_info(dag_id, execution_date, task_id): except ValueError: error_message = ( 'Given execution date, {}, could not be identified ' - 'as a date. Example date format: 2015-11-16T14:34:15+00:00' - .format(execution_date)) + 'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format( + execution_date)) _log.info(error_message) response = jsonify({'error': error_message}) response.status_code = 400 diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html index 03c0ed3707f9a..2615c4520a779 100644 --- a/airflow/www/templates/airflow/ti_log.html +++ b/airflow/www/templates/airflow/ti_log.html @@ -1,40 +1,141 @@ {# - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 +http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. #} {% extends "airflow/task_instance.html" %} {% block title %}Airflow - DAGs{% endblock %} {% block body %} - {{ super() }} -
[ti_log.html body diff: the old block that rendered the {{ title }} heading with one static {{ log }} pane per attempt is removed; the new block renders the same heading and per-attempt panes, each starting with a loading spinner and filled in client-side via the new get_logs_with_metadata endpoint (script added in the tail block below).]
+{% endblock %} +{% block tail %} +{{ lib.form_js() }} +{{ super() }} + {% endblock %} diff --git a/airflow/www/views.py b/airflow/www/views.py index 83e567a66da11..799e1bdef13f9 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -39,8 +39,8 @@ from sqlalchemy import or_, desc, and_, union_all from flask import ( - abort, redirect, url_for, request, Markup, Response, current_app, render_template, - make_response) + abort, jsonify, redirect, url_for, request, Markup, Response, + current_app, render_template, make_response) from flask_admin import BaseView, expose, AdminIndexView from flask_admin.contrib.sqla import ModelView from flask_admin.actions import action @@ -69,7 +69,6 @@ from airflow import settings from airflow.api.common.experimental.mark_tasks import set_dag_run_state from airflow.exceptions import AirflowException -from airflow.settings import Session from airflow.models import XCom, DagRun from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS @@ -719,6 +718,66 @@ def rendered(self): form=form, title=title, ) + @expose('/get_logs_with_metadata') + @login_required + @wwwutils.action_logging + @provide_session + def get_logs_with_metadata(self, session=None): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + execution_date = request.args.get('execution_date') + dttm = pendulum.parse(execution_date) + try_number = int(request.args.get('try_number')) + # metadata may be None + metadata = request.args.get('metadata') + if metadata: + metadata = json.loads(metadata) + + # Convert string datetime into actual datetime + try: + execution_date = timezone.parse(execution_date) + except ValueError: + error_message = ( + 'Given execution date, {}, could not be identified ' + 'as a date. 
Example date format: 2015-11-16T14:34:15+00:00'.format( + execution_date)) + response = jsonify({'error': error_message}) + response.status_code = 400 + + return response + + logger = logging.getLogger('airflow.task') + task_log_reader = conf.get('core', 'task_log_reader') + handler = next((handler for handler in logger.handlers + if handler.name == task_log_reader), None) + + ti = session.query(models.TaskInstance).filter( + models.TaskInstance.dag_id == dag_id, + models.TaskInstance.task_id == task_id, + models.TaskInstance.execution_date == dttm).first() + try: + if ti is None: + logs = ["*** Task instance did not exist in the DB\n"] + metadata['end_of_log'] = True + else: + dag = dagbag.get_dag(dag_id) + ti.task = dag.get_task(ti.task_id) + logs, metadatas = handler.read(ti, try_number, metadata=metadata) + metadata = metadatas[0] + for i, log in enumerate(logs): + if PY2 and not isinstance(log, unicode): + logs[i] = log.decode('utf-8') + message = logs[0] + return jsonify(message=message, metadata=metadata) + except AttributeError as e: + error_message = ["Task log handler {} does not support read logs.\n{}\n" + .format(task_log_reader, str(e))] + metadata['end_of_log'] = True + return jsonify(message=error_message, error=True, metadata=metadata) + except AirflowException as e: + metadata['end_of_log'] = True + return jsonify(message=str(e), error=True, metadata=metadata) + @expose('/log') @login_required @wwwutils.action_logging @@ -730,31 +789,17 @@ def log(self, session=None): dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) + ti = session.query(models.TaskInstance).filter( models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id, models.TaskInstance.execution_date == dttm).first() - if ti is None: - logs = ["*** Task instance did not exist in the DB\n"] - else: - logger = logging.getLogger('airflow.task') - task_log_reader = conf.get('core', 'task_log_reader') - handler = next((handler for handler in logger.handlers - if handler.name == task_log_reader), None) - try: - ti.task = dag.get_task(ti.task_id) - logs = handler.read(ti) - except AttributeError as e: - logs = ["Task log handler {} does not support read logs.\n{}\n" \ - .format(task_log_reader, str(e))] - - for i, log in enumerate(logs): - if PY2 and not isinstance(log, unicode): - logs[i] = log.decode('utf-8') + logs = [''] * (ti.next_try_number - 1 if ti is not None else 0) return self.render( 'airflow/ti_log.html', - logs=logs, dag=dag, title="Log by attempts", task_id=task_id, + logs=logs, dag=dag, title="Log by attempts", + dag_id=dag.dag_id, task_id=task_id, execution_date=execution_date, form=form) @expose('/task') diff --git a/licenses/LICENSE-elasticmock.txt b/licenses/LICENSE-elasticmock.txt new file mode 100644 index 0000000000000..ea757fb172c9e --- /dev/null +++ b/licenses/LICENSE-elasticmock.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Marcos Cardoso + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the 
Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/setup.py b/setup.py index 71a911374896f..538cb2c2549ef 100644 --- a/setup.py +++ b/setup.py @@ -119,6 +119,10 @@ def write_version(filename=os.path.join(*['airflow', ] docker = ['docker-py>=1.6.0'] druid = ['pydruid>=0.4.1'] +elasticsearch = [ + 'elasticsearch>=5.0.0,<6.0.0', + 'elasticsearch-dsl>=5.0.0,<6.0.0' +] emr = ['boto3>=1.0.0'] gcp_api = [ 'httplib2', @@ -193,7 +197,7 @@ def write_version(filename=os.path.join(*['airflow', devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + - druid + snowflake) + druid + snowflake + elasticsearch) # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'( if PY3: @@ -274,6 +278,7 @@ def do_setup(): 'doc': doc, 'docker': docker, 'druid': druid, + 'elasticsearch': elasticsearch, 'emr': emr, 'gcp_api': gcp_api, 'github_enterprise': github_enterprise, diff --git a/tests/utils/log/elasticmock/__init__.py b/tests/utils/log/elasticmock/__init__.py new file mode 100644 index 0000000000000..73c4879847138 --- /dev/null +++ b/tests/utils/log/elasticmock/__init__.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (c) 2016 Marcos Cardoso +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +from functools import wraps + +from elasticsearch.client import _normalize_hosts +from mock import patch + +from .fake_elasticsearch import FakeElasticsearch + +ELASTIC_INSTANCES = {} + + +def _get_elasticmock(hosts=None, *args, **kwargs): + host = _normalize_hosts(hosts)[0] + elastic_key = '{0}:{1}'.format( + host.get('host', 'localhost'), host.get('port', 9200) + ) + + if elastic_key in ELASTIC_INSTANCES: + connection = ELASTIC_INSTANCES.get(elastic_key) + else: + connection = FakeElasticsearch() + ELASTIC_INSTANCES[elastic_key] = connection + return connection + + +def elasticmock(f): + @wraps(f) + def decorated(*args, **kwargs): + ELASTIC_INSTANCES.clear() + with patch('elasticsearch.Elasticsearch', _get_elasticmock): + result = f(*args, **kwargs) + return result + return decorated diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py b/tests/utils/log/elasticmock/fake_elasticsearch.py new file mode 100644 index 0000000000000..0e29e91bb787f --- /dev/null +++ b/tests/utils/log/elasticmock/fake_elasticsearch.py @@ -0,0 +1,295 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (c) 2016 Marcos Cardoso +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
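# A minimal usage sketch for the elasticmock decorator defined in
# tests/utils/log/elasticmock/__init__.py above (the import path assumes the
# sketch lives next to that package, mirroring the relative import used in
# test_es_task_handler.py). Inside a decorated test, every
# elasticsearch.Elasticsearch(...) call is patched to return an in-memory
# FakeElasticsearch, and clients created for the same host:port share one
# cached instance.
import elasticsearch

from .elasticmock import elasticmock


@elasticmock
def test_clients_share_one_fake_instance():
    a = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
    b = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
    assert a is b  # both resolve to the FakeElasticsearch cached for 'localhost:9200'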
+ +import json +import six + +from elasticsearch import Elasticsearch +from elasticsearch.client.utils import query_params +from elasticsearch.exceptions import NotFoundError + +from .utilities import get_random_id + + +class FakeElasticsearch(Elasticsearch): + __documents_dict = None + + def __init__(self): + self.__documents_dict = {} + + @query_params() + def ping(self, params=None): + return True + + @query_params() + def info(self, params=None): + return { + 'status': 200, + 'cluster_name': 'elasticmock', + 'version': + { + 'lucene_version': '4.10.4', + 'build_hash': '00f95f4ffca6de89d68b7ccaf80d148f1f70e4d4', + 'number': '1.7.5', + 'build_timestamp': '2016-02-02T09:55:30Z', + 'build_snapshot': False + }, + 'name': 'Nightwatch', + 'tagline': 'You Know, for Search' + } + + @query_params('consistency', 'op_type', 'parent', 'refresh', 'replication', + 'routing', 'timeout', 'timestamp', 'ttl', 'version', 'version_type') + def index(self, index, doc_type, body, id=None, params=None): + if index not in self.__documents_dict: + self.__documents_dict[index] = list() + + if id is None: + id = get_random_id() + + version = 1 + + self.__documents_dict[index].append({ + '_type': doc_type, + '_id': id, + '_source': body, + '_index': index, + '_version': version + }) + + return { + '_type': doc_type, + '_id': id, + 'created': True, + '_version': version, + '_index': index + } + + @query_params('parent', 'preference', 'realtime', 'refresh', 'routing') + def exists(self, index, doc_type, id, params=None): + result = False + if index in self.__documents_dict: + for document in self.__documents_dict[index]: + if document.get('_id') == id and document.get('_type') == doc_type: + result = True + break + return result + + @query_params('_source', '_source_exclude', '_source_include', 'fields', + 'parent', 'preference', 'realtime', 'refresh', 'routing', 'version', + 'version_type') + def get(self, index, id, doc_type='_all', params=None): + result = None + if index in self.__documents_dict: + for document in self.__documents_dict[index]: + if document.get('_id') == id: + if doc_type == '_all': + result = document + break + else: + if document.get('_type') == doc_type: + result = document + break + + if result: + result['found'] = True + else: + error_data = { + '_index': index, + '_type': doc_type, + '_id': id, + 'found': False + } + raise NotFoundError(404, json.dumps(error_data)) + + return result + + @query_params('_source', '_source_exclude', '_source_include', 'parent', + 'preference', 'realtime', 'refresh', 'routing', 'version', + 'version_type') + def get_source(self, index, doc_type, id, params=None): + document = self.get(index=index, doc_type=doc_type, id=id, params=params) + return document.get('_source') + + @query_params('_source', '_source_exclude', '_source_include', + 'allow_no_indices', 'analyze_wildcard', 'analyzer', 'default_operator', + 'df', 'expand_wildcards', 'explain', 'fielddata_fields', 'fields', + 'from_', 'ignore_unavailable', 'lenient', 'lowercase_expanded_terms', + 'preference', 'q', 'request_cache', 'routing', 'scroll', 'search_type', + 'size', 'sort', 'stats', 'suggest_field', 'suggest_mode', + 'suggest_size', 'suggest_text', 'terminate_after', 'timeout', + 'track_scores', 'version') + def count(self, index=None, doc_type=None, body=None, params=None): + searchable_indexes = self._normalize_index_to_list(index) + searchable_doc_types = self._normalize_doc_type_to_list(doc_type) + + i = 0 + for searchable_index in searchable_indexes: + for document in 
self.__documents_dict[searchable_index]: + if searchable_doc_types\ + and document.get('_type') not in searchable_doc_types: + continue + i += 1 + result = { + 'count': i, + '_shards': { + 'successful': 1, + 'failed': 0, + 'total': 1 + } + } + + return result + + @query_params('_source', '_source_exclude', '_source_include', + 'allow_no_indices', 'analyze_wildcard', 'analyzer', 'default_operator', + 'df', 'expand_wildcards', 'explain', 'fielddata_fields', 'fields', + 'from_', 'ignore_unavailable', 'lenient', 'lowercase_expanded_terms', + 'preference', 'q', 'request_cache', 'routing', 'scroll', 'search_type', + 'size', 'sort', 'stats', 'suggest_field', 'suggest_mode', + 'suggest_size', 'suggest_text', 'terminate_after', 'timeout', + 'track_scores', 'version') + def search(self, index=None, doc_type=None, body=None, params=None): + searchable_indexes = self._normalize_index_to_list(index) + searchable_doc_types = self._normalize_doc_type_to_list(doc_type) + + matches = [] + for searchable_index in searchable_indexes: + for document in self.__documents_dict[searchable_index]: + if searchable_doc_types\ + and document.get('_type') not in searchable_doc_types: + continue + matches.append(document) + + result = { + 'hits': { + 'total': len(matches), + 'max_score': 1.0 + }, + '_shards': { + # Simulate indexes with 1 shard each + 'successful': len(searchable_indexes), + 'failed': 0, + 'total': len(searchable_indexes) + }, + 'took': 1, + 'timed_out': False + } + + hits = [] + for match in matches: + match['_score'] = 1.0 + hits.append(match) + result['hits']['hits'] = hits + + return result + + @query_params('consistency', 'parent', 'refresh', 'replication', 'routing', + 'timeout', 'version', 'version_type') + def delete(self, index, doc_type, id, params=None): + + found = False + + if index in self.__documents_dict: + for document in self.__documents_dict[index]: + if document.get('_type') == doc_type and document.get('_id') == id: + found = True + self.__documents_dict[index].remove(document) + break + + result_dict = { + 'found': found, + '_index': index, + '_type': doc_type, + '_id': id, + '_version': 1, + } + + if found: + return result_dict + else: + raise NotFoundError(404, json.dumps(result_dict)) + + @query_params('allow_no_indices', 'expand_wildcards', 'ignore_unavailable', + 'preference', 'routing') + def suggest(self, body, index=None, params=None): + if index is not None and index not in self.__documents_dict: + raise NotFoundError(404, 'IndexMissingException[[{0}] missing]'.format(index)) + + result_dict = {} + for key, value in body.items(): + text = value.get('text') + suggestion = int(text) + 1 if isinstance(text, int) \ + else '{0}_suggestion'.format(text) + result_dict[key] = [ + { + 'text': text, + 'length': 1, + 'options': [ + { + 'text': suggestion, + 'freq': 1, + 'score': 1.0 + } + ], + 'offset': 0 + } + ] + return result_dict + + def _normalize_index_to_list(self, index): + # Ensure to have a list of index + if index is None: + searchable_indexes = self.__documents_dict.keys() + elif isinstance(index, six.string_types): + searchable_indexes = [index] + elif isinstance(index, list): + searchable_indexes = index + else: + # Is it the correct exception to use ? 
+ raise ValueError("Invalid param 'index'") + + # Check index(es) exists + for searchable_index in searchable_indexes: + if searchable_index not in self.__documents_dict: + raise NotFoundError(404, + 'IndexMissingException[[{0}] missing]' + .format(searchable_index)) + + return searchable_indexes + + @staticmethod + def _normalize_doc_type_to_list(doc_type): + # Ensure to have a list of index + if doc_type is None: + searchable_doc_types = [] + elif isinstance(doc_type, six.string_types): + searchable_doc_types = [doc_type] + elif isinstance(doc_type, list): + searchable_doc_types = doc_type + else: + # Is it the correct exception to use ? + raise ValueError("Invalid param 'index'") + + return searchable_doc_types diff --git a/tests/utils/log/elasticmock/utilities/__init__.py b/tests/utils/log/elasticmock/utilities/__init__.py new file mode 100644 index 0000000000000..19438ba1c5e46 --- /dev/null +++ b/tests/utils/log/elasticmock/utilities/__init__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (c) 2016 Marcos Cardoso +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import random +import string + +DEFAULT_ELASTICSEARCH_ID_SIZE = 20 +CHARSET_FOR_ELASTICSEARCH_ID = string.ascii_letters + string.digits + + +def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE): + return ''.join(random.choice(CHARSET_FOR_ELASTICSEARCH_ID) for _ in range(size)) diff --git a/tests/utils/log/test_es_task_handler.py b/tests/utils/log/test_es_task_handler.py new file mode 100644 index 0000000000000..cd987175a0677 --- /dev/null +++ b/tests/utils/log/test_es_task_handler.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import os
+import shutil
+import unittest
+
+import elasticsearch
+import mock
+import pendulum
+
+from airflow import configuration
+from airflow.models import TaskInstance, DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.log.es_task_handler import ElasticsearchTaskHandler
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
+from .elasticmock import elasticmock
+
+
+class TestElasticsearchTaskHandler(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_file_task_handler'
+    TASK_ID = 'task_for_testing_file_log_handler'
+    EXECUTION_DATE = datetime(2016, 1, 1)
+    LOG_ID = 'dag_for_testing_file_task_handler-task_for_testing' \
+             '_file_log_handler-2016-01-01T00:00:00+00:00-1'
+
+    @elasticmock
+    def setUp(self):
+        super(TestElasticsearchTaskHandler, self).setUp()
+        self.local_log_location = 'local/log/location'
+        self.filename_template = '{try_number}.log'
+        self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}'
+        self.end_of_log_mark = 'end_of_log\n'
+        self.es_task_handler = ElasticsearchTaskHandler(
+            self.local_log_location,
+            self.filename_template,
+            self.log_id_template,
+            self.end_of_log_mark
+        )
+
+        self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
+        self.index_name = 'test_index'
+        self.doc_type = 'log'
+        self.test_message = 'some random stuff'
+        self.body = {'message': self.test_message, 'log_id': self.LOG_ID,
+                     'offset': 1}
+
+        self.es.index(index=self.index_name, doc_type=self.doc_type,
+                      body=self.body, id=1)
+
+        configuration.load_test_config()
+        self.dag = DAG(self.DAG_ID, start_date=self.EXECUTION_DATE)
+        task = DummyOperator(task_id=self.TASK_ID, dag=self.dag)
+        self.ti = TaskInstance(task=task, execution_date=self.EXECUTION_DATE)
+        self.ti.try_number = 1
+        self.ti.state = State.RUNNING
+        self.addCleanup(self.dag.clear)
+
+    def tearDown(self):
+        shutil.rmtree(self.local_log_location.split(os.path.sep)[0], ignore_errors=True)
+
+    def test_client(self):
+        self.assertIsInstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
+
+    def test_read(self):
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+    def test_read_with_none_metadata(self):
+        logs, metadatas = self.es_task_handler.read(self.ti, 1)
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(
+            timezone.parse(metadatas[0]['last_log_timestamp']) < pendulum.now())
+
+    def test_read_nonexistent_log(self):
+        ts = pendulum.now()
+        # In ElasticMock, search is going to return all documents with matching index
+        # and doc_type regardless of match filters, so we delete the log entry instead
+        # of making a new TaskInstance to query.
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(0, metadatas[0]['offset'])
+        # last_log_timestamp won't change if no log lines are read.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts)
+
+    def test_read_with_empty_metadata(self):
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti, 1, {})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertFalse(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+        self.assertEqual(1, metadatas[0]['offset'])
+        # last_log_timestamp will be initialized with the log reading time
+        # if no last_log_timestamp is provided.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+        # Case where offset is missing but metadata is not empty.
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti, 1, {'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertFalse(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+        self.assertEqual(0, metadatas[0]['offset'])
+        # last_log_timestamp will be initialized with the log reading time
+        # if no last_log_timestamp is provided.
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > ts)
+
+    def test_read_timeout(self):
+        ts = pendulum.now().subtract(minutes=5)
+
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual([''], logs)
+        self.assertTrue(metadatas[0]['end_of_log'])
+        # offset should be initialized to 0 if not provided.
+ self.assertEqual(0, metadatas[0]['offset']) + self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) == ts) + + def test_read_raises(self): + with mock.patch.object(self.es_task_handler.log, 'exception') as mock_exception: + with mock.patch("elasticsearch_dsl.Search.execute") as mock_execute: + mock_execute.side_effect = Exception('Failed to read') + logs, metadatas = self.es_task_handler.read(self.ti, 1) + msg = "Could not read log with log_id: {}".format(self.LOG_ID) + mock_exception.assert_called_once() + args, kwargs = mock_exception.call_args + self.assertIn(msg, args[0]) + + self.assertEqual(1, len(logs)) + self.assertEqual(len(logs), len(metadatas)) + self.assertEqual([''], logs) + self.assertFalse(metadatas[0]['end_of_log']) + self.assertEqual(0, metadatas[0]['offset']) + + def test_set_context(self): + self.es_task_handler.set_context(self.ti) + self.assertTrue(self.es_task_handler.mark_end_on_close) + + def test_close(self): + self.es_task_handler.set_context(self.ti) + self.es_task_handler.close() + with open(os.path.join(self.local_log_location, + self.filename_template.format(try_number=1)), + 'r') as log_file: + self.assertIn(self.end_of_log_mark, log_file.read()) + self.assertTrue(self.es_task_handler.closed) + + def test_close_no_mark_end(self): + self.ti.raw = True + self.es_task_handler.set_context(self.ti) + self.es_task_handler.close() + with open(os.path.join(self.local_log_location, + self.filename_template.format(try_number=1)), + 'r') as log_file: + self.assertNotIn(self.end_of_log_mark, log_file.read()) + self.assertTrue(self.es_task_handler.closed) + + def test_close_closed(self): + self.es_task_handler.closed = True + self.es_task_handler.set_context(self.ti) + self.es_task_handler.close() + with open(os.path.join(self.local_log_location, + self.filename_template.format(try_number=1)), + 'r') as log_file: + self.assertEqual(0, len(log_file.read())) + + def test_close_with_no_handler(self): + self.es_task_handler.set_context(self.ti) + self.es_task_handler.handler = None + self.es_task_handler.close() + with open(os.path.join(self.local_log_location, + self.filename_template.format(try_number=1)), + 'r') as log_file: + self.assertEqual(0, len(log_file.read())) + self.assertTrue(self.es_task_handler.closed) + + def test_close_with_no_stream(self): + self.es_task_handler.set_context(self.ti) + self.es_task_handler.handler.stream = None + self.es_task_handler.close() + with open(os.path.join(self.local_log_location, + self.filename_template.format(try_number=1)), + 'r') as log_file: + self.assertIn(self.end_of_log_mark, log_file.read()) + self.assertTrue(self.es_task_handler.closed) + + self.es_task_handler.set_context(self.ti) + self.es_task_handler.handler.stream.close() + self.es_task_handler.close() + with open(os.path.join(self.local_log_location, + self.filename_template.format(try_number=1)), + 'r') as log_file: + self.assertIn(self.end_of_log_mark, log_file.read()) + self.assertTrue(self.es_task_handler.closed) + + def test_render_log_id(self): + expected_log_id = 'dag_for_testing_file_task_handler-' \ + 'task_for_testing_file_log_handler-2016-01-01T00:00:00+00:00-1' + log_id = self.es_task_handler._render_log_id(self.ti, 1) + self.assertEqual(expected_log_id, log_id) + + # Switch to use jinja template. 
+ self.es_task_handler = ElasticsearchTaskHandler( + self.local_log_location, + self.filename_template, + '{{ ti.dag_id }}-{{ ti.task_id }}-{{ ts }}-{{ try_number }}', + self.end_of_log_mark + ) + log_id = self.es_task_handler._render_log_id(self.ti, 1) + self.assertEqual(expected_log_id, log_id) diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index 53c1e36cc35c4..590802213a5cb 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -108,8 +108,8 @@ def test_read(self): self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n') self.assertEqual( self.s3_task_handler.read(self.ti), - ['*** Reading remote log from s3://bucket/remote/log/location/1.log.\n' - 'Log line\n\n'] + (['*** Reading remote log from s3://bucket/remote/log/location/1.log.\n' + 'Log line\n\n'], [{'end_of_log': True}]) ) def test_read_raises_return_error(self): @@ -155,7 +155,7 @@ def test_close(self): boto3.resource('s3').Object('bucket', self.remote_log_key).get() def test_close_no_upload(self): - self.ti.is_raw = True + self.ti.raw = True self.s3_task_handler.set_context(self.ti) self.assertFalse(self.s3_task_handler.upload_on_close) self.s3_task_handler.close() diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index b4a9ae22dbac0..6759074404563 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -91,10 +91,13 @@ def task_callable(ti, **kwargs): file_handler.close() self.assertTrue(hasattr(file_handler, 'read')) - # Return value of read must be a list. - logs = file_handler.read(ti) + # Return value of read must be a tuple of list and list. + logs, metadatas = file_handler.read(ti) self.assertTrue(isinstance(logs, list)) + self.assertTrue(isinstance(metadatas, list)) self.assertEqual(len(logs), 1) + self.assertEqual(len(logs), len(metadatas)) + self.assertTrue(isinstance(metadatas[0], dict)) target_re = r'\n\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\n' # We should expect our log line from the callable above to appear in @@ -139,11 +142,15 @@ def task_callable(ti, **kwargs): logger.info("Test") - # Return value of read must be a list. - logs = file_handler.read(ti) + # Return value of read must be a tuple of list and list. + logs, metadatas = file_handler.read(ti) self.assertTrue(isinstance(logs, list)) # Logs for running tasks should show up too. + self.assertTrue(isinstance(logs, list)) + self.assertTrue(isinstance(metadatas, list)) self.assertEqual(len(logs), 2) + self.assertEqual(len(logs), len(metadatas)) + self.assertTrue(isinstance(metadatas[0], dict)) # Remove the generated tmp log file. 
os.remove(log_filename) diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 9461f9fae7274..b674352d94789 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -20,7 +20,9 @@ import tempfile import unittest import sys +import json +from urllib.parse import quote_plus from werkzeug.test import Client from airflow import models, configuration, settings @@ -377,9 +379,24 @@ def test_get_file_task_log(self): follow_redirects=True, ) self.assertEqual(response.status_code, 200) - self.assertIn('Log file does not exist', + self.assertIn('Log by attempts', response.data.decode('utf-8')) + def test_get_logs_with_metadata(self): + url_template = "/admin/airflow/get_logs_with_metadata?dag_id={}&" \ + "task_id={}&execution_date={}&" \ + "try_number={}&metadata={}" + response = \ + self.app.get(url_template.format(self.DAG_ID, + self.TASK_ID, + quote_plus(self.DEFAULT_DATE.isoformat()), + 1, + json.dumps({}))) + + self.assertIn('"message":', response.data.decode('utf-8')) + self.assertIn('"metadata":', response.data.decode('utf-8')) + self.assertEqual(200, response.status_code) + class TestVarImportView(unittest.TestCase):
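
The hunks above change the log-read contract from a plain list to a (logs, metadatas) tuple, and thread the metadata dict ('offset', 'last_log_timestamp', 'end_of_log') back into the next read so the webserver can tail a running task incrementally. Below is a minimal sketch of that loop; it is not part of the patch, and the names tail_task_log, handler, ti and poll_interval are illustrative, assuming handler is an ElasticsearchTaskHandler and ti a TaskInstance set up as in the tests above.

import time


def tail_task_log(handler, ti, try_number=1, poll_interval=5):
    """Poll handler.read() until the handler reports end_of_log."""
    # Start with empty metadata; the handler initializes offset to 0 and
    # last_log_timestamp to the read time, as exercised in
    # test_read_with_empty_metadata above.
    metadata = {}
    while True:
        logs, metadatas = handler.read(ti, try_number, metadata)
        metadata = metadatas[0]  # one entry per requested try_number
        if logs[0]:
            print(logs[0])
        if metadata.get('end_of_log'):
            # Set once the end_of_log_mark is seen, or after the handler
            # stops waiting for new lines (see test_read_timeout above).
            break
        time.sleep(poll_interval)  # arbitrary polling interval for this sketch

The get_logs_with_metadata endpoint tested above exposes the same loop to the browser: the page keeps the returned metadata and sends it back with the next request until end_of_log is true.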
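
For reference, a quick illustration (again not part of the patch) of how the default log_id_template joins a task try into the log_id field the Elasticsearch handler queries; the values below are the ones used in test_render_log_id.

LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'

log_id = LOG_ID_TEMPLATE.format(
    dag_id='dag_for_testing_file_task_handler',
    task_id='task_for_testing_file_log_handler',
    execution_date='2016-01-01T00:00:00+00:00',  # ISO-8601 execution date
    try_number=1,
)
assert log_id == ('dag_for_testing_file_task_handler-'
                  'task_for_testing_file_log_handler-'
                  '2016-01-01T00:00:00+00:00-1')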