[AIRFLOW-289] Make airflow timezone independent
Airflow mixes datetime.now()/today() and utcnow(). This can lead
to issues when the OS timezone is not set to UTC.
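
For illustration, a minimal sketch of the mismatch (not part of the commit; the US/Eastern timezone is an arbitrary assumption, and time.tzset() is Unix-only):

    import datetime
    import os
    import time

    # Simulate a host whose OS timezone is not UTC (Unix-only).
    os.environ["TZ"] = "US/Eastern"
    time.tzset()

    local = datetime.datetime.now()    # naive local wall-clock time
    utc = datetime.datetime.utcnow()   # naive UTC time

    # Both objects are naive, so Python happily mixes them: the
    # "difference" below is the UTC offset, not elapsed time.
    print(utc - local)                 # roughly 4-5 hours, not ~0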

Closes apache#2618 from bolkedebruin/use_utcnow

(cherry picked from commit a81c153)
Signed-off-by: Bolke de Bruin <[email protected]>
bolkedebruin committed Sep 20, 2017
1 parent b9d7d1f commit 20c83e1
Showing 19 changed files with 108 additions and 108 deletions.
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/mark_tasks.py
@@ -39,7 +39,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
         dr = dag.create_dagrun(
             run_id=run_id_template.format(date.isoformat()),
             execution_date=date,
-            start_date=datetime.datetime.now(),
+            start_date=datetime.datetime.utcnow(),
             external_trigger=False,
             state=state,
         )
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
@@ -29,7 +29,7 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dag = dagbag.get_dag(dag_id)

     if not execution_date:
-        execution_date = datetime.datetime.now()
+        execution_date = datetime.datetime.utcnow()

     assert isinstance(execution_date, datetime.datetime)
     execution_date = execution_date.replace(microsecond=0)
2 changes: 1 addition & 1 deletion airflow/contrib/task_runner/cgroup_task_runner.py
@@ -115,7 +115,7 @@ def start(self):
             return

         # Create a unique cgroup name
-        cgroup_name = "airflow/{}/{}".format(datetime.datetime.now().
+        cgroup_name = "airflow/{}/{}".format(datetime.datetime.utcnow().
                                              strftime("%Y-%m-%d"),
                                              str(uuid.uuid1()))

2 changes: 1 addition & 1 deletion airflow/example_dags/docker_copy_data.py
@@ -29,7 +29,7 @@
 # default_args = {
 #     'owner': 'airflow',
 #     'depends_on_past': False,
-#     'start_date': datetime.now(),
+#     'start_date': datetime.utcnow(),
 #     'email': ['[email protected]'],
 #     'email_on_failure': False,
 #     'email_on_retry': False,
2 changes: 1 addition & 1 deletion airflow/example_dags/example_docker_operator.py
@@ -20,7 +20,7 @@
 default_args = {
     'owner': 'airflow',
     'depends_on_past': False,
-    'start_date': datetime.now(),
+    'start_date': datetime.utcnow(),
     'email': ['[email protected]'],
     'email_on_failure': False,
     'email_on_retry': False,
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_controller_dag.py
@@ -50,7 +50,7 @@ def conditionally_trigger(context, dag_run_obj):
 # Define the DAG
 dag = DAG(dag_id='example_trigger_controller_dag',
           default_args={"owner": "airflow",
-                        "start_date": datetime.now()},
+                        "start_date": datetime.utcnow()},
           schedule_interval='@once')


2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_target_dag.py
@@ -37,7 +37,7 @@
 # 2. A Target DAG : c.f. example_trigger_target_dag.py

 args = {
-    'start_date': datetime.now(),
+    'start_date': datetime.utcnow(),
     'owner': 'airflow',
 }

64 changes: 32 additions & 32 deletions airflow/jobs.py
@@ -97,22 +97,22 @@ def __init__(
         self.hostname = socket.getfqdn()
         self.executor = executor
         self.executor_class = executor.__class__.__name__
-        self.start_date = datetime.now()
-        self.latest_heartbeat = datetime.now()
+        self.start_date = datetime.utcnow()
+        self.latest_heartbeat = datetime.utcnow()
         self.heartrate = heartrate
         self.unixname = getpass.getuser()
         super(BaseJob, self).__init__(*args, **kwargs)

     def is_alive(self):
         return (
-            (datetime.now() - self.latest_heartbeat).seconds <
+            (datetime.utcnow() - self.latest_heartbeat).seconds <
             (conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)
         )

     def kill(self):
         session = settings.Session()
         job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-        job.end_date = datetime.now()
+        job.end_date = datetime.utcnow()
         try:
             self.on_kill()
         except:
@@ -164,7 +164,7 @@ def heartbeat(self):
         if job.latest_heartbeat:
             sleep_for = max(
                 0,
-                self.heartrate - (datetime.now() - job.latest_heartbeat).total_seconds())
+                self.heartrate - (datetime.utcnow() - job.latest_heartbeat).total_seconds())

         # Don't keep session open while sleeping as it leaves a connection open
         session.close()
@@ -173,7 +173,7 @@ def heartbeat(self):
         # Update last heartbeat time
         session = settings.Session()
         job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-        job.latest_heartbeat = datetime.now()
+        job.latest_heartbeat = datetime.utcnow()
         session.merge(job)
         session.commit()

@@ -196,7 +196,7 @@ def run(self):
         self._execute()

         # Marking the success in the DB
-        self.end_date = datetime.now()
+        self.end_date = datetime.utcnow()
         self.state = State.SUCCESS
         session.merge(self)
         session.commit()
@@ -422,7 +422,7 @@ def start(self):
             self._dag_id_white_list,
             "DagFileProcessor{}".format(self._instance_id),
             self.log_file)
-        self._start_time = datetime.now()
+        self._start_time = datetime.utcnow()

     def terminate(self, sigkill=False):
         """
@@ -635,16 +635,16 @@ def manage_slas(self, dag, session=None):
             TI.execution_date == sq.c.max_ti,
         ).all()

-        ts = datetime.now()
+        ts = datetime.utcnow()
         SlaMiss = models.SlaMiss
         for ti in max_tis:
             task = dag.get_task(ti.task_id)
             dttm = ti.execution_date
             if task.sla:
                 dttm = dag.following_schedule(dttm)
-                while dttm < datetime.now():
+                while dttm < datetime.utcnow():
                     following_schedule = dag.following_schedule(dttm)
-                    if following_schedule + task.sla < datetime.now():
+                    if following_schedule + task.sla < datetime.utcnow():
                         session.merge(models.SlaMiss(
                             task_id=ti.task_id,
                             dag_id=ti.dag_id,
for dr in active_runs:
if (
dr.start_date and dag.dagrun_timeout and
dr.start_date < datetime.now() - dag.dagrun_timeout):
dr.start_date < datetime.utcnow() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = datetime.now()
dr.end_date = datetime.utcnow()
timedout_runs += 1
session.commit()
if len(active_runs) - timedout_runs >= dag.max_active_runs:
@@ -821,9 +821,9 @@ def create_dag_run(self, dag, session=None):
         # don't do scheduler catchup for dag's that don't have dag.catchup = True
         if not dag.catchup:
             # The logic is that we move start_date up until
-            # one period before, so that datetime.now() is AFTER
+            # one period before, so that datetime.utcnow() is AFTER
             # the period end, and the job can be created...
-            now = datetime.now()
+            now = datetime.utcnow()
             next_start = dag.following_schedule(now)
             last_start = dag.previous_schedule(now)
             if next_start <= now:
@@ -869,7 +869,7 @@ def create_dag_run(self, dag, session=None):
             )

             # don't ever schedule in the future
-            if next_run_date > datetime.now():
+            if next_run_date > datetime.utcnow():
                 return

             # this structure is necessary to avoid a TypeError from concatenating
@@ -892,11 +892,11 @@ def create_dag_run(self, dag, session=None):
             if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
                 return

-            if next_run_date and period_end and period_end <= datetime.now():
+            if next_run_date and period_end and period_end <= datetime.utcnow():
                 next_run = dag.create_dagrun(
                     run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
                     execution_date=next_run_date,
-                    start_date=datetime.now(),
+                    start_date=datetime.utcnow(),
                     state=State.RUNNING,
                     external_trigger=False
                 )
@@ -916,7 +916,7 @@ def _process_task_instances(self, dag, queue):
         for run in dag_runs:
             self.log.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
-            if run.execution_date > datetime.now():
+            if run.execution_date > datetime.utcnow():
                 self.log.error(
                     "Execution date is in future: %s",
                     run.execution_date
@@ -1215,7 +1215,7 @@ def _change_state_for_executable_task_instances(self, task_instances,
         # set TIs to queued state
         for task_instance in tis_to_set_to_queued:
             task_instance.state = State.QUEUED
-            task_instance.queued_dttm = (datetime.now()
+            task_instance.queued_dttm = (datetime.utcnow()
                                          if not task_instance.queued_dttm
                                          else task_instance.queued_dttm)
             session.merge(task_instance)
@@ -1422,7 +1422,7 @@ def _log_file_processing_stats(self,
             last_runtime = processor_manager.get_last_runtime(file_path)
             processor_pid = processor_manager.get_pid(file_path)
             processor_start_time = processor_manager.get_start_time(file_path)
-            runtime = ((datetime.now() - processor_start_time).total_seconds()
+            runtime = ((datetime.utcnow() - processor_start_time).total_seconds()
                        if processor_start_time else None)
             last_run = processor_manager.get_last_finish_time(file_path)

@@ -1543,34 +1543,34 @@ def _execute_helper(self, processor_manager):
         self.reset_state_for_orphaned_tasks(session=session)
         session.close()

-        execute_start_time = datetime.now()
+        execute_start_time = datetime.utcnow()

         # Last time stats were printed
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
-        last_self_heartbeat_time = datetime.now()
+        last_self_heartbeat_time = datetime.utcnow()
         # Last time that the DAG dir was traversed to look for files
-        last_dag_dir_refresh_time = datetime.now()
+        last_dag_dir_refresh_time = datetime.utcnow()

         # Use this value initially
         known_file_paths = processor_manager.file_paths

         # For the execute duration, parse and schedule DAGs
-        while (datetime.now() - execute_start_time).total_seconds() < \
+        while (datetime.utcnow() - execute_start_time).total_seconds() < \
                 self.run_duration or self.run_duration < 0:
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()

             # Traverse the DAG directory for Python files containing DAGs
             # periodically
-            elapsed_time_since_refresh = (datetime.now() -
+            elapsed_time_since_refresh = (datetime.utcnow() -
                                           last_dag_dir_refresh_time).total_seconds()

             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
                 self.log.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
-                last_dag_dir_refresh_time = datetime.now()
+                last_dag_dir_refresh_time = datetime.utcnow()
                 self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)

@@ -1620,20 +1620,20 @@ def _execute_helper(self, processor_manager):
             self._process_executor_events()

             # Heartbeat the scheduler periodically
-            time_since_last_heartbeat = (datetime.now() -
+            time_since_last_heartbeat = (datetime.utcnow() -
                                          last_self_heartbeat_time).total_seconds()
             if time_since_last_heartbeat > self.heartrate:
                 self.log.info("Heartbeating the scheduler")
                 self.heartbeat()
-                last_self_heartbeat_time = datetime.now()
+                last_self_heartbeat_time = datetime.utcnow()

             # Occasionally print out stats about how fast the files are getting processed
-            if ((datetime.now() - last_stat_print_time).total_seconds() >
+            if ((datetime.utcnow() - last_stat_print_time).total_seconds() >
                     self.print_stats_interval):
                 if len(known_file_paths) > 0:
                     self._log_file_processing_stats(known_file_paths,
                                                     processor_manager)
-                last_stat_print_time = datetime.now()
+                last_stat_print_time = datetime.utcnow()

             loop_end_time = time.time()
             self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
@@ -2016,7 +2016,7 @@ def _get_dag_run(self, run_date, session=None):
         run = run or self.dag.create_dagrun(
             run_id=run_id,
             execution_date=run_date,
-            start_date=datetime.now(),
+            start_date=datetime.utcnow(),
             state=State.RUNNING,
             external_trigger=False,
             session=session
