Add support for sending notification counts in simplified sliding sync #18290
base: develop
Conversation
```python
if event_id is None:
    return _EMPTY_ROOM_NOTIF_COUNTS
```
This is the bit I'm particularly unsure about, but it was needed to make the https://github.com/element-hq/synapse/blob/develop/tests/rest/client/sliding_sync/test_sliding_sync.py#L1111 test pass again, since this function previously just threw an exception in that case. This is basically lying. We could fail the sync, but the test explicitly tests that it works. We could assume the stream ordering is the lowest possible, which might be more accurate?
I think in a lot of cases, a normal user would probably have a read receipt in the room and we would return that like normal with the logic above.
But in our tests, we never send any read receipts so we hit this logic path. I think it makes sense to return empty notification counts in this case as they've literally never read anything in the room before and were removed before they even tried.
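For readers not familiar with the types involved, here is a minimal sketch of what "empty notification counts" amounts to, assuming `RoomNotifCounts`/`NotifCounts` keep the shape they have in `synapse/storage/databases/main/event_push_actions.py`. The constant below is my reading of the intent, not necessarily the exact definition in this branch:

```python
from typing import Mapping

import attr


@attr.s(slots=True, auto_attribs=True)
class NotifCounts:
    # Per-user, per-room counts; all default to "nothing unread".
    notify_count: int = 0
    unread_count: int = 0
    highlight_count: int = 0


@attr.s(slots=True, auto_attribs=True)
class RoomNotifCounts:
    main_timeline: NotifCounts
    threads: Mapping[str, NotifCounts]


# Assumed definition: the guard in the diff above returns this when there is
# no event to anchor the count query on, i.e. "nothing unread anywhere".
_EMPTY_ROOM_NOTIF_COUNTS = RoomNotifCounts(main_timeline=NotifCounts(), threads={})
```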
```python
notification_count: int
highlight_count: int
notif_counts: RoomNotifCounts
room_receipts: Optional[Sequence[ReceiptInRoom]]
```
Drive-by review:
There is already a receipts extension for Sliding Sync, implemented based on MSC3960; see `tests/rest/client/sliding_sync/test_extension_receipts.py` for how to use it.

Here is where it is implemented: `synapse/handlers/sliding_sync/extensions.py`, lines 611 to 815 at 0e3c0ae:
```python
async def get_receipts_extension_response(
    self,
    sync_config: SlidingSyncConfig,
    previous_connection_state: "PerConnectionState",
    new_connection_state: "MutablePerConnectionState",
    actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList],
    actual_room_ids: Set[str],
    actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult],
    receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
    to_token: StreamToken,
    from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
    """Handle Receipts extension (MSC3960)

    Args:
        sync_config: Sync configuration
        previous_connection_state: The current per-connection state
        new_connection_state: A mutable copy of the per-connection
            state, used to record updates to the state.
        actual_lists: Sliding window API. A map of list key to list results in the
            Sliding Sync response.
        actual_room_ids: The actual room IDs in the the Sliding Sync response.
        actual_room_response_map: A map of room ID to room results in the the
            Sliding Sync response.
        account_data_request: The account_data extension from the request
        to_token: The point in the stream to sync up to.
        from_token: The point in the stream to sync from.
    """
    # Skip if the extension is not enabled
    if not receipts_request.enabled:
        return None

    relevant_room_ids = self.find_relevant_room_ids_for_extension(
        requested_lists=receipts_request.lists,
        requested_room_ids=receipts_request.rooms,
        actual_lists=actual_lists,
        actual_room_ids=actual_room_ids,
    )

    room_id_to_receipt_map: Dict[str, JsonMapping] = {}
    if len(relevant_room_ids) > 0:
        # We need to handle the different cases depending on if we have sent
        # down receipts previously or not, so we split the relevant rooms
        # up into different collections based on status.
        live_rooms = set()
        previously_rooms: Dict[str, MultiWriterStreamToken] = {}
        initial_rooms = set()

        for room_id in relevant_room_ids:
            if not from_token:
                initial_rooms.add(room_id)
                continue

            # If we're sending down the room from scratch again for some
            # reason, we should always resend the receipts as well
            # (regardless of if we've sent them down before). This is to
            # mimic the behaviour of what happens on initial sync, where you
            # get a chunk of timeline with all of the corresponding receipts
            # for the events in the timeline.
            #
            # We also resend down receipts when we "expand" the timeline,
            # (see the "XXX: Odd behavior" in
            # `synapse.handlers.sliding_sync`).
            room_result = actual_room_response_map.get(room_id)
            if room_result is not None:
                if room_result.initial or room_result.unstable_expanded_timeline:
                    initial_rooms.add(room_id)
                    continue

            room_status = previous_connection_state.receipts.have_sent_room(room_id)
            if room_status.status == HaveSentRoomFlag.LIVE:
                live_rooms.add(room_id)
            elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
                assert room_status.last_token is not None
                previously_rooms[room_id] = room_status.last_token
            elif room_status.status == HaveSentRoomFlag.NEVER:
                initial_rooms.add(room_id)
            else:
                assert_never(room_status.status)

        # The set of receipts that we fetched. Private receipts need to be
        # filtered out before returning.
        fetched_receipts = []

        # For live rooms we just fetch all receipts in those rooms since the
        # `since` token.
        if live_rooms:
            assert from_token is not None
            receipts = await self.store.get_linearized_receipts_for_rooms(
                room_ids=live_rooms,
                from_key=from_token.stream_token.receipt_key,
                to_key=to_token.receipt_key,
            )
            fetched_receipts.extend(receipts)

        # For rooms we've previously sent down, but aren't up to date, we
        # need to use the from token from the room status.
        if previously_rooms:
            # Fetch any missing rooms concurrently.
            async def handle_previously_room(room_id: str) -> None:
                receipt_token = previously_rooms[room_id]

                # TODO: Limit the number of receipts we're about to send down
                # for the room, if its too many we should TODO
                previously_receipts = (
                    await self.store.get_linearized_receipts_for_room(
                        room_id=room_id,
                        from_key=receipt_token,
                        to_key=to_token.receipt_key,
                    )
                )
                fetched_receipts.extend(previously_receipts)

            await concurrently_execute(
                handle_previously_room, previously_rooms.keys(), 20
            )

        if initial_rooms:
            # We also always send down receipts for the current user.
            user_receipts = (
                await self.store.get_linearized_receipts_for_user_in_rooms(
                    user_id=sync_config.user.to_string(),
                    room_ids=initial_rooms,
                    to_key=to_token.receipt_key,
                )
            )

            # For rooms we haven't previously sent down, we could send all receipts
            # from that room but we only want to include receipts for events
            # in the timeline to avoid bloating and blowing up the sync response
            # as the number of users in the room increases. (this behavior is part of the spec)
            initial_rooms_and_event_ids = [
                (room_id, event.event_id)
                for room_id in initial_rooms
                if room_id in actual_room_response_map
                for event in actual_room_response_map[room_id].timeline_events
            ]

            initial_receipts = await self.store.get_linearized_receipts_for_events(
                room_and_event_ids=initial_rooms_and_event_ids,
            )

            # Combine the receipts for a room and add them to
            # `fetched_receipts`
            for room_id in initial_receipts.keys() | user_receipts.keys():
                receipt_content = ReceiptInRoom.merge_to_content(
                    list(
                        itertools.chain(
                            initial_receipts.get(room_id, []),
                            user_receipts.get(room_id, []),
                        )
                    )
                )

                fetched_receipts.append(
                    {
                        "room_id": room_id,
                        "type": EduTypes.RECEIPT,
                        "content": receipt_content,
                    }
                )

        fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
            fetched_receipts, sync_config.user.to_string()
        )

        for receipt in fetched_receipts:
            # These fields should exist for every receipt
            room_id = receipt["room_id"]
            type = receipt["type"]
            content = receipt["content"]

            room_id_to_receipt_map[room_id] = {"type": type, "content": content}

        # Update the per-connection state to track which rooms we have sent
        # all the receipts for.
        new_connection_state.receipts.record_sent_rooms(previously_rooms.keys())
        new_connection_state.receipts.record_sent_rooms(initial_rooms)

    if from_token:
        # Now find the set of rooms that may have receipts that we're not sending
        # down. We only need to check rooms that we have previously returned
        # receipts for (in `previous_connection_state`) because we only care about
        # updating `LIVE` rooms to `PREVIOUSLY`. The `PREVIOUSLY` rooms will just
        # stay pointing at their previous position so we don't need to waste time
        # checking those and since we default to `NEVER`, rooms that were `NEVER`
        # sent before don't need to be recorded as we'll handle them correctly when
        # they come into range for the first time.
        rooms_no_receipts = [
            room_id
            for room_id, room_status in previous_connection_state.receipts._statuses.items()
            if room_status.status == HaveSentRoomFlag.LIVE
            and room_id not in relevant_room_ids
        ]
        changed_rooms = await self.store.get_rooms_with_receipts_between(
            rooms_no_receipts,
            from_key=from_token.stream_token.receipt_key,
            to_key=to_token.receipt_key,
        )
        new_connection_state.receipts.record_unsent_rooms(
            changed_rooms, from_token.stream_token.receipt_key
        )

    return SlidingSyncResult.Extensions.ReceiptsExtension(
        room_id_to_receipt_map=room_id_to_receipt_map,
    )
```
Does this new `room_receipts` provide something beyond what that does? Does the receipts extension need to be updated?
So this is just re-arranging where the receipts are fetched so that they can be used both for that extension and to calculate the unread notification counts, since both of these need the receipt information.
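As an illustration of that rearrangement (all helper names below are hypothetical; only `get_linearized_receipts_for_user_in_rooms` is a real store method), the idea is to fetch the receipts once and hand the same result to both consumers:

```python
from typing import Any, Mapping, Sequence


async def compute_notif_counts(room_receipts: Mapping[str, Sequence[Any]]) -> Mapping[str, Any]:
    # Hypothetical stub: derive unread/highlight counts from the receipts.
    return {}


async def build_receipts_extension(room_receipts: Mapping[str, Sequence[Any]]) -> Mapping[str, Any]:
    # Hypothetical stub: build the MSC3960 receipts extension response.
    return {}


async def gather_room_data(store: Any, user_id: str, room_ids: Sequence[str], to_key: Any) -> None:
    # Single database round trip, shared by both consumers.
    room_receipts = await store.get_linearized_receipts_for_user_in_rooms(
        user_id=user_id,
        room_ids=room_ids,
        to_key=to_key,
    )

    # 1. Feed the receipts into the notification-count calculation.
    notif_counts = await compute_notif_counts(room_receipts)

    # 2. Hand the same receipts to the receipts extension so it doesn't need
    #    to run the same query again.
    receipts_extension = await build_receipts_extension(room_receipts)
```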
```python
# fetch the user's receipts between the two points: these will be a factor
# in deciding whether to send the room, since it may have changed their
# notification counts
receipts = await self.store.get_linearized_receipts_for_user_in_rooms(
```
Is there a way we can share the work with the receipts extension if it's enabled? It seems less than ideal to have to run `get_linearized_receipts_for_user_in_rooms(...)` twice.
In principle, yes, although this uses the function `get_linearized_receipts_for_rooms`, which returns a different type for reasons I don't really understand: `JsonMapping` vs `ReceiptInRoom`. Is one just converting to a real type and the other isn't? Apart from that, I guess it should be possible to move the receipts logic upwards to fetch all the receipts and pass them into the main sync part.
Oh actually, the receipts extension calls `get_linearized_receipts_for_user_in_rooms` too, so presumably we can just pass this in and then we'll be doing the same number of database queries as before. The receipts extension doesn't give a 'from' key, though, which seems weird.
Okay, done.
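For anyone skimming the thread, the shape of the change is roughly the following (the wrapper and its `prefetched` parameter are illustrative, not the real signature in the diff):

```python
from typing import Any, Mapping, Optional, Sequence


async def fetch_user_receipts_for_extension(
    store: Any,
    user_id: str,
    initial_rooms: Sequence[str],
    to_key: Any,
    prefetched: Optional[Mapping[str, Sequence[Any]]] = None,
) -> Mapping[str, Sequence[Any]]:
    # If the main sync path already fetched the user's receipts (to work out
    # notification counts), reuse them instead of querying the database again.
    if prefetched is not None:
        return prefetched
    return await store.get_linearized_receipts_for_user_in_rooms(
        user_id=user_id,
        room_ids=initial_rooms,
        to_key=to_key,
    )
```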
```python
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
```
@erikjohnston Are we even interested in including notification values?
I've also asked this question on the MSC itself: matrix-org/matrix-spec-proposals#4186 (comment)
@dbkr It would also be useful to understand how notification counts are calculated on Element Web today. I assume the Element X apps are accomplishing something without these counts so could Element Web adopt the same strategy?
Yep, EX doesn't do backfill as such, but it does obviously keep history and generally wakes up to receive messages via push, so it calculates them all locally and in practice this seems to be enough. Element Web doesn't do either of these things and so currently does a slightly funky combination of trusting server counts for unencrypted rooms and fixing them up locally for encrypted rooms, except that when the server sets them to 0 we know to clear the count.

Ultimately we would probably like to backfill to calculate the notification counts entirely client side, although probably just for e2e rooms: doing this for large public rooms is arguably a lot of unnecessary work, so I feel like the path to Doing It Right involves having these counts at least for use in unencrypted rooms (where Doing It Right in this case reflects Matrix's dual role of trying to be both an e2e messenger and a platform for large public groups, but that's where we are).

Meanwhile, EW also downloads the entire history of e2e rooms on desktop for the Seshat search index, but unfortunately that's its own thing, so really we would have to replace Seshat first; otherwise we'd have to bodge the two together or end up downloading e2e room history twice.
This adds notification count support to simplified sliding sync, i.e. implementing the `notification_count` and `highlight_count` fields which are already in the SSS MSC but were set to dummy values, and also adds `unread_thread_notifications`, which I've proposed adding to the MSC.

This allows notifications to work properly in Element Web's SSS support without us rewriting it to crawl complete room history (again, since we already do this on desktop for search, but not in the right place).

This required rejigging things around a bit to get it to work without duplicating work and database requests, so it's not as trivial as I was hoping. I hope it's in the right direction but would appreciate guidance if not.
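For context, an illustrative room entry in a simplified sliding sync response with these fields populated might look like the following (values are made up, and `unread_thread_notifications` reflects the shape proposed on the MSC rather than anything final):

```python
# Illustrative only: one room entry from an MSC4186 (simplified sliding sync)
# response, with the counts filled in rather than stubbed to 0.
room_entry = {
    "name": "Example room",
    "notification_count": 3,  # unread events that would trigger a notification
    "highlight_count": 1,     # subset of those that are highlights (e.g. mentions)
    "unread_thread_notifications": {  # proposed addition; keyed by thread root event ID
        "$thread_root_event_id": {
            "notification_count": 2,
            "highlight_count": 0,
        },
    },
    # ... other room fields (timeline, required_state, etc.) unchanged ...
}
```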
Pull Request Checklist

* Pull request is based on the develop branch
* Pull request includes a changelog file. The entry should:
  - Be a short description of your change which makes sense to users ("Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.").
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
* Code style is correct (run the linters)