diff --git a/lib/galaxy/managers/base.py b/lib/galaxy/managers/base.py index 829df35cdca8..57106c917205 100644 --- a/lib/galaxy/managers/base.py +++ b/lib/galaxy/managers/base.py @@ -388,6 +388,21 @@ def list(self, filters=None, order_by=None, limit=None, offset=None, **kwargs): items = self._apply_fn_filters_gen(items, fn_filters) return list(self._apply_fn_limit_offset_gen(items, limit, offset)) + def count(self, filters=None, **kwargs): + """ + Returns the number of objects matching the given filters. + + If the filters include functional filters, this function will raise an exception as they might cause + performance issues. + """ + self._handle_filters_case_sensitivity(filters) + orm_filters, fn_filters = self._split_filters(filters) + if fn_filters: + raise exceptions.RequestParameterInvalidException("Counting with functional filters is not supported.") + + query = self.query(filters=orm_filters, **kwargs) + return query.count() + def _handle_filters_case_sensitivity(self, filters): """Modifies the filters to make them case insensitive if needed.""" if filters is None: diff --git a/test/unit/app/managers/test_HistoryManager.py b/test/unit/app/managers/test_HistoryManager.py index 6be12582246e..1ed4c9d3b426 100644 --- a/test/unit/app/managers/test_HistoryManager.py +++ b/test/unit/app/managers/test_HistoryManager.py @@ -2,6 +2,7 @@ """ from unittest import mock +import pytest import sqlalchemy from sqlalchemy import true @@ -938,6 +939,41 @@ def test_list(self): found = self.history_manager.list(filters=filters, offset=-1) assert found == deleted_and_annotated - # TODO: eq, ge, le - # def test_ratings( self ): - # pass + def test_count(self): + user2 = self.user_manager.create(**user2_data) + history1 = self.history_manager.create(name="history1", user=user2) + history2 = self.history_manager.create(name="history2", user=user2) + history3 = self.history_manager.create(name="history3", user=user2) + history4 = self.history_manager.create(name="history4", user=user2) + + self.history_manager.delete(history1) + self.history_manager.delete(history2) + self.history_manager.delete(history3) + + test_annotation = "testing" + history2.add_item_annotation(self.trans.sa_session, user2, history2, test_annotation) + self.trans.sa_session.flush() + history3.add_item_annotation(self.trans.sa_session, user2, history3, test_annotation) + self.trans.sa_session.flush() + history3.add_item_annotation(self.trans.sa_session, user2, history4, test_annotation) + self.trans.sa_session.flush() + + all_histories = [history1, history2, history3, history4] + deleted = [history1, history2, history3] + + assert self.history_manager.count() == len(all_histories), "having no filters should count all histories" + filters = [model.History.deleted == true()] + assert self.history_manager.count(filters=filters) == len(deleted), "counting with orm filters should work" + + raw_annotation_fn_filter = ("annotation", "has", test_annotation) + # functional filtering is not supported + with pytest.raises(exceptions.RequestParameterInvalidException) as exc_info: + filters = self.filter_parser.parse_filters([raw_annotation_fn_filter]) + self.history_manager.count(filters=filters) + assert "not supported" in str(exc_info) + + raw_deleted_orm_filter = ("deleted", "eq", "True") + with pytest.raises(exceptions.RequestParameterInvalidException) as exc_info: + filters = self.filter_parser.parse_filters([raw_deleted_orm_filter, raw_annotation_fn_filter]) + self.history_manager.count(filters=filters) + assert "not supported" in str(exc_info)