Skip to content

Commit 13a4d5c

Browse files
Martin J. Laubachsafwanrahman
authored andcommitted
Mjl index speedup (#213)
* Use elasticsearch's parallel_bulk for indexing, add ELASTICSEARCH_DSL_PARALLEL default setting and parameters to management command. Use qs.iterator() for fetching data during reindex, as this is much more memory efficient and performant. Instead of finding out which methods to call to prepare fields, do that finagling once and cache it for subsequent model instance prepares. See issue #154 for performance analysis and details. * Move collection of prepare functions to __init__, where it's conceptually cleaner. Also shaves off a test per object. * Minor cleanup: Move prepare cache to Document object instead of Model, as it's conceptually possible to have several indices on the same model. Also remove forced ordering that is a remnant of earlier code. * chunk_size parameter for queryset.iterator() appeared in Django 2 * Do not crash in init_prepare when no fields have been defined * Crank up diff size to see what is going on * Adapt test to changed call pattern * Adapt tests to changed call patterns * Mark pagination test as expected failure for now. * Define _prepared_fields as attribute in class so to_dict() won't pick it up as document field * remove debugging * Add parameter no to do a count(*) before indexing, as for complex querysets that might be expensive. * Fixing example application * Correctly clean up after test run (delete indices with the right name). * Remove paginator test. Add tests for usage of init_prepare() and _prepared_fields. Add tests for correct calling of bulk/parallel_bulk. * Make sure we compare w/ stable order * Adjust for different types for methods/partials in py2 * Correct es dependency (was conflicting with requirements.txt) * Pass queryset_pagination as chunk_size into parallel_bulk too. * Add explanation why we use deque() * Correct typo in explanation of test * Remove leftover instrumentation print * Better formatting to avoid backslash-continuation line
1 parent 1072bcf commit 13a4d5c

File tree

10 files changed

+271
-139
lines changed

10 files changed

+271
-139
lines changed

README.rst

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,12 @@ It required to defined ``Document`` class in ``documents.py`` in your app direc
131131
# Ignore auto updating of Elasticsearch when a model is saved
132132
# or deleted:
133133
# ignore_signals = True
134+
134135
# Don't perform an index refresh after every update (overrides global setting):
135136
# auto_refresh = False
137+
136138
# Paginate the django queryset used to populate the index with the specified size
137-
# (by default there is no pagination)
139+
# (by default it uses the database driver's default setting)
138140
# queryset_pagination = 5000
139141
140142
@@ -551,6 +553,15 @@ Defaults to ``django_elasticsearch_dsl.signals.RealTimeSignalProcessor``.
551553
You could, for instance, make a ``CelerySignalProcessor`` which would add
552554
update jobs to the queue to for delayed processing.
553555

556+
ELASTICSEARCH_DSL_PARALLEL
557+
~~~~~~~~~~~~~~~~~~~~~~~~~~
558+
559+
Default: ``False``
560+
561+
Run indexing (populate and rebuild) in parallel using ES' parallel_bulk() method.
562+
Note that some databases (e.g. sqlite) do not play well with this option.
563+
564+
554565
Testing
555566
-------
556567

django_elasticsearch_dsl/documents.py

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
from __future__ import unicode_literals
22

3-
from django.core.paginator import Paginator
3+
from collections import deque
4+
from copy import deepcopy
5+
from functools import partial
6+
7+
from django import VERSION as DJANGO_VERSION
48
from django.db import models
59
from django.utils.six import iteritems
6-
from elasticsearch.helpers import bulk
10+
from elasticsearch.helpers import bulk, parallel_bulk
711
from elasticsearch_dsl import Document as DSLDocument
812

913
from .exceptions import ModelFieldNotMappedError
@@ -45,11 +49,12 @@
4549
models.URLField: TextField,
4650
}
4751

48-
4952
class DocType(DSLDocument):
53+
_prepared_fields = []
5054
def __init__(self, related_instance_to_ignore=None, **kwargs):
5155
super(DocType, self).__init__(**kwargs)
5256
self._related_instance_to_ignore = related_instance_to_ignore
57+
self._prepared_fields = self.init_prepare()
5358

5459
def __eq__(self, other):
5560
return id(self) == id(other)
@@ -70,39 +75,56 @@ def get_queryset(self):
7075
"""
7176
Return the queryset that should be indexed by this doc type.
7277
"""
73-
primary_key_field_name = self.django.model._meta.pk.name
74-
return self.django.model._default_manager.all().order_by(primary_key_field_name)
78+
return self.django.model._default_manager.all()
7579

76-
def prepare(self, instance):
80+
def get_indexing_queryset(self):
7781
"""
78-
Take a model instance, and turn it into a dict that can be serialized
79-
based on the fields defined on this DocType subclass
82+
Build queryset (iterator) for use by indexing.
83+
"""
84+
qs = self.get_queryset()
85+
kwargs = {}
86+
if DJANGO_VERSION >= (2,) and self.django.queryset_pagination:
87+
kwargs = {'chunk_size': self.django.queryset_pagination}
88+
return qs.iterator(**kwargs)
89+
90+
def init_prepare(self):
91+
"""
92+
Initialise the data model preparers once here. Extracts the preparers
93+
from the model and generate a list of callables to avoid doing that
94+
work on every object instance over.
8095
"""
81-
data = {}
82-
for name, field in iteritems(self._fields):
96+
index_fields = getattr(self, '_fields', {})
97+
fields = []
98+
for name, field in iteritems(index_fields):
8399
if not isinstance(field, DEDField):
84100
continue
85101

86-
if field._path == []:
102+
if not field._path:
87103
field._path = [name]
88104

89105
prep_func = getattr(self, 'prepare_%s_with_related' % name, None)
90106
if prep_func:
91-
field_value = prep_func(
92-
instance,
93-
related_to_ignore=self._related_instance_to_ignore
94-
)
107+
fn = partial(prep_func, related_to_ignore=self._related_instance_to_ignore)
95108
else:
96109
prep_func = getattr(self, 'prepare_%s' % name, None)
97110
if prep_func:
98-
field_value = prep_func(instance)
111+
fn = prep_func
99112
else:
100-
field_value = field.get_value_from_instance(
101-
instance, self._related_instance_to_ignore
102-
)
113+
fn = partial(field.get_value_from_instance, field_value_to_ignore=self._related_instance_to_ignore)
103114

104-
data[name] = field_value
115+
fields.append((name, field, fn))
105116

117+
return fields
118+
119+
def prepare(self, instance):
120+
"""
121+
Take a model instance, and turn it into a dict that can be serialized
122+
based on the fields defined on this DocType subclass
123+
"""
124+
data = {
125+
name: prep_func(instance)
126+
for name, field, prep_func in self._prepared_fields
127+
}
106128
return data
107129

108130
@classmethod
@@ -124,6 +146,17 @@ def to_field(cls, field_name, model_field):
124146
def bulk(self, actions, **kwargs):
125147
return bulk(client=self._get_connection(), actions=actions, **kwargs)
126148

149+
def parallel_bulk(self, actions, **kwargs):
150+
if self.django.queryset_pagination and 'chunk_size' not in kwargs:
151+
kwargs['chunk_size'] = self.django.queryset_pagination
152+
bulk_actions = parallel_bulk(client=self._get_connection(), actions=actions, **kwargs)
153+
# As the `parallel_bulk` is lazy, we need to get it into `deque` to run it instantly
154+
# See https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/2
155+
deque(bulk_actions, maxlen=0)
156+
# Fake return value to emulate bulk() since we don't have a result yet,
157+
# the result is currently not used upstream anyway.
158+
return (1, [])
159+
127160
def _prepare_action(self, object_instance, action):
128161
return {
129162
'_op_type': action,
@@ -135,18 +168,18 @@ def _prepare_action(self, object_instance, action):
135168
}
136169

137170
def _get_actions(self, object_list, action):
138-
if self.django.queryset_pagination is not None:
139-
paginator = Paginator(
140-
object_list, self.django.queryset_pagination
141-
)
142-
for page in paginator.page_range:
143-
for object_instance in paginator.page(page).object_list:
144-
yield self._prepare_action(object_instance, action)
171+
for object_instance in object_list:
172+
yield self._prepare_action(object_instance, action)
173+
174+
def _bulk(self, *args, **kwargs):
175+
"""Helper for switching between normal and parallel bulk operation"""
176+
parallel = kwargs.pop('parallel', False)
177+
if parallel:
178+
return self.parallel_bulk(*args, **kwargs)
145179
else:
146-
for object_instance in object_list:
147-
yield self._prepare_action(object_instance, action)
180+
return self.bulk(*args, **kwargs)
148181

149-
def update(self, thing, refresh=None, action='index', **kwargs):
182+
def update(self, thing, refresh=None, action='index', parallel=False, **kwargs):
150183
"""
151184
Update each document in ES for a model, iterable of models or queryset
152185
"""
@@ -160,8 +193,10 @@ def update(self, thing, refresh=None, action='index', **kwargs):
160193
else:
161194
object_list = thing
162195

163-
return self.bulk(
164-
self._get_actions(object_list, action), **kwargs
196+
return self._bulk(
197+
self._get_actions(object_list, action),
198+
parallel=parallel,
199+
**kwargs
165200
)
166201

167202

django_elasticsearch_dsl/management/commands/search_index.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from __future__ import unicode_literals, absolute_import
2+
3+
from django.conf import settings
24
from django.core.management.base import BaseCommand, CommandError
35
from django.utils.six.moves import input
46
from ...registries import registry
@@ -49,6 +51,26 @@ def add_arguments(self, parser):
4951
dest='force',
5052
help="Force operations without asking"
5153
)
54+
parser.add_argument(
55+
'--parallel',
56+
action='store_true',
57+
dest='parallel',
58+
help='Run populate/rebuild update multi threaded'
59+
)
60+
parser.add_argument(
61+
'--no-parallel',
62+
action='store_false',
63+
dest='parallel',
64+
help='Run populate/rebuild update single threaded'
65+
)
66+
parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False))
67+
parser.add_argument(
68+
'--no-count',
69+
action='store_false',
70+
default=True,
71+
dest='count',
72+
help='Do not include a total count in the summary log line'
73+
)
5274

5375
def _get_models(self, args):
5476
"""
@@ -84,12 +106,15 @@ def _create(self, models, options):
84106
index.create()
85107

86108
def _populate(self, models, options):
109+
parallel = options['parallel']
87110
for doc in registry.get_documents(models):
88-
qs = doc().get_queryset()
89-
self.stdout.write("Indexing {} '{}' objects".format(
90-
qs.count(), doc.django.model.__name__)
111+
self.stdout.write("Indexing {} '{}' objects {}".format(
112+
doc().get_queryset().count() if options['count'] else "all",
113+
doc.django.model.__name__,
114+
"(parallel)" if parallel else "")
91115
)
92-
doc().update(qs)
116+
qs = doc().get_indexing_queryset()
117+
doc().update(qs, parallel=parallel)
93118

94119
def _delete(self, models, options):
95120
index_names = [index._name for index in registry.get_indices(models)]

django_elasticsearch_dsl/test/testcases.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ def setUp(self):
1919
def tearDown(self):
2020
pattern = re.compile(self._index_suffixe + '$')
2121

22-
for doc in registry.get_documents():
23-
doc._index._name = pattern.sub('', doc._index._name)
24-
2522
for index in registry.get_indices():
2623
index.delete(ignore=[404, 400])
2724
index._name = pattern.sub('', index._name)
2825

26+
for doc in registry.get_documents():
27+
doc._index._name = pattern.sub('', doc._index._name)
28+
2929
super(ESTestCase, self).tearDown()

0 commit comments

Comments
 (0)