Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish annotation tasks to an authority queue #9402

Merged
merged 1 commit into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions h/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ def configure(environ=None, settings=None): # noqa: PLR0915
# Reporting settings
settings_manager.set("h.report.fdw_users", "REPORT_FDW_USERS", type_=aslist)

# Optional configuration for publishing annotation events to an authority's queue
settings_manager.set("h.authority_queue_config", "AUTHORITY_QUEUE_CONFIG")

# Debug/development settings
settings_manager.set("debug_query", "DEBUG_QUERY")

Expand Down
5 changes: 5 additions & 0 deletions h/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Service definitions that handle business logic."""

from h.services.annotation_authority_queue import AnnotationAuthorityQueueService
from h.services.annotation_metadata import AnnotationMetadataService
from h.services.annotation_read import AnnotationReadService
from h.services.annotation_sync import AnnotationSyncService
Expand Down Expand Up @@ -49,6 +50,10 @@ def includeme(config): # pragma: no cover
config.register_service_factory(
"h.services.notification.factory", iface=NotificationService
)
config.register_service_factory(
"h.services.annotation_authority_queue.factory",
iface=AnnotationAuthorityQueueService,
)

# Other services
config.register_service_factory(
Expand Down
105 changes: 105 additions & 0 deletions h/services/annotation_authority_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import json
import logging
from dataclasses import dataclass

from celery import Celery

from h.services.annotation_read import AnnotationReadService

LOG = logging.getLogger(__name__)


@dataclass(frozen=True)
class AuthorityQueueConfiguration:
"""Celery setting for an authority specific queue."""

broker_url: str
queue_name: str
task_name: str


class AnnotationAuthorityQueueService:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we wouldn't need a separate service for this, but as this is going to be only used by LMS for now it isolates it from the rest of the codebase.

"""A service to publish annotation events to authority specific queues."""

def __init__(
self,
authority_queue_config_json: str | None,
annotation_read_service: AnnotationReadService,
annotation_json_service,
):
self._authority_queue_config = self._parse_authority_queue_config(
authority_queue_config_json
)
self._annotation_read_service = annotation_read_service
self._annotation_json_service = annotation_json_service

def publish(self, event_action: str, annotation_id: str) -> None:
annotation = self._annotation_read_service.get_annotation_by_id(annotation_id)
authority_queue_config = self._authority_queue_config.get(annotation.authority)
if not authority_queue_config:
return

if event_action != "create" or not annotation.mentions:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This limits the amount of messages created by this to... just us testing for now.

# For now we'll limit the events to only those that create annotations that contain mentions
return

annotation_dict = self._annotation_json_service.present_for_user(
annotation=annotation, user=annotation.slim.user
)

payload = {
"action": event_action,
"annotation": annotation_dict,
}

authority_celery = Celery(annotation.authority)
authority_celery.conf.broker_url = authority_queue_config.broker_url

authority_celery.send_task(
authority_queue_config.task_name,
queue=authority_queue_config.queue_name,
kwargs={"event": payload},
)

def _parse_authority_queue_config(
self, config_json: str | None
) -> dict[str, AuthorityQueueConfiguration]:
"""Parse the authority queue config JSON string into dictionary by authority name."""
if not config_json:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being careful, if no config, broken config or any other issue, we just log and continue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only checked once on the server instantiation.

LOG.debug("No authority queue config found")
return {}
try:
config = json.loads(config_json)
except json.JSONDecodeError:
LOG.exception("Failed to parse authority queue config: %s", config_json)
return {}

parsed_config = {}
for authority, authority_queue_config in config.items():
broker_url = authority_queue_config.get("broker_url")
queue_name = authority_queue_config.get("queue_name")
task_name = authority_queue_config.get("task_name")

if not all([broker_url, queue_name, task_name]):
LOG.error(
"Invalid authority queue config for %s: %s",
authority,
authority_queue_config,
)
continue

parsed_config[authority] = AuthorityQueueConfiguration(
broker_url=broker_url, queue_name=queue_name, task_name=task_name
)

return parsed_config


def factory(_context, request) -> AnnotationAuthorityQueueService:
return AnnotationAuthorityQueueService(
authority_queue_config_json=request.registry.settings.get(
"h.authority_queue_config"
),
annotation_read_service=request.find_service(AnnotationReadService),
annotation_json_service=request.find_service(name="annotation_json"),
)
9 changes: 8 additions & 1 deletion h/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from h.notification import mention, reply
from h.services import NotificationService
from h.services.annotation_read import AnnotationReadService
from h.tasks import mailer
from h.tasks import annotations, mailer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -158,3 +158,10 @@ def send_mention_notifications(event):
recipient=notification.mentioned_user,
notification_type=NotificationType.MENTION,
)


@subscriber(AnnotationEvent)
def publish_annotation_event_for_authority(event):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we do another level of indirection here, the subscriber creates a celery task, that way if this LMS only feature creates any problems the annotations will be created fine.

annotations.publish_annotation_event_for_authority.delay(
event.action, event.annotation_id
)
9 changes: 9 additions & 0 deletions h/tasks/annotations.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from h.db.types import URLSafeUUID
from h.models import Annotation
from h.services.annotation_authority_queue import AnnotationAuthorityQueueService
from h.services.annotation_write import AnnotationWriteService
from h.tasks.celery import celery, get_task_logger

Expand Down Expand Up @@ -38,3 +39,11 @@ def sync_annotation_slim(limit):

# Remove all jobs we've processed
queue_svc.delete(jobs)


@celery.task
def publish_annotation_event_for_authority(event_action, annotation_id):
"""Optionally publish an annotation event to the authority's message queue."""
celery.request.find_service(AnnotationAuthorityQueueService).publish(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The celery task just calls the service method, not much to see here.

event_action, annotation_id
)
7 changes: 7 additions & 0 deletions tests/common/fixtures/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from h.services import MentionService, NotificationService
from h.services.analytics import AnalyticsService
from h.services.annotation_authority_queue import AnnotationAuthorityQueueService
from h.services.annotation_delete import AnnotationDeleteService
from h.services.annotation_json import AnnotationJSONService
from h.services.annotation_metadata import AnnotationMetadataService
Expand Down Expand Up @@ -48,6 +49,7 @@

__all__ = (
"analytics_service",
"annotation_authority_queue_service",
"annotation_delete_service",
"annotation_json_service",
"annotation_metadata_service",
Expand Down Expand Up @@ -325,6 +327,11 @@ def notification_service(mock_service):
return mock_service(NotificationService)


@pytest.fixture
def annotation_authority_queue_service(mock_service):
return mock_service(AnnotationAuthorityQueueService)


@pytest.fixture
def feature_service(mock_service):
return mock_service(FeatureService, name="feature")
Expand Down
143 changes: 143 additions & 0 deletions tests/unit/h/services/annotation_authority_queue_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import json
from unittest.mock import sentinel

import pytest

from h.services.annotation_authority_queue import (
AnnotationAuthorityQueueService,
AuthorityQueueConfiguration,
factory,
)


class TestAnnotationAuthorityQueueService:
def test_publish_when_no_authority_queue_config(self, svc, Celery):
svc._authority_queue_config = {} # noqa: SLF001

svc.publish("create", sentinel.annotation_id)

Celery.assert_not_called()

def test_publish_when_no_creation(
self, svc, Celery, annotation_read_service, annotation
):
annotation_read_service.get_annotation_by_id.return_value = annotation

svc.publish("edit", sentinel.annotation_id)

Celery.assert_not_called()

def test_publish_with_no_mentions(self, svc, Celery, annotation_read_service):
svc.publish("create", sentinel.annotation_id)

annotation_read_service.get_annotation_by_id.assert_called_once_with(
sentinel.annotation_id
)
Celery.assert_not_called()

def test_publish(
self, svc, Celery, annotation_read_service, annotation_json_service, annotation
):
annotation_read_service.get_annotation_by_id.return_value = annotation

svc.publish("create", sentinel.annotation_id)

annotation_read_service.get_annotation_by_id.assert_called_once_with(
sentinel.annotation_id
)
annotation_json_service.present_for_user.assert_called_once_with(
annotation=annotation_read_service.get_annotation_by_id.return_value,
user=annotation_read_service.get_annotation_by_id.return_value.slim.user,
)
Celery.assert_called_once_with(
annotation_read_service.get_annotation_by_id.return_value.authority
)
Celery.return_value.conf.broker_url = "broker_url"
Celery.return_value.send_task.assert_called_once_with(
"task",
queue="queue",
kwargs={
"event": {
"action": "create",
"annotation": annotation_json_service.present_for_user.return_value,
}
},
)

def test_parse_config_when_not_present(self, svc):
assert not svc._parse_authority_queue_config(None) # noqa: SLF001

def test_parse_config_when_invalid_json(self, svc):
assert not svc._parse_authority_queue_config("{") # noqa: SLF001

def test_parse_config_when_missing_key(self, svc):
assert not svc._parse_authority_queue_config( # noqa: SLF001
json.dumps({"lms": {"broker_url": "url"}})
)

def test_parse_config_with_config(self, svc, valid_config):
config = svc._parse_authority_queue_config(valid_config) # noqa: SLF001

assert config["lms"] == AuthorityQueueConfiguration(
broker_url="url", queue_name="queue", task_name="task"
)

@pytest.fixture
def Celery(self, patch):
return patch("h.services.annotation_authority_queue.Celery")

@pytest.fixture
def valid_config(self):
return json.dumps(
{
"lms": {
"broker_url": "url",
"queue_name": "queue",
"task_name": "task",
}
}
)

@pytest.fixture
def annotation(self, factories):
user = factories.User(authority="lms")
return factories.Annotation(
slim=factories.AnnotationSlim(user=user),
userid=user.userid,
mentions=[factories.Mention()],
)

@pytest.fixture
def svc(self, annotation_read_service, annotation_json_service, valid_config):
return AnnotationAuthorityQueueService(
authority_queue_config_json=valid_config,
annotation_read_service=annotation_read_service,
annotation_json_service=annotation_json_service,
)


class TestFactory:
def test_it(
self,
pyramid_request,
annotation_read_service,
AnnotationAuthorityQueueService,
annotation_json_service,
):
service = factory(sentinel.context, pyramid_request)

AnnotationAuthorityQueueService.assert_called_once_with(
authority_queue_config_json=pyramid_request.registry.settings.get(
"h.authority_queue_config"
),
annotation_read_service=annotation_read_service,
annotation_json_service=annotation_json_service,
)

assert service == AnnotationAuthorityQueueService.return_value

@pytest.fixture
def AnnotationAuthorityQueueService(self, patch):
return patch(
"h.services.annotation_authority_queue.AnnotationAuthorityQueueService"
)
16 changes: 16 additions & 0 deletions tests/unit/h/subscribers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,17 @@ def transaction_manager(self, pyramid_request):
return pyramid_request.tm


class TestPublishAnnotationEventForAuthority:
def test_it(self, pyramid_request, annotations):
event = AnnotationEvent(pyramid_request, {"id": "any"}, "action")

subscribers.publish_annotation_event_for_authority(event)

annotations.publish_annotation_event_for_authority.delay.assert_called_once_with(
event.action, event.annotation_id
)


@pytest.fixture(autouse=True)
def reply(patch):
return patch("h.subscribers.reply")
Expand All @@ -325,6 +336,11 @@ def mailer(patch):
return patch("h.subscribers.mailer")


@pytest.fixture(autouse=True)
def annotations(patch):
return patch("h.subscribers.annotations")


@pytest.fixture(autouse=True)
def emails(patch):
return patch("h.subscribers.emails")
Expand Down
Loading
Loading