[AIRFLOW-2430] Extend query batching to additional slow queries
Closes apache#3324 from gsilk/batch-inserts
Gabriel Silk authored and Fokko Driesprong committed May 13, 2018
1 parent a92330e commit 042c3f2
Showing 5 changed files with 113 additions and 49 deletions.
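In short: BaseJob now reads the max_tis_per_query setting, two generic helpers (chunks and reduce_in_chunks) are added to airflow.utils.helpers, and the scheduler's heaviest task-instance queries are reworked to run through reduce_in_chunks so that no single statement grows without bound.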
13 changes: 10 additions & 3 deletions airflow/config_templates/default_airflow.cfg
@@ -421,9 +421,16 @@ scheduler_zombie_task_threshold = 300
 catchup_by_default = True
 
 # This changes the batch size of queries in the scheduling main loop.
-# This depends on query length limits and how long you are willing to hold locks.
-# 0 for no limit
-max_tis_per_query = 0
+# If this is too high, SQL query performance may be impacted by one
+# or more of the following:
+# - reversion to full table scan
+# - complexity of query predicate
+# - excessive locking
+#
+# Additionally, you may hit the maximum allowable query length for your db.
+#
+# Set this to 0 for no limit (not advised)
+max_tis_per_query = 512
 
 # Statsd (https://github.com/etsy/statsd) integration settings
 statsd_on = False
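For a sense of scale (an illustrative calculation, not part of the commit): with max_tis_per_query = 512, a scheduler pass that touches 1,200 task instances issues three bounded queries of 512, 512, and 176 rows, rather than one statement whose WHERE clause enumerates all 1,200 task instances.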
2 changes: 1 addition & 1 deletion airflow/config_templates/default_test.cfg
@@ -108,7 +108,7 @@ max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
 dag_dir_list_interval = 0
-max_tis_per_query = 0
+max_tis_per_query = 512
 
 [admin]
 hide_sensitive_variable_fields = True
93 changes: 49 additions & 44 deletions airflow/jobs.py
@@ -51,7 +51,7 @@
 from airflow.settings import Stats
 from airflow.task.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
-from airflow.utils import asciiart, timezone
+from airflow.utils import asciiart, helpers, timezone
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           DagFileProcessorManager,
                                           SimpleDag,
@@ -110,6 +110,7 @@ def __init__(
         self.latest_heartbeat = timezone.utcnow()
         self.heartrate = heartrate
         self.unixname = getpass.getuser()
+        self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super(BaseJob, self).__init__(*args, **kwargs)
 
     def is_alive(self):
@@ -254,21 +255,30 @@ def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
             if ti.key not in queued_tis and ti.key not in running_tis:
                 tis_to_reset.append(ti)
 
-        filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                TI.task_id == ti.task_id,
-                                TI.execution_date == ti.execution_date)
-                           for ti in tis_to_reset])
-        reset_tis = (
-            session
-            .query(TI)
-            .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
-            .with_for_update()
-            .all())
-        for ti in reset_tis:
-            ti.state = State.NONE
-            session.merge(ti)
+        if len(tis_to_reset) == 0:
+            return []
+
+        def query(result, items):
+            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
+                                    TI.task_id == ti.task_id,
+                                    TI.execution_date == ti.execution_date)
+                               for ti in items])
+            reset_tis = (
+                session
+                .query(TI)
+                .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
+                .with_for_update()
+                .all())
+            for ti in reset_tis:
+                ti.state = State.NONE
+                session.merge(ti)
+            return result + reset_tis
+
+        reset_tis = helpers.reduce_in_chunks(query,
+                                             tis_to_reset,
+                                             [],
+                                             self.max_tis_per_query)
 
         task_instance_str = '\n\t'.join(
             ["{}".format(x) for x in reset_tis])
         session.commit()
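The query callback above follows the reducer contract of the new helpers.reduce_in_chunks (defined later in this diff): result accumulates the task instances reset so far, and items is the current chunk of tis_to_reset. Each chunk becomes one SELECT ... FOR UPDATE whose predicate ORs together a (dag_id, task_id, execution_date) triple per task instance, so both the statement length and the number of rows locked at once stay bounded by max_tis_per_query.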
@@ -579,7 +589,6 @@ def __init__(
         # files have finished parsing.
         self.min_file_parsing_loop_time = min_file_parsing_loop_time
 
-        self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
                                             'run_duration')
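Note the setting has moved up the class hierarchy: it is now read in BaseJob.__init__ (earlier hunk) and removed from SchedulerJob.__init__ here, so any job type can batch its queries, not just the scheduler.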
@@ -1261,19 +1270,28 @@ def _change_state_for_executable_task_instances(self, task_instances,
         filter_for_ti_enqueue = ([and_(TI.dag_id == ti.dag_id,
                                        TI.task_id == ti.task_id,
                                        TI.execution_date == ti.execution_date)
-                           for ti in tis_to_set_to_queued])
+                                  for ti in tis_to_set_to_queued])
         session.commit()
 
-        # requery in batch since above was expired by commit
-        tis_to_be_queued = (
-            session
-            .query(TI)
-            .filter(or_(*filter_for_ti_enqueue))
-            .all())
+        # requery in batches since above was expired by commit
+
+        def query(result, items):
+            tis_to_be_queued = (
+                session
+                .query(TI)
+                .filter(or_(*items))
+                .all())
+            task_instance_str = "\n\t".join(
+                ["{}".format(x) for x in tis_to_be_queued])
+            self.log.info("Setting the following tasks to queued state:\n\t%s",
+                          task_instance_str)
+            return result + tis_to_be_queued
+
+        tis_to_be_queued = helpers.reduce_in_chunks(query,
+                                                    filter_for_ti_enqueue,
+                                                    [],
+                                                    self.max_tis_per_query)
 
-        task_instance_str = "\n\t".join(
-            ["{}".format(x) for x in tis_to_be_queued])
-        self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
         return tis_to_be_queued
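Why a requery is needed at all: session.commit() expires the loaded ORM objects (SQLAlchemy's default expire_on_commit=True), so the newly queued task instances must be fetched again before they can be logged and returned. Routing that re-fetch through reduce_in_chunks caps each OR predicate at max_tis_per_query clauses, matching the pattern used in reset_state_for_orphaned_tasks.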

     def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):
@@ -1349,32 +1367,19 @@ def _execute_task_instances(self,
         """
         executable_tis = self._find_executable_task_instances(simple_dag_bag, states,
                                                               session=session)
-        if self.max_tis_per_query == 0:
+
+        def query(result, items):
             tis_with_state_changed = self._change_state_for_executable_task_instances(
-                executable_tis,
+                items,
                 states,
                 session=session)
             self._enqueue_task_instances_with_queued_state(
                 simple_dag_bag,
                 tis_with_state_changed)
             session.commit()
-            return len(tis_with_state_changed)
-        else:
-            # makes chunks of max_tis_per_query size
-            chunks = ([executable_tis[i:i + self.max_tis_per_query]
-                       for i in range(0, len(executable_tis), self.max_tis_per_query)])
-            total_tis_queued = 0
-            for chunk in chunks:
-                tis_with_state_changed = self._change_state_for_executable_task_instances(
-                    chunk,
-                    states,
-                    session=session)
-                self._enqueue_task_instances_with_queued_state(
-                    simple_dag_bag,
-                    tis_with_state_changed)
-                session.commit()
-                total_tis_queued += len(tis_with_state_changed)
-            return total_tis_queued
+            return result + len(tis_with_state_changed)
+
+        return helpers.reduce_in_chunks(query, executable_tis, 0, self.max_tis_per_query)
 
     def _process_dags(self, dagbag, dags, tis_out):
         """
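One detail worth calling out in the last hunk: the initializer passed to reduce_in_chunks is 0 rather than a list, so the same helper that accumulates task instances elsewhere accumulates a count here. A minimal stand-alone sketch of that fold (the two helper bodies mirror the ones added to airflow/utils/helpers.py below; queue_chunk is a hypothetical stand-in for the real change-state/enqueue/commit step):

from functools import reduce

def chunks(items, chunk_size):
    # Same slicing as helpers.chunks from this commit (validation omitted).
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

def reduce_in_chunks(fn, iterable, initializer, chunk_size=0):
    # Mirrors helpers.reduce_in_chunks: chunk_size=0 means a single chunk.
    if len(iterable) == 0:
        return initializer
    if chunk_size == 0:
        chunk_size = len(iterable)
    return reduce(fn, chunks(iterable, chunk_size), initializer)

def queue_chunk(total_queued, task_instances):
    # Hypothetical stand-in for: change state, enqueue, commit, then count.
    return total_queued + len(task_instances)

print(reduce_in_chunks(queue_chunk, ['ti%d' % i for i in range(5)], 0, 2))
# 5 -- folded over three chunks of sizes 2, 2 and 1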
24 changes: 23 additions & 1 deletion airflow/utils/helpers.py
@@ -27,7 +27,7 @@
 from builtins import input
 from past.builtins import basestring
 from datetime import datetime
-import getpass
+from functools import reduce
 import imp
 import os
 import re
@@ -123,6 +123,28 @@ def as_tuple(obj):
     return tuple([obj])
 
 
+def chunks(items, chunk_size):
+    """
+    Yield successive chunks of a given size from a list of items
+    """
+    if chunk_size <= 0:
+        raise ValueError('Chunk size must be a positive integer')
+    for i in range(0, len(items), chunk_size):
+        yield items[i:i + chunk_size]
+
+
+def reduce_in_chunks(fn, iterable, initializer, chunk_size=0):
+    """
+    Reduce the given list of items by splitting it into chunks
+    of the given size and passing each chunk through the reducer
+    """
+    if len(iterable) == 0:
+        return initializer
+    if chunk_size == 0:
+        chunk_size = len(iterable)
+    return reduce(fn, chunks(iterable, chunk_size), initializer)
+
+
 def as_flattened_list(iterable):
     """
     Return an iterable with one level flattened
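Taken together, the two helpers compose like this (a quick demo; it assumes this commit is installed so the functions are importable from airflow.utils.helpers):

from airflow.utils import helpers

print(list(helpers.chunks([1, 2, 3, 4, 5], 2)))
# [[1, 2], [3, 4], [5]]

print(helpers.reduce_in_chunks(lambda acc, chunk: acc + sum(chunk),
                               [1, 2, 3, 4, 5], 0, 2))
# 15: (1 + 2) + (3 + 4) + 5

print(helpers.reduce_in_chunks(lambda acc, chunk: acc + [chunk],
                               [1, 2, 3, 4, 5], []))
# [[1, 2, 3, 4, 5]] -- chunk_size defaults to 0, i.e. a single chunk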
30 changes: 30 additions & 0 deletions tests/utils/test_helpers.py
@@ -86,6 +86,36 @@ def test_reap_process_group(self):
         except OSError:
             pass
 
+    def test_chunks(self):
+        with self.assertRaises(ValueError):
+            [i for i in helpers.chunks([1, 2, 3], 0)]
+
+        with self.assertRaises(ValueError):
+            [i for i in helpers.chunks([1, 2, 3], -3)]
+
+        self.assertEqual([i for i in helpers.chunks([], 5)], [])
+        self.assertEqual([i for i in helpers.chunks([1], 1)], [[1]])
+        self.assertEqual([i for i in helpers.chunks([1, 2, 3], 2)],
+                         [[1, 2], [3]])
+
+    def test_reduce_in_chunks(self):
+        self.assertEqual(helpers.reduce_in_chunks(lambda x, y: x + [y],
+                                                  [1, 2, 3, 4, 5],
+                                                  []),
+                         [[1, 2, 3, 4, 5]])
+
+        self.assertEqual(helpers.reduce_in_chunks(lambda x, y: x + [y],
+                                                  [1, 2, 3, 4, 5],
+                                                  [],
+                                                  2),
+                         [[1, 2], [3, 4], [5]])
+
+        self.assertEqual(helpers.reduce_in_chunks(lambda x, y: x + y[0] * y[1],
+                                                  [1, 2, 3, 4],
+                                                  0,
+                                                  2),
+                         14)
+
 
 if __name__ == '__main__':
     unittest.main()
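To trace the last assertion: with chunk_size 2 the chunks are [1, 2] and [3, 4], so the fold computes 0 + 1*2 = 2 and then 2 + 3*4 = 14. The new tests should run with the project's usual runner, e.g. python -m unittest tests.utils.test_helpers.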
