Skip to content

Commit

Permalink
feat: add SearchAfterMixin for ES search_after capability
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali-D-Akbar committed Jan 13, 2025
1 parent d63d119 commit 2b92b48
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 20 deletions.
61 changes: 61 additions & 0 deletions course_discovery/apps/course_metadata/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,67 @@ def search(cls, query, queryset=None):
return filtered_queryset


class SearchAfterMixin:
"""
Represents objects to query Elasticsearch with `search_after` pagination and load by primary key.
"""

@classmethod
def search(cls, query, queryset=None, page_size=settings.ELASTICSEARCH_DSL_QUERYSET_PAGINATION):
"""
Queries the Elasticsearch index with optional pagination using `search_after`.
Args:
query (str) -- Elasticsearch querystring (e.g. `title:intro*`)
queryset (models.QuerySet) -- base queryset to search, defaults to objects.all()
page_size (int) -- Number of results per page.
Returns:
QuerySet
"""
query = clean_query(query)
queryset = queryset or cls.objects.all()

if query == '(*)':
# Early-exit optimization. Wildcard searching is very expensive in elasticsearch. And since we just
# want everything, we don't need to actually query elasticsearch at all.
return queryset

logger.info(f"Attempting Elasticsearch document search against query: {query}")
es_document, *_ = registry.get_documents(models=(cls,))
dsl_query = ESDSLQ('query_string', query=query, analyze_wildcard=True)

all_ids = set()
search_after = None

while True:
search = (
es_document.search()
.query(dsl_query)
.sort('id')
.extra(size=page_size, search_after=search_after)
)

try:
results = search.execute()
except RequestError as e:
logger.warning(f"Elasticsearch request failed: {e}")
break

ids = {result.pk for result in results}
if not ids:
logger.info("No more results found.")
break

all_ids.update(ids)
search_after = results[-1].meta.sort
logger.info(f"Fetched {len(ids)} records; total so far: {len(all_ids)}")

filtered_queryset = queryset.filter(pk__in=all_ids)
logger.info(f"Filtered queryset of size {len(filtered_queryset)} for query: {query}")
return filtered_queryset


class Collaborator(TimeStampedModel):
"""
Collaborator model, defining any collaborators who helped write course content.
Expand Down
89 changes: 87 additions & 2 deletions course_discovery/apps/course_metadata/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from decimal import Decimal
from functools import partial
from unittest import mock
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import ddt
import pytest
Expand All @@ -17,6 +17,7 @@
from django.core.exceptions import ValidationError
from django.core.management import call_command
from django.db import IntegrityError, transaction
from django.db.models import QuerySet
from django.test import TestCase, override_settings
from edx_django_utils.cache import RequestCache
from edx_toggles.toggles.testutils import override_waffle_switch
Expand All @@ -39,7 +40,8 @@
FAQ, AbstractHeadingBlurbModel, AbstractMediaModel, AbstractNamedModel, AbstractTitleDescriptionModel,
AbstractValueModel, CorporateEndorsement, Course, CourseEditor, CourseRun, CourseRunType, CourseType, Curriculum,
CurriculumCourseMembership, CurriculumCourseRunExclusion, CurriculumProgramMembership, DegreeCost, DegreeDeadline,
Endorsement, Organization, OrganizationMapping, Program, ProgramType, Ranking, Seat, SeatType, Subject, Topic
Endorsement, Organization, OrganizationMapping, Program, ProgramType, Ranking, SearchAfterMixin, Seat, SeatType,
Subject, Topic
)
from course_discovery.apps.course_metadata.publishers import (
CourseRunMarketingSitePublisher, ProgramMarketingSitePublisher
Expand Down Expand Up @@ -4192,3 +4194,86 @@ def test_basic(self):
self.assertEqual(course_run.restricted_run, restricted_course_run)
self.assertEqual(restricted_course_run.restriction_type, 'custom-b2b-enterprise')
self.assertEqual(str(restricted_course_run), "course-v1:SC+BreadX+3T2015: <custom-b2b-enterprise>")


class MockQuerySet(QuerySet):
def __init__(self, model=None, items=None):
self.model = model
self.items = items or []
super().__init__()

def filter(self, *args, **kwargs):
"""
Override the filter method to mimic Django's QuerySet behavior.
"""
pk_in = kwargs.get("pk__in", [])
pk_in = set(map(int, pk_in))
print(f"Filtering with pk_in: {pk_in}, self.items: {[item.pk for item in self.items]}")
return MockQuerySet(
model=self.model,
items=[item for item in self.items if item.pk in pk_in],
)

def __iter__(self):
return iter(self.items)

def __len__(self):
return len(self.items)

def all(self):
return self

def _chain(self):
# Mimic Django's queryset chaining
return self.__class__(model=self.model, items=self.items)


class MockModel:
def __init__(self, pk):
self.pk = pk


class SearchAfterMixinTest(SearchAfterMixin, MockModel):
objects = MockQuerySet(model=MockModel)


class TestSearchAfterMixin(TestCase):
@patch("course_discovery.apps.course_metadata.models.registry.get_documents")
@patch("course_discovery.apps.course_metadata.models.logger")
@patch("course_discovery.apps.course_metadata.models.clean_query")
def test_search_with_mock_data(self, mock_clean_query, mock_logger, mock_registry):
mock_document = MagicMock()
mock_search = MagicMock()
mock_document.search.return_value = mock_search

mock_search.query.return_value = mock_search
mock_search.sort.return_value = mock_search
mock_search.extra.return_value = mock_search

mock_result1 = MagicMock()
mock_result1.pk = 1
mock_result1.meta.sort = ["sort1"]

mock_result2 = MagicMock()
mock_result2.pk = 2
mock_result2.meta.sort = ["sort2"]

mock_search.execute.side_effect = [
[mock_result1, mock_result2],
[],
]

mock_registry.return_value = (mock_document,)
mock_clean_query.return_value = "cleaned_query"

SearchAfterMixinTest.objects = MockQuerySet(
model=MockModel,
items=[MockModel(1), MockModel(2), MockModel(3)],
)

result_queryset = SearchAfterMixinTest.search("query")

self.assertEqual(len(result_queryset), 2)
self.assertTrue(all(item.pk in {1, 2} for item in result_queryset))
mock_logger.info.assert_called()
mock_registry.assert_called_once_with(models=(SearchAfterMixinTest,))
39 changes: 21 additions & 18 deletions docs/decisions/0030-use-elasticsearch-search-after.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,52 @@ Accepted (December 2024)

Context
---------
Elasticsearch enforces a strict limit on the number of records that can be retrieved in a single query,
controlled by the `MAX_RESULT_WINDOW` setting, which defaults to 10,000.
When a query attempts to retrieve more results than this limit, Elasticsearch does not simply truncate the results—instead,
Elasticsearch enforces a strict limit on the number of records that can be retrieved in a single query,
controlled by the `MAX_RESULT_WINDOW` setting, which defaults to 10,000.
When a query attempts to retrieve more results than this limit, Elasticsearch does not simply truncate the results—instead,
it can lead to partial or incomplete data retrieval across search endpoints, potentially causing significant data loss or incomplete query responses.

Increasing this limit is not a viable solution, as it can lead to significant performance issues,
Increasing this limit is not a viable solution, as it can lead to significant performance issues,
including increased memory usage and query latency, which can degrade the cluster's overall stability.

To address this issue, we need a more efficient way to paginate large query results.
The solution must allow for seamless and reliable pagination without imposing excessive resource demands on the system.
To address this issue, we need a more efficient way to paginate large query results.
The solution must allow for seamless and reliable pagination without imposing excessive resource demands on the system.
Furthermore, it should ensure that the existing search functionality and search responses remain unaffected in the current version of the endpoint.

Decision
----------
A new version (v2) of the `search/all/` endpoint will be introduced to enhance functionality while ensuring that the existing v1 functionality remains unaffected.
A new version (v2) of the `search/all/` endpoint will be introduced to enhance functionality while ensuring that the existing v1 functionality remains unaffected.
This version will utilize ElasticSearch's search_after pagination mechanism, specifically designed to handle large query results efficiently.

*How search_after Works:**
`search_after` is a pagination mechanism that allows retrieving results beyond the standard window limit by using the sort values of the last document from the previous page.
**How search_after Works:**
`search_after` is a pagination mechanism that allows retrieving results beyond the standard window limit by using the sort values of the last document from the previous page.
Instead of using traditional offset-based pagination, it uses the actual sort values of the last retrieved document to fetch the next set of results, ensuring efficient and accurate pagination for large datasets.

In the v2 implementation, response documents will include a `sort` field that can be used as the `search_after` query parameter in subsequent queries.
This approach enables scalable retrieval of large datasets by bypassing the `MAX_RESULT_WINDOW` limitations.
In the v2 implementation, response documents will include a `sort` field that can be used as the `search_after` query parameter in subsequent queries.
This approach enables scalable retrieval of large datasets by bypassing the `MAX_RESULT_WINDOW` limitations.
To support this, a new `SearchAfterPagination` class will be introduced, which will parse the `search_after` query parameter to facilitate efficient pagination of ElasticSearch results.

Additionally, new serializers will be integrated for the v2 implementation.
Additionally, new serializers will be integrated for the v2 implementation.
Specifically, the AggregateSearchListSerializerV2 will extend the existing AggregateSearchListSerializer,
supporting the `search_after` pagination mechanism and incorporating newer serializer versions for the same document types.

New versions of the serializers
New versions of the serializers
- CourseRunSearchDocumentSerializer
- CourseSearchDocumentSerializer
- ProgramSearchDocumentSerializer
- LearnerPathwaySearchDocumentSerializer
- PersonSearchDocumentSerializer

will be introduced to include additional search index fields, specifically `sort`` and `aggregate_uuid` in their responses.
will be introduced to include additional search index fields, specifically `sort`` and `aggregation_uuid` in their responses.

Consumers will interact with the new v2 search endpoint by making an initial request to `/api/v2/search/all/`,
which returns search results along with a `next` field representing the `sort` value of the last document.
Consumers will interact with the new v2 search endpoint by making an initial request to `/api/v2/search/all/`,
which returns search results along with a `next` field representing the `sort` value of the last document.
For subsequent pages, they simply include this `next` value as the `search_after` parameter in their next request.

A new mixin, SearchAfterMixin, will be created and added to enable the search_after functionality in catalog query endpoints, such as `/api/v1/catalog/query_contains`.

Example Usage:

```
# First request
response1 = requests.get('/api/v2/search/all/')
Expand All @@ -58,15 +61,15 @@ next_page_search_after = results1['next'] # This is the sort value of the last

# Next request using the 'next' value
response2 = requests.get(f'/api/v2/search/all/?search_after={json.dumps(next_page_search_after)}')

```

Consequences
--------------
- The v2 search endpoint will introduce two new fields, `aggregate_uuid` and `sort`, in response to support the search_after pagination mechanism.
- The v2 search endpoint will introduce two new fields, `aggregation_uuid` and `sort`, in response to support the search_after pagination mechanism.

Next Steps
-------------------------
- Create SearchAfter Mixin: Develop a mixin to enable search_after functionality in the Django shell.
- Extend SearchAfter Functionality: Implement `search_after` for other Elasticsearch search endpoints.
- Notify Users: Inform consumers about the changes and provide support during the transition.
- Monitor Performance: Track the performance of the new endpoint post-deployment.

0 comments on commit 2b92b48

Please sign in to comment.