diff --git a/changelog.d/18290.feature b/changelog.d/18290.feature new file mode 100644 index 00000000000..426f6ab31a6 --- /dev/null +++ b/changelog.d/18290.feature @@ -0,0 +1 @@ +Add support for sending notification counts and thread notification counts in simplified sliding sync mode. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 459d3c3e248..0abbfb425ef 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -15,7 +15,17 @@ import itertools import logging from itertools import chain -from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + AbstractSet, + Dict, + List, + Mapping, + Optional, + Sequence, + Set, + Tuple, +) from prometheus_client import Histogram from typing_extensions import assert_never @@ -38,6 +48,7 @@ tag_args, trace, ) +from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.state_deltas import StateDelta from synapse.storage.databases.main.stream import PaginateFunction @@ -245,11 +256,31 @@ async def current_sync_for_user( to_token=to_token, ) + # fetch the user's receipts between the two points: these will be factor + # in deciding whether to send the room, since it may have changed their + # notification counts + receipts = await self.store.get_linearized_receipts_for_user_in_rooms( + user_id=user_id, + room_ids=interested_rooms.relevant_room_map.keys(), + from_key=from_token.stream_token.receipt_key if from_token else None, + to_key=to_token.receipt_key, + ) + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = self.room_lists.filter_relevant_rooms_to_send( + sync_config.user, + previous_connection_state, + from_token.stream_token if from_token else None, + to_token, + interested_rooms.relevant_room_map, + receipts, + ) + lists = interested_rooms.lists relevant_room_map = interested_rooms.relevant_room_map all_rooms = interested_rooms.all_rooms room_membership_for_user_map = interested_rooms.room_membership_for_user_map - relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} @@ -272,6 +303,7 @@ async def handle_room(room_id: str) -> None: to_token=to_token, newly_joined=room_id in interested_rooms.newly_joined_rooms, is_dm=room_id in interested_rooms.dm_room_ids, + room_receipts=receipts[room_id] if room_id in receipts else None, ) # Filter out empty room results during incremental sync @@ -296,6 +328,7 @@ async def handle_room(room_id: str) -> None: actual_room_response_map=rooms, from_token=from_token, to_token=to_token, + user_receipts=receipts, ) if has_lists or has_room_subscriptions: @@ -543,6 +576,7 @@ async def get_room_sync_data( to_token: StreamToken, newly_joined: bool, is_dm: bool, + room_receipts: Optional[Sequence[ReceiptInRoom]], ) -> SlidingSyncResult.RoomResult: """ Fetch room data for the sync response. @@ -560,6 +594,8 @@ async def get_room_sync_data( to_token: The point in the stream to sync up to. newly_joined: If the user has newly joined the room is_dm: Whether the room is a DM room + room_receipts: Any read receipts from the in question in that room between + from_token and to_token """ user = sync_config.user @@ -1312,6 +1348,11 @@ async def get_room_sync_data( set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) + unread_notifs = await self.store.get_unread_event_push_actions_by_room_for_user( + room_id, + sync_config.user.to_string(), + ) + return SlidingSyncResult.RoomResult( name=room_name, avatar=room_avatar, @@ -1329,11 +1370,8 @@ async def get_room_sync_data( bump_stamp=bump_stamp, joined_count=joined_count, invited_count=invited_count, - # TODO: These are just dummy values. We could potentially just remove these - # since notifications can only really be done correctly on the client anyway - # (encrypted rooms). - notification_count=0, - highlight_count=0, + notif_counts=unread_notifs, + room_receipts=room_receipts, ) @trace diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py index 077887ec321..6f1930a57fb 100644 --- a/synapse/handlers/sliding_sync/extensions.py +++ b/synapse/handlers/sliding_sync/extensions.py @@ -80,6 +80,7 @@ async def get_extensions_response( actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult], to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], + user_receipts: Mapping[str, Sequence[ReceiptInRoom]], ) -> SlidingSyncResult.Extensions: """Handle extension requests. @@ -95,6 +96,7 @@ async def get_extensions_response( Sliding Sync response. to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. + user_receipts: Map of room ID to list of the syncing user's receipts in the room. """ if sync_config.extensions is None: @@ -142,6 +144,7 @@ async def get_extensions_response( receipts_request=sync_config.extensions.receipts, to_token=to_token, from_token=from_token, + user_receipts=user_receipts, ) typing_coro = None @@ -619,6 +622,7 @@ async def get_receipts_extension_response( receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension, to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], + user_receipts: Mapping[str, Sequence[ReceiptInRoom]], ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]: """Handle Receipts extension (MSC3960) @@ -635,6 +639,7 @@ async def get_receipts_extension_response( account_data_request: The account_data extension from the request to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. + user_receipts: Map of room ID to list of the syncing user's receipts in the room. """ # Skip if the extension is not enabled if not receipts_request.enabled: @@ -726,15 +731,6 @@ async def handle_previously_room(room_id: str) -> None: ) if initial_rooms: - # We also always send down receipts for the current user. - user_receipts = ( - await self.store.get_linearized_receipts_for_user_in_rooms( - user_id=sync_config.user.to_string(), - room_ids=initial_rooms, - to_key=to_token.receipt_key, - ) - ) - # For rooms we haven't previously sent down, we could send all receipts # from that room but we only want to include receipts for events # in the timeline to avoid bloating and blowing up the sync response @@ -752,22 +748,23 @@ async def handle_previously_room(room_id: str) -> None: # Combine the receipts for a room and add them to # `fetched_receipts` for room_id in initial_receipts.keys() | user_receipts.keys(): - receipt_content = ReceiptInRoom.merge_to_content( - list( - itertools.chain( - initial_receipts.get(room_id, []), - user_receipts.get(room_id, []), + if room_id in initial_rooms: + receipt_content = ReceiptInRoom.merge_to_content( + list( + itertools.chain( + initial_receipts.get(room_id, []), + user_receipts.get(room_id, []), + ) ) ) - ) - fetched_receipts.append( - { - "room_id": room_id, - "type": EduTypes.RECEIPT, - "content": receipt_content, - } - ) + fetched_receipts.append( + { + "room_id": room_id, + "type": EduTypes.RECEIPT, + "content": receipt_content, + } + ) fetched_receipts = ReceiptEventSource.filter_out_private_receipts( fetched_receipts, sync_config.user.to_string() diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index a1730b7e05b..7e9e09c6cac 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -24,6 +24,7 @@ Literal, Mapping, Optional, + Sequence, Set, Tuple, Union, @@ -44,6 +45,7 @@ from synapse.events import StrippedStateEvent from synapse.events.utils import parse_stripped_state_event from synapse.logging.opentracing import start_active_span, trace +from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.storage.databases.main.state import ( ROOM_UNKNOWN_SENTINEL, Sentinel as StateSentinel, @@ -102,10 +104,6 @@ class SlidingSyncInterestedRooms: lists: A mapping from list name to the list result for the response relevant_room_map: A map from rooms that match the sync request to their room sync config. - relevant_rooms_to_send_map: Subset of `relevant_room_map` that - includes the rooms that *may* have relevant updates. Rooms not - in this map will definitely not have room updates (though - extensions may have updates in these rooms). newly_joined_rooms: The set of rooms that were joined in the token range and the user is still joined to at the end of this range. newly_left_rooms: The set of rooms that we left in the token range @@ -115,7 +113,6 @@ class SlidingSyncInterestedRooms: lists: Mapping[str, SlidingSyncResult.SlidingWindowList] relevant_room_map: Mapping[str, RoomSyncConfig] - relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] all_rooms: Set[str] room_membership_for_user_map: Mapping[str, RoomsForUserType] @@ -128,7 +125,6 @@ def empty() -> "SlidingSyncInterestedRooms": return SlidingSyncInterestedRooms( lists={}, relevant_room_map={}, - relevant_rooms_to_send_map={}, all_rooms=set(), room_membership_for_user_map={}, newly_joined_rooms=set(), @@ -547,16 +543,9 @@ async def _compute_interested_rooms_new_tables( relevant_room_map[room_id] = room_sync_config - # Filtered subset of `relevant_room_map` for rooms that may have updates - # (in the event stream) - relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send( - previous_connection_state, from_token, relevant_room_map - ) - return SlidingSyncInterestedRooms( lists=lists, relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, all_rooms=all_rooms, room_membership_for_user_map=room_membership_for_user_map, newly_joined_rooms=newly_joined_room_ids, @@ -735,16 +724,9 @@ async def _compute_interested_rooms_fallback( relevant_room_map[room_id] = room_sync_config - # Filtered subset of `relevant_room_map` for rooms that may have updates - # (in the event stream) - relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send( - previous_connection_state, from_token, relevant_room_map - ) - return SlidingSyncInterestedRooms( lists=lists, relevant_room_map=relevant_room_map, - relevant_rooms_to_send_map=relevant_rooms_to_send_map, all_rooms=all_rooms, room_membership_for_user_map=room_membership_for_user_map, newly_joined_rooms=newly_joined_room_ids, @@ -752,18 +734,21 @@ async def _compute_interested_rooms_fallback( dm_room_ids=dm_room_ids, ) - async def _filter_relevant_rooms_to_send( + def filter_relevant_rooms_to_send( self, + user_id: UserID, previous_connection_state: PerConnectionState, from_token: Optional[StreamToken], - relevant_room_map: Dict[str, RoomSyncConfig], - ) -> Dict[str, RoomSyncConfig]: + to_token: StreamToken, + relevant_room_map: Mapping[str, RoomSyncConfig], + receipts: Mapping[str, Sequence[ReceiptInRoom]], + ) -> Mapping[str, RoomSyncConfig]: """Filters the `relevant_room_map` down to those rooms that may have updates we need to fetch and return.""" # Filtered subset of `relevant_room_map` for rooms that may have updates # (in the event stream) - relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map + relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] = relevant_room_map if relevant_room_map: with start_active_span("filter_relevant_rooms_to_send"): if from_token: @@ -814,6 +799,11 @@ async def _filter_relevant_rooms_to_send( ) ) rooms_should_send.update(rooms_that_have_updates) + + # Any rooms with receipts should be considered for sending as their + # notification counts may have changed. + rooms_should_send.update(receipts.keys()) + relevant_rooms_to_send_map = { room_id: room_sync_config for room_id, room_sync_config in relevant_room_map.items() diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 4fb9c0c8e7a..a5e6903e522 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1066,10 +1066,19 @@ async def encode_rooms( serialized_rooms: Dict[str, JsonDict] = {} for room_id, room_result in rooms.items(): serialized_rooms[room_id] = { - "notification_count": room_result.notification_count, - "highlight_count": room_result.highlight_count, + "notification_count": room_result.notif_counts.main_timeline.notify_count, + "highlight_count": room_result.notif_counts.main_timeline.highlight_count, } + if len(room_result.notif_counts.threads) > 0: + serialized_rooms[room_id]["unread_thread_notifications"] = { + thread_id: { + "notification_count": counts.notify_count, + "highlight_count": counts.highlight_count, + } + for thread_id, counts in room_result.notif_counts.threads.items() + } + if room_result.bump_stamp is not None: serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index f42023418e2..c1572c69921 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -547,13 +547,20 @@ def _get_unread_counts_by_receipt_txn( # If the user has no receipts in the room, retrieve the stream ordering for # the latest membership event from this user in this room (which we assume is # a join). + # Sometimes (usually state resets) there can be no membership event either, + # so we allow None and return no notifications which is probably about + # the best we can do short of failing outright. event_id = self.db_pool.simple_select_one_onecol_txn( txn=txn, table="local_current_membership", keyvalues={"room_id": room_id, "user_id": user_id}, retcol="event_id", + allow_none=True, ) + if event_id is None: + return _EMPTY_ROOM_NOTIF_COUNTS + stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) return self._get_unread_counts_by_pos_txn( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 99643315107..2a096170585 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -666,7 +666,11 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: return results async def get_linearized_receipts_for_user_in_rooms( - self, user_id: str, room_ids: StrCollection, to_key: MultiWriterStreamToken + self, + user_id: str, + room_ids: StrCollection, + from_key: Optional[MultiWriterStreamToken] = None, + to_key: Optional[MultiWriterStreamToken] = None, ) -> Mapping[str, Sequence[ReceiptInRoom]]: """Fetch all receipts for the user in the given room. @@ -685,11 +689,18 @@ def get_linearized_receipts_for_user_in_rooms_txn( sql = f""" SELECT instance_name, stream_id, room_id, receipt_type, user_id, event_id, thread_id, data FROM receipts_linearized - WHERE {clause} AND user_id = ? AND stream_id <= ? + WHERE {clause} AND user_id = ? """ args.append(user_id) - args.append(to_key.get_max_stream_pos()) + + if from_key is not None: + sql += " AND stream_id >= ?" + args.append(from_key.get_max_stream_pos()) + + if to_key is not None: + sql += " AND stream_id <= ?" + args.append(to_key.get_max_stream_pos()) txn.execute(sql, args) diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 3ebd334a6d5..ff51404549b 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -40,6 +40,8 @@ from synapse._pydantic_compat import Extra from synapse.api.constants import EventTypes from synapse.events import EventBase +from synapse.storage.databases.main.event_push_actions import RoomNotifCounts +from synapse.storage.databases.main.receipts import ReceiptInRoom from synapse.types import ( DeviceListUpdates, JsonDict, @@ -163,10 +165,10 @@ class RoomResult: own user ID. (same as sync `v2 m.joined_member_count`) invited_count: The number of users with membership of invite. (same as sync v2 `m.invited_member_count`) - notification_count: The total number of unread notifications for this room. (same - as sync v2) - highlight_count: The number of unread notifications for this room with the highlight - flag set. (same as sync v2) + notif_counts: An object containing the number of unread notifications for both + the main thread and any other threads. + room_receipts: A sequence of any read receipts from the user in question in + the room, used to calculate whether the notif_counts could have changed """ @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -197,8 +199,8 @@ class StrippedHero: bump_stamp: Optional[int] joined_count: Optional[int] invited_count: Optional[int] - notification_count: int - highlight_count: int + notif_counts: RoomNotifCounts + room_receipts: Optional[Sequence[ReceiptInRoom]] def __bool__(self) -> bool: return ( @@ -215,6 +217,7 @@ def __bool__(self) -> bool: or bool(self.required_state) or bool(self.timeline_events) or bool(self.stripped_state) + or bool(self.room_receipts) ) @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/tests/rest/client/sliding_sync/test_extension_receipts.py b/tests/rest/client/sliding_sync/test_extension_receipts.py index 6e7700b533f..4dc31fd00d5 100644 --- a/tests/rest/client/sliding_sync/test_extension_receipts.py +++ b/tests/rest/client/sliding_sync/test_extension_receipts.py @@ -433,8 +433,11 @@ def test_receipts_incremental_sync(self) -> None: set(), exact=True, ) + # The room should be be in user1's sync because they sent a read receipt... + self.assertIn(room_id1, response_body["rooms"]) + # but there should be no timeline events # No events in the timeline since they were sent before the `from_token` - self.assertNotIn(room_id1, response_body["rooms"]) + self.assertNotIn("timeline", response_body["rooms"][room_id1]) # Check room3: # diff --git a/tests/rest/client/sliding_sync/test_notification_counts.py b/tests/rest/client/sliding_sync/test_notification_counts.py new file mode 100644 index 00000000000..e762a6ca07d --- /dev/null +++ b/tests/rest/client/sliding_sync/test_notification_counts.py @@ -0,0 +1,143 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +from http import HTTPStatus + +from twisted.test.proto_helpers import MemoryReactor + +import synapse.rest.admin +from synapse.api.constants import EventTypes, ReceiptTypes, RelationTypes +from synapse.rest.client import login, receipts, room, sync +from synapse.server import HomeServer +from synapse.util import Clock + +from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase + + +class SlidingSyncNotificationCountsTestCase(SlidingSyncBase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + sync.register_servlets, + receipts.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + super().prepare(reactor, clock, hs) + + def setUp(self) -> None: + super().setUp() + + self.user1_id = self.register_user("user1", "pass") + self.user1_tok = self.login(self.user1_id, "pass") + self.user2_id = self.register_user("user2", "pass") + self.user2_tok = self.login(self.user2_id, "pass") + + # Create room1 + self.room_id1 = self.helper.create_room_as(self.user2_id, tok=self.user2_tok) + self.helper.join(self.room_id1, self.user1_id, tok=self.user1_tok) + self.helper.join(self.room_id1, self.user2_id, tok=self.user2_tok) + + self.sync_req = { + "lists": {}, + "room_subscriptions": { + self.room_id1: { + "required_state": [], + "timeline_limit": 1, + }, + }, + } + sync_resp, self.user1_start_token = self.do_sync( + self.sync_req, tok=self.user1_tok + ) + + # send a read receipt to make sure the counts are 0 + channel = self.make_request( + "POST", + f"/rooms/{self.room_id1}/receipt/{ReceiptTypes.READ}/{sync_resp['rooms'][self.room_id1]['timeline'][0]['event_id']}", + {}, + access_token=self.user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + def test_main_thread_notification_count(self) -> None: + # send an event from user 2 + self.helper.send(self.room_id1, body="new event", tok=self.user2_tok) + + # user 1 syncs + sync_resp, from_token = self.do_sync( + self.sync_req, tok=self.user1_tok, since=self.user1_start_token + ) + + # notification count should now be 1 + self.assertEqual(sync_resp["rooms"][self.room_id1]["notification_count"], 1) + + def test_main_thread_highlight_count(self) -> None: + # send an event that mentions user1 + self.helper.send(self.room_id1, body="Hello user1", tok=self.user2_tok) + + # user 1 syncs + sync_resp, from_token = self.do_sync( + self.sync_req, tok=self.user1_tok, since=self.user1_start_token + ) + + # notification and highlight count should be 1 + self.assertEqual(sync_resp["rooms"][self.room_id1]["notification_count"], 1) + self.assertEqual(sync_resp["rooms"][self.room_id1]["highlight_count"], 1) + + def test_thread_notification_count(self) -> None: + room1_event_response1 = self.helper.send( + self.room_id1, body="Thread root", tok=self.user2_tok + ) + + thread_id = room1_event_response1["event_id"] + + _, from_token = self.do_sync( + self.sync_req, tok=self.user1_tok, since=self.user1_start_token + ) + + threaded_event_content = { + "msgtype": "m.text", + "body": "threaded response", + "m.relates_to": { + "event_id": thread_id, + "rel_type": RelationTypes.THREAD, + }, + } + + self.helper.send_event( + self.room_id1, + EventTypes.Message, + threaded_event_content, + None, + self.user2_tok, + HTTPStatus.OK, + custom_headers=None, + ) + + sync_resp, _ = self.do_sync(self.sync_req, tok=self.user1_tok, since=from_token) + + self.assertEqual( + sync_resp["rooms"][self.room_id1]["unread_thread_notifications"][thread_id][ + "notification_count" + ], + 1, + ) + self.assertEqual( + sync_resp["rooms"][self.room_id1]["unread_thread_notifications"][thread_id][ + "highlight_count" + ], + 0, + )