From ee713ccc8f8594584499747473048f0ae25ddb89 Mon Sep 17 00:00:00 2001 From: Marcos Prieto Date: Fri, 14 Mar 2025 11:24:58 +0100 Subject: [PATCH] Publish annotation tasks to an authority queue Based on a per authority configuration publish new annotation messages that containt mentions to a queue. The motivation for this is to support mention notification for LMS users. --- h/config.py | 3 + h/services/__init__.py | 5 + h/services/annotation_authority_queue.py | 105 +++++++++++++ h/subscribers.py | 9 +- h/tasks/annotations.py | 9 ++ tests/common/fixtures/services.py | 7 + .../annotation_authority_queue_test.py | 143 ++++++++++++++++++ tests/unit/h/subscribers_test.py | 16 ++ tests/unit/h/tasks/annotations_test.py | 25 ++- 9 files changed, 315 insertions(+), 7 deletions(-) create mode 100644 h/services/annotation_authority_queue.py create mode 100644 tests/unit/h/services/annotation_authority_queue_test.py diff --git a/h/config.py b/h/config.py index 2d7d6b02183..d38dbeafc61 100644 --- a/h/config.py +++ b/h/config.py @@ -121,6 +121,9 @@ def configure(environ=None, settings=None): # noqa: PLR0915 # Reporting settings settings_manager.set("h.report.fdw_users", "REPORT_FDW_USERS", type_=aslist) + # Optional configuration for publishing annotation events to an authority's queue + settings_manager.set("h.authority_queue_config", "AUTHORITY_QUEUE_CONFIG") + # Debug/development settings settings_manager.set("debug_query", "DEBUG_QUERY") diff --git a/h/services/__init__.py b/h/services/__init__.py index db44c04a3f9..88be8b3683c 100644 --- a/h/services/__init__.py +++ b/h/services/__init__.py @@ -1,5 +1,6 @@ """Service definitions that handle business logic.""" +from h.services.annotation_authority_queue import AnnotationAuthorityQueueService from h.services.annotation_metadata import AnnotationMetadataService from h.services.annotation_read import AnnotationReadService from h.services.annotation_sync import AnnotationSyncService @@ -49,6 +50,10 @@ def includeme(config): # pragma: no cover config.register_service_factory( "h.services.notification.factory", iface=NotificationService ) + config.register_service_factory( + "h.services.annotation_authority_queue.factory", + iface=AnnotationAuthorityQueueService, + ) # Other services config.register_service_factory( diff --git a/h/services/annotation_authority_queue.py b/h/services/annotation_authority_queue.py new file mode 100644 index 00000000000..518dd4d55a6 --- /dev/null +++ b/h/services/annotation_authority_queue.py @@ -0,0 +1,105 @@ +import json +import logging +from dataclasses import dataclass + +from celery import Celery + +from h.services.annotation_read import AnnotationReadService + +LOG = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class AuthorityQueueConfiguration: + """Celery setting for an authority specific queue.""" + + broker_url: str + queue_name: str + task_name: str + + +class AnnotationAuthorityQueueService: + """A service to publish annotation events to authority specific queues.""" + + def __init__( + self, + authority_queue_config_json: str | None, + annotation_read_service: AnnotationReadService, + annotation_json_service, + ): + self._authority_queue_config = self._parse_authority_queue_config( + authority_queue_config_json + ) + self._annotation_read_service = annotation_read_service + self._annotation_json_service = annotation_json_service + + def publish(self, event_action: str, annotation_id: str) -> None: + annotation = self._annotation_read_service.get_annotation_by_id(annotation_id) + authority_queue_config = self._authority_queue_config.get(annotation.authority) + if not authority_queue_config: + return + + if event_action != "create" or not annotation.mentions: + # For now we'll limit the events to only those that create annotations that contain mentions + return + + annotation_dict = self._annotation_json_service.present_for_user( + annotation=annotation, user=annotation.slim.user + ) + + payload = { + "action": event_action, + "annotation": annotation_dict, + } + + authority_celery = Celery(annotation.authority) + authority_celery.conf.broker_url = authority_queue_config.broker_url + + authority_celery.send_task( + authority_queue_config.task_name, + queue=authority_queue_config.queue_name, + kwargs={"event": payload}, + ) + + def _parse_authority_queue_config( + self, config_json: str | None + ) -> dict[str, AuthorityQueueConfiguration]: + """Parse the authority queue config JSON string into dictionary by authority name.""" + if not config_json: + LOG.debug("No authority queue config found") + return {} + try: + config = json.loads(config_json) + except json.JSONDecodeError: + LOG.exception("Failed to parse authority queue config: %s", config_json) + return {} + + parsed_config = {} + for authority, authority_queue_config in config.items(): + broker_url = authority_queue_config.get("broker_url") + queue_name = authority_queue_config.get("queue_name") + task_name = authority_queue_config.get("task_name") + + if not all([broker_url, queue_name, task_name]): + LOG.error( + "Invalid authority queue config for %s: %s", + authority, + authority_queue_config, + ) + continue + + parsed_config[authority] = AuthorityQueueConfiguration( + broker_url=broker_url, queue_name=queue_name, task_name=task_name + ) + + return parsed_config + + +def factory(_context, request) -> AnnotationAuthorityQueueService: + return AnnotationAuthorityQueueService( + authority_queue_config_json=request.registry.settings.get( + "h.authority_queue_config" + ), + annotation_read_service=request.find_service(AnnotationReadService), + annotation_json_service=request.find_service(name="annotation_json"), + ) diff --git a/h/subscribers.py b/h/subscribers.py index 70fdfdccc63..10c2a748685 100644 --- a/h/subscribers.py +++ b/h/subscribers.py @@ -12,7 +12,7 @@ from h.notification import mention, reply from h.services import NotificationService from h.services.annotation_read import AnnotationReadService -from h.tasks import mailer +from h.tasks import annotations, mailer logger = logging.getLogger(__name__) @@ -158,3 +158,10 @@ def send_mention_notifications(event): recipient=notification.mentioned_user, notification_type=NotificationType.MENTION, ) + + +@subscriber(AnnotationEvent) +def publish_annotation_event_for_authority(event): + annotations.publish_annotation_event_for_authority.delay( + event.action, event.annotation_id + ) diff --git a/h/tasks/annotations.py b/h/tasks/annotations.py index 0d933dd3104..331d9d28b9f 100644 --- a/h/tasks/annotations.py +++ b/h/tasks/annotations.py @@ -1,5 +1,6 @@ from h.db.types import URLSafeUUID from h.models import Annotation +from h.services.annotation_authority_queue import AnnotationAuthorityQueueService from h.services.annotation_write import AnnotationWriteService from h.tasks.celery import celery, get_task_logger @@ -38,3 +39,11 @@ def sync_annotation_slim(limit): # Remove all jobs we've processed queue_svc.delete(jobs) + + +@celery.task +def publish_annotation_event_for_authority(event_action, annotation_id): + """Optionally publish an annotation event to the authority's message queue.""" + celery.request.find_service(AnnotationAuthorityQueueService).publish( + event_action, annotation_id + ) diff --git a/tests/common/fixtures/services.py b/tests/common/fixtures/services.py index 4fe28cc8448..dc0d0ebd5cf 100644 --- a/tests/common/fixtures/services.py +++ b/tests/common/fixtures/services.py @@ -4,6 +4,7 @@ from h.services import MentionService, NotificationService from h.services.analytics import AnalyticsService +from h.services.annotation_authority_queue import AnnotationAuthorityQueueService from h.services.annotation_delete import AnnotationDeleteService from h.services.annotation_json import AnnotationJSONService from h.services.annotation_metadata import AnnotationMetadataService @@ -48,6 +49,7 @@ __all__ = ( "analytics_service", + "annotation_authority_queue_service", "annotation_delete_service", "annotation_json_service", "annotation_metadata_service", @@ -325,6 +327,11 @@ def notification_service(mock_service): return mock_service(NotificationService) +@pytest.fixture +def annotation_authority_queue_service(mock_service): + return mock_service(AnnotationAuthorityQueueService) + + @pytest.fixture def feature_service(mock_service): return mock_service(FeatureService, name="feature") diff --git a/tests/unit/h/services/annotation_authority_queue_test.py b/tests/unit/h/services/annotation_authority_queue_test.py new file mode 100644 index 00000000000..a89c7469191 --- /dev/null +++ b/tests/unit/h/services/annotation_authority_queue_test.py @@ -0,0 +1,143 @@ +import json +from unittest.mock import sentinel + +import pytest + +from h.services.annotation_authority_queue import ( + AnnotationAuthorityQueueService, + AuthorityQueueConfiguration, + factory, +) + + +class TestAnnotationAuthorityQueueService: + def test_publish_when_no_authority_queue_config(self, svc, Celery): + svc._authority_queue_config = {} # noqa: SLF001 + + svc.publish("create", sentinel.annotation_id) + + Celery.assert_not_called() + + def test_publish_when_no_creation( + self, svc, Celery, annotation_read_service, annotation + ): + annotation_read_service.get_annotation_by_id.return_value = annotation + + svc.publish("edit", sentinel.annotation_id) + + Celery.assert_not_called() + + def test_publish_with_no_mentions(self, svc, Celery, annotation_read_service): + svc.publish("create", sentinel.annotation_id) + + annotation_read_service.get_annotation_by_id.assert_called_once_with( + sentinel.annotation_id + ) + Celery.assert_not_called() + + def test_publish( + self, svc, Celery, annotation_read_service, annotation_json_service, annotation + ): + annotation_read_service.get_annotation_by_id.return_value = annotation + + svc.publish("create", sentinel.annotation_id) + + annotation_read_service.get_annotation_by_id.assert_called_once_with( + sentinel.annotation_id + ) + annotation_json_service.present_for_user.assert_called_once_with( + annotation=annotation_read_service.get_annotation_by_id.return_value, + user=annotation_read_service.get_annotation_by_id.return_value.slim.user, + ) + Celery.assert_called_once_with( + annotation_read_service.get_annotation_by_id.return_value.authority + ) + Celery.return_value.conf.broker_url = "broker_url" + Celery.return_value.send_task.assert_called_once_with( + "task", + queue="queue", + kwargs={ + "event": { + "action": "create", + "annotation": annotation_json_service.present_for_user.return_value, + } + }, + ) + + def test_parse_config_when_not_present(self, svc): + assert not svc._parse_authority_queue_config(None) # noqa: SLF001 + + def test_parse_config_when_invalid_json(self, svc): + assert not svc._parse_authority_queue_config("{") # noqa: SLF001 + + def test_parse_config_when_missing_key(self, svc): + assert not svc._parse_authority_queue_config( # noqa: SLF001 + json.dumps({"lms": {"broker_url": "url"}}) + ) + + def test_parse_config_with_config(self, svc, valid_config): + config = svc._parse_authority_queue_config(valid_config) # noqa: SLF001 + + assert config["lms"] == AuthorityQueueConfiguration( + broker_url="url", queue_name="queue", task_name="task" + ) + + @pytest.fixture + def Celery(self, patch): + return patch("h.services.annotation_authority_queue.Celery") + + @pytest.fixture + def valid_config(self): + return json.dumps( + { + "lms": { + "broker_url": "url", + "queue_name": "queue", + "task_name": "task", + } + } + ) + + @pytest.fixture + def annotation(self, factories): + user = factories.User(authority="lms") + return factories.Annotation( + slim=factories.AnnotationSlim(user=user), + userid=user.userid, + mentions=[factories.Mention()], + ) + + @pytest.fixture + def svc(self, annotation_read_service, annotation_json_service, valid_config): + return AnnotationAuthorityQueueService( + authority_queue_config_json=valid_config, + annotation_read_service=annotation_read_service, + annotation_json_service=annotation_json_service, + ) + + +class TestFactory: + def test_it( + self, + pyramid_request, + annotation_read_service, + AnnotationAuthorityQueueService, + annotation_json_service, + ): + service = factory(sentinel.context, pyramid_request) + + AnnotationAuthorityQueueService.assert_called_once_with( + authority_queue_config_json=pyramid_request.registry.settings.get( + "h.authority_queue_config" + ), + annotation_read_service=annotation_read_service, + annotation_json_service=annotation_json_service, + ) + + assert service == AnnotationAuthorityQueueService.return_value + + @pytest.fixture + def AnnotationAuthorityQueueService(self, patch): + return patch( + "h.services.annotation_authority_queue.AnnotationAuthorityQueueService" + ) diff --git a/tests/unit/h/subscribers_test.py b/tests/unit/h/subscribers_test.py index 8f032f28a45..cbb1b373ae2 100644 --- a/tests/unit/h/subscribers_test.py +++ b/tests/unit/h/subscribers_test.py @@ -308,6 +308,17 @@ def transaction_manager(self, pyramid_request): return pyramid_request.tm +class TestPublishAnnotationEventForAuthority: + def test_it(self, pyramid_request, annotations): + event = AnnotationEvent(pyramid_request, {"id": "any"}, "action") + + subscribers.publish_annotation_event_for_authority(event) + + annotations.publish_annotation_event_for_authority.delay.assert_called_once_with( + event.action, event.annotation_id + ) + + @pytest.fixture(autouse=True) def reply(patch): return patch("h.subscribers.reply") @@ -325,6 +336,11 @@ def mailer(patch): return patch("h.subscribers.mailer") +@pytest.fixture(autouse=True) +def annotations(patch): + return patch("h.subscribers.annotations") + + @pytest.fixture(autouse=True) def emails(patch): return patch("h.subscribers.emails") diff --git a/tests/unit/h/tasks/annotations_test.py b/tests/unit/h/tasks/annotations_test.py index 415399b44b1..e7fae4cd8e8 100644 --- a/tests/unit/h/tasks/annotations_test.py +++ b/tests/unit/h/tasks/annotations_test.py @@ -1,6 +1,9 @@ import pytest -from h.tasks.annotations import sync_annotation_slim +from h.tasks.annotations import ( + publish_annotation_event_for_authority, + sync_annotation_slim, +) class TestSyncAnnotationSlim: @@ -49,8 +52,18 @@ def test_it_with_no_pending_jobs(self, queue_service, annotation_write_service): annotation_write_service.upsert_annotation_slim.assert_not_called() - @pytest.fixture(autouse=True) - def celery(self, patch, pyramid_request): - cel = patch("h.tasks.annotations.celery", autospec=False) - cel.request = pyramid_request - return cel + +class TestPublishAnnotationEventForAuthority: + def test_it(self, annotation_authority_queue_service): + publish_annotation_event_for_authority("create", "123") + + annotation_authority_queue_service.publish.assert_called_once_with( + "create", "123" + ) + + +@pytest.fixture(autouse=True) +def celery(patch, pyramid_request): + cel = patch("h.tasks.annotations.celery", autospec=False) + cel.request = pyramid_request + return cel