diff --git a/airflow/models.py b/airflow/models.py index e979b076fc931..3e296eb58b1ec 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -829,6 +829,10 @@ def try_number(self): def try_number(self, value): self._try_number = value + @property + def next_try_number(self): + return self._try_number + 1 + def command( self, mark_success=False, diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 9a8061ad2d553..2e40ce8e139b8 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -139,12 +139,14 @@ def read(self, task_instance, try_number=None): # So the log for a particular task try will only show up when # try number gets incremented in DB, i.e logs produced the time # after cli run and before try_number + 1 in DB will not be displayed. - next_try = task_instance.try_number if try_number is None: + next_try = task_instance.next_try_number try_numbers = list(range(1, next_try)) elif try_number < 1: - logs = ['Error fetching the logs. Try number {} is invalid.'.format(try_number)] + logs = [ + 'Error fetching the logs. Try number {} is invalid.'.format(try_number), + ] return logs else: try_numbers = [try_number] diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index e64ed7c53c1f7..3e94d319701ab 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -26,6 +26,7 @@ from airflow.settings import Session from airflow.utils.log.logging_mixin import set_context from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.state import State DEFAULT_DATE = datetime(2016, 1, 1) TASK_LOGGER = 'airflow.task' @@ -110,6 +111,45 @@ def task_callable(ti, **kwargs): # Remove the generated tmp log file. os.remove(log_filename) + def test_file_task_handler_running(self): + def task_callable(ti, **kwargs): + ti.log.info("test") + dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) + task = PythonOperator( + task_id='task_for_testing_file_log_handler', + dag=dag, + python_callable=task_callable, + provide_context=True + ) + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti.try_number = 2 + ti.state = State.RUNNING + + logger = ti.log + ti.log.disabled = False + + file_handler = next((handler for handler in logger.handlers + if handler.name == FILE_TASK_HANDLER), None) + self.assertIsNotNone(file_handler) + + set_context(logger, ti) + self.assertIsNotNone(file_handler.handler) + # We expect set_context generates a file locally. + log_filename = file_handler.handler.baseFilename + self.assertTrue(os.path.isfile(log_filename)) + self.assertTrue(log_filename.endswith("2.log"), log_filename) + + logger.info("Test") + + # Return value of read must be a list. + logs = file_handler.read(ti) + self.assertTrue(isinstance(logs, list)) + # Logs for running tasks should show up too. + self.assertEqual(len(logs), 2) + + # Remove the generated tmp log file. + os.remove(log_filename) + class TestFilenameRendering(unittest.TestCase):