diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f568d5dc8ea95..a8543d38a9f30 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -59,7 +59,6 @@
from sqlalchemy import func
from sqlalchemy.orm import exc
-
api.load_auth()
api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
@@ -316,7 +315,7 @@ def run(args, dag=None):
# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
- conf_dict = json.load(conf_file)
+ conf_dict = json.load(conf_file)
if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)
@@ -327,6 +326,21 @@ def run(args, dag=None):
settings.configure_vars()
settings.configure_orm()
+ if not args.pickle and not dag:
+ dag = get_dag(args)
+ elif not dag:
+ session = settings.Session()
+ logging.info('Loading pickle id {args.pickle}'.format(args=args))
+ dag_pickle = session.query(
+ DagPickle).filter(DagPickle.id == args.pickle).first()
+ if not dag_pickle:
+ raise AirflowException("Who hid the pickle!? [missing pickle]")
+ dag = dag_pickle.pickle
+
+ task = dag.get_task(task_id=args.task_id)
+ ti = TaskInstance(task, args.execution_date)
+ ti.refresh_from_db()
+
logging.root.handlers = []
if args.raw:
# Output to STDOUT for the parent process to read and log
@@ -350,19 +364,23 @@ def run(args, dag=None):
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
+ try_number = ti.try_number
log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
- directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+ log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
+ args.execution_date)
+ directory = os.path.join(log_base, log_relative_dir)
# Create the log file and give it group writable permissions
# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
# operator is not compatible with impersonation (e.g. if a Celery executor is used
# for a SubDag operator and the SubDag operator has a different owner than the
# parent DAG)
- if not os.path.exists(directory):
+ if not os.path.isdir(directory):
# Create the directory as globally writable using custom mkdirs
# as os.makedirs doesn't set mode properly.
mkdirs(directory, 0o775)
- iso = args.execution_date.isoformat()
- filename = "{directory}/{iso}".format(**locals())
+ log_relative = logging_utils.get_log_filename(
+ args.dag_id, args.task_id, args.execution_date, try_number)
+ filename = os.path.join(log_base, log_relative)
if not os.path.exists(filename):
open(filename, "a").close()
@@ -376,21 +394,6 @@ def run(args, dag=None):
hostname = socket.getfqdn()
logging.info("Running on host {}".format(hostname))
- if not args.pickle and not dag:
- dag = get_dag(args)
- elif not dag:
- session = settings.Session()
- logging.info('Loading pickle id {args.pickle}'.format(**locals()))
- dag_pickle = session.query(
- DagPickle).filter(DagPickle.id == args.pickle).first()
- if not dag_pickle:
- raise AirflowException("Who hid the pickle!? [missing pickle]")
- dag = dag_pickle.pickle
- task = dag.get_task(task_id=args.task_id)
-
- ti = TaskInstance(task, args.execution_date)
- ti.refresh_from_db()
-
if args.local:
print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
@@ -424,8 +427,8 @@ def run(args, dag=None):
session.commit()
pickle_id = pickle.id
print((
- 'Pickled dag {dag} '
- 'as pickle_id:{pickle_id}').format(**locals()))
+ 'Pickled dag {dag} '
+ 'as pickle_id:{pickle_id}').format(**locals()))
except Exception as e:
print('Could not pickle the DAG')
print(e)
@@ -475,7 +478,8 @@ def run(args, dag=None):
with open(filename, 'r') as logfile:
log = logfile.read()
- remote_log_location = filename.replace(log_base, remote_base)
+ remote_log_location = os.path.join(remote_base, log_relative)
+ logging.debug("Uploading to remote log location {}".format(remote_log_location))
# S3
if remote_base.startswith('s3:/'):
logging_utils.S3Log().write(log, remote_log_location)
@@ -669,10 +673,10 @@ def start_refresh(gunicorn_master_proc):
gunicorn_master_proc.send_signal(signal.SIGTTIN)
excess += 1
wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc))
+ get_num_workers_running(gunicorn_master_proc))
wait_until_true(lambda: num_workers_expected ==
- get_num_workers_running(gunicorn_master_proc))
+ get_num_workers_running(gunicorn_master_proc))
while True:
num_workers_running = get_num_workers_running(gunicorn_master_proc)
@@ -695,7 +699,7 @@ def start_refresh(gunicorn_master_proc):
gunicorn_master_proc.send_signal(signal.SIGTTOU)
excess -= 1
wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc))
+ get_num_workers_running(gunicorn_master_proc))
# Start a new worker by asking gunicorn to increase number of workers
elif num_workers_running == num_workers_expected:
@@ -887,6 +891,7 @@ def serve_logs(filename): # noqa
filename,
mimetype="application/json",
as_attachment=False)
+
WORKER_LOG_SERVER_PORT = \
int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
flask_app.run(
@@ -947,8 +952,8 @@ def initdb(args): # noqa
def resetdb(args):
print("DB: " + repr(settings.engine.url))
if args.yes or input(
- "This will drop existing tables if they exist. "
- "Proceed? (y/n)").upper() == "Y":
+ "This will drop existing tables if they exist. "
+ "Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
db_utils.resetdb()
@@ -966,7 +971,7 @@ def upgradedb(args): # noqa
if not ds_rows:
qry = (
session.query(DagRun.dag_id, DagRun.state, func.count('*'))
- .group_by(DagRun.dag_id, DagRun.state)
+ .group_by(DagRun.dag_id, DagRun.state)
)
for dag_id, state, count in qry:
session.add(DagStat(dag_id=dag_id, state=state, count=count))
@@ -1065,8 +1070,8 @@ def connections(args):
session = settings.Session()
if not (session
- .query(Connection)
- .filter(Connection.conn_id == new_conn.conn_id).first()):
+ .query(Connection)
+ .filter(Connection.conn_id == new_conn.conn_id).first()):
session.add(new_conn)
session.commit()
msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
@@ -1168,16 +1173,16 @@ class CLIFactory(object):
'dry_run': Arg(
("-dr", "--dry_run"), "Perform a dry run", "store_true"),
'pid': Arg(
- ("--pid", ), "PID file location",
+ ("--pid",), "PID file location",
nargs='?'),
'daemon': Arg(
("-D", "--daemon"), "Daemonize instead of running "
"in the foreground",
"store_true"),
'stderr': Arg(
- ("--stderr", ), "Redirect stderr to this file"),
+ ("--stderr",), "Redirect stderr to this file"),
'stdout': Arg(
- ("--stdout", ), "Redirect stdout to this file"),
+ ("--stdout",), "Redirect stdout to this file"),
'log_file': Arg(
("-l", "--log-file"), "Location of the log file"),
@@ -1333,7 +1338,7 @@ class CLIFactory(object):
"Serialized pickle object of the entire dag (used internally)"),
'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
'cfg_path': Arg(
- ("--cfg_path", ), "Path to config file to use instead of airflow.cfg"),
+ ("--cfg_path",), "Path to config file to use instead of airflow.cfg"),
# webserver
'port': Arg(
("-p", "--port"),
@@ -1341,11 +1346,11 @@ class CLIFactory(object):
type=int,
help="The port on which to run the server"),
'ssl_cert': Arg(
- ("--ssl_cert", ),
+ ("--ssl_cert",),
default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
help="Path to the SSL certificate for the webserver"),
'ssl_key': Arg(
- ("--ssl_key", ),
+ ("--ssl_key",),
default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
help="Path to the key to use with the SSL certificate"),
'workers': Arg(
diff --git a/airflow/models.py b/airflow/models.py
index 32ad144a22bb5..c1fd4a3e86225 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -108,7 +108,7 @@ def get_fernet():
_CONTEXT_MANAGER_DAG = None
-def clear_task_instances(tis, session, activate_dag_runs=True):
+def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
"""
Clears a set of task instances, but makes sure the running ones
get killed.
@@ -119,12 +119,20 @@ def clear_task_instances(tis, session, activate_dag_runs=True):
if ti.job_id:
ti.state = State.SHUTDOWN
job_ids.append(ti.job_id)
- # todo: this creates an issue with the webui tests
- # elif ti.state != State.REMOVED:
- # ti.state = State.NONE
- # session.merge(ti)
else:
- session.delete(ti)
+ task_id = ti.task_id
+ if dag and dag.has_task(task_id):
+ task = dag.get_task(task_id)
+ task_retries = task.retries
+ ti.max_tries = ti.try_number + task_retries
+ else:
+ # Ignore errors when updating max_tries if dag is None or
+ # task not found in dag since database records could be
+ # outdated. We make max_tries the maximum value of its
+ # original max_tries or the current task try number.
+ ti.max_tries = max(ti.max_tries, ti.try_number)
+ ti.state = State.NONE
+ session.merge(ti)
if job_ids:
from airflow.jobs import BaseJob as BJ
@@ -1316,8 +1324,8 @@ def run(
# not 0-indexed lists (i.e. Attempt 1 instead of
# Attempt 0 for the first attempt).
msg = "Starting attempt {attempt} of {total}".format(
- attempt=self.try_number % (task.retries + 1) + 1,
- total=task.retries + 1)
+ attempt=self.try_number + 1,
+ total=self.max_tries + 1)
self.start_date = datetime.now()
dep_context = DepContext(
@@ -1338,8 +1346,8 @@ def run(
self.state = State.NONE
msg = ("FIXME: Rescheduling due to concurrency limits reached at task "
"runtime. Attempt {attempt} of {total}. State set to NONE.").format(
- attempt=self.try_number % (task.retries + 1) + 1,
- total=task.retries + 1)
+ attempt=self.try_number + 1,
+ total=self.max_tries + 1)
logging.warning(hr + msg + hr)
self.queued_dttm = datetime.now()
@@ -1486,7 +1494,11 @@ def handle_failure(self, error, test_mode=False, context=None):
# Let's go deeper
try:
- if task.retries and self.try_number % (task.retries + 1) != 0:
+ # try_number is incremented by 1 during task instance run. So the
+ # current task instance try_number is the try_number for the next
+ # task instance run. We only mark task instance as FAILED if the
+ # next task instance try_number exceeds the max_tries.
+ if task.retries and self.try_number <= self.max_tries:
self.state = State.UP_FOR_RETRY
logging.info('Marking task as UP_FOR_RETRY')
if task.email_on_retry and task.email:
@@ -1641,15 +1653,17 @@ def email_alert(self, exception, is_retry=False):
task = self.task
title = "Airflow alert: {self}".format(**locals())
         exception = str(exception).replace('\n', '<br>')
-        try_ = task.retries + 1
+        # For reporting purposes, we report based on 1-indexed,
+        # not 0-indexed lists (i.e. Try 1 instead of
+        # Try 0 for the first attempt).
         body = (
-            "Try {self.try_number} out of {try_}<br>"
+            "Try {try_number} out of {max_tries}<br>"
             "Exception:<br>{exception}<br>"
             "Log: <a href='{self.log_url}'>Link</a><br>"
             "Host: {self.hostname}<br>"
             "Log file: {self.log_filepath}<br>"
             "Mark success: <a href='{self.mark_success_url}'>Link</a><br>"
-        ).format(**locals())
+        ).format(try_number=self.try_number + 1, max_tries=self.max_tries + 1, **locals())
send_email(task.email, title, body)
def set_duration(self):
@@ -2382,9 +2396,7 @@ def downstream_list(self):
def downstream_task_ids(self):
return self._downstream_task_ids
- def clear(
- self, start_date=None, end_date=None,
- upstream=False, downstream=False):
+ def clear(self, start_date=None, end_date=None, upstream=False, downstream=False):
"""
Clears the state of task instances associated with the task, following
the parameters specified.
@@ -2413,7 +2425,7 @@ def clear(
count = qry.count()
- clear_task_instances(qry.all(), session)
+ clear_task_instances(qry.all(), session, dag=self.dag)
session.commit()
session.close()
@@ -3244,7 +3256,7 @@ def clear(
do_it = utils.helpers.ask_yesno(question)
if do_it:
- clear_task_instances(tis.all(), session)
+ clear_task_instances(tis.all(), session, dag=self)
if reset_dag_runs:
self.set_dag_runs_state(session=session)
else:
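# --- Illustrative sketch, not part of the patch above. ---
# A minimal model (FakeTI is made up, not the real TaskInstance) of the retry
# accounting this patch switches to: try_number keeps counting across clears,
# and clear_task_instances() extends max_tries instead of try_number wrapping
# modulo (task.retries + 1).

class FakeTI(object):
    def __init__(self, retries):
        self.try_number = 0        # bumped each time the task instance runs
        self.max_tries = retries   # 0-indexed ceiling, seeded from task.retries

def after_failed_run(ti, task_retries):
    ti.try_number += 1             # run() increments this for the attempt
    # handle_failure(): try_number now names the *next* attempt, so keep
    # retrying while it is still within the budget.
    return 'up_for_retry' if task_retries and ti.try_number <= ti.max_tries else 'failed'

def clear(ti, task_retries):
    # clear_task_instances(): grant a fresh retry budget on top of the
    # attempts already consumed, and reset state instead of deleting the row.
    ti.max_tries = ti.try_number + task_retries
    ti.state = None

ti = FakeTI(retries=2)
assert [after_failed_run(ti, 2) for _ in range(3)] == \
    ['up_for_retry', 'up_for_retry', 'failed']
clear(ti, 2)                       # e.g. via "airflow clear" or task.clear()
assert [after_failed_run(ti, 2) for _ in range(3)] == \
    ['up_for_retry', 'up_for_retry', 'failed']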
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 96767cb6ea5b4..b86d839261053 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -19,7 +19,9 @@
from builtins import object
+import dateutil.parser
import logging
+import six
from airflow import configuration
from airflow.exceptions import AirflowException
@@ -57,6 +59,19 @@ def __init__(self):
'Please make sure that airflow[s3] is installed and '
'the S3 connection exists.'.format(remote_conn_id))
+ def log_exists(self, remote_log_location):
+ """
+ Check if remote_log_location exists in remote storage
+ :param remote_log_location: log's location in remote storage
+ :return: True if location exists else False
+ """
+ if self.hook:
+ try:
+ return self.hook.get_key(remote_log_location) is not None
+ except Exception:
+ pass
+ return False
+
def read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location. Returns '' if no
@@ -137,6 +152,20 @@ def __init__(self):
'"{}". Please make sure that airflow[gcp_api] is installed '
'and the GCS connection exists.'.format(remote_conn_id))
+ def log_exists(self, remote_log_location):
+ """
+ Check if remote_log_location exists in remote storage
+ :param remote_log_location: log's location in remote storage
+ :return: True if location exists else False
+ """
+ if self.hook:
+ try:
+ bkt, blob = self.parse_gcs_url(remote_log_location)
+ return self.hook.exists(bkt, blob)
+ except Exception:
+ pass
+ return False
+
def read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location.
@@ -211,3 +240,40 @@ def parse_gcs_url(self, gsurl):
bucket = parsed_url.netloc
blob = parsed_url.path.strip('/')
return (bucket, blob)
+
+
+# TODO: get_log_filename and get_log_directory are temporary helper
+# functions to get the airflow log filename. The FileHandler logic
+# will be extracted and these two functions will be moved.
+# For more details, please check issue AIRFLOW-1385.
+def get_log_filename(dag_id, task_id, execution_date, try_number):
+ """
+ Return relative log path.
+ :arg dag_id: id of the dag
+ :arg task_id: id of the task
+ :arg execution_date: execution date of the task instance
+ :arg try_number: try_number of current task instance
+ """
+ relative_dir = get_log_directory(dag_id, task_id, execution_date)
+ # For reporting purposes and keeping logs consistent with web UI
+ # display, we report based on 1-indexed, not 0-indexed lists
+ filename = "{}/{}.log".format(relative_dir, try_number+1)
+
+ return filename
+
+
+def get_log_directory(dag_id, task_id, execution_date):
+ """
+ Return log directory path: dag_id/task_id/execution_date
+ :arg dag_id: id of the dag
+ :arg task_id: id of the task
+ :arg execution_date: execution date of the task instance
+ """
+    # execution_date could be passed in as a unicode string
+    # instead of a datetime object.
+ if isinstance(execution_date, six.string_types):
+ execution_date = dateutil.parser.parse(execution_date)
+ iso = execution_date.isoformat()
+ relative_dir = '{}/{}/{}'.format(dag_id, task_id, iso)
+
+ return relative_dir
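# --- Illustrative usage of the two helpers above (values are hypothetical). ---
# The try_number passed in is 0-indexed; the filename produced is 1-indexed so
# it matches what the web UI displays.
from datetime import datetime

from airflow.utils.logging import get_log_directory, get_log_filename

execution_date = datetime(2017, 1, 1)
assert get_log_directory('example_dag', 'example_task', execution_date) == \
    'example_dag/example_task/2017-01-01T00:00:00'
assert get_log_filename('example_dag', 'example_task', execution_date, 0) == \
    'example_dag/example_task/2017-01-01T00:00:00/1.log'

# A string execution_date (e.g. taken from a URL parameter) is parsed first:
assert get_log_filename('example_dag', 'example_task', '2017-01-01T00:00:00', 1) == \
    'example_dag/example_task/2017-01-01T00:00:00/2.log'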
diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html
new file mode 100644
index 0000000000000..03c0ed3707f9a
--- /dev/null
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -0,0 +1,40 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+#}
+{% extends "airflow/task_instance.html" %}
+{% block title %}Airflow - DAGs{% endblock %}
+
+{% block body %}
+ {{ super() }}
+
+  {{ log }}