Skip to content

Commit e7ff810

Browse files
committed
Implement MSC4308
1 parent a5837cf commit e7ff810

File tree

3 files changed

+452
-6
lines changed

3 files changed

+452
-6
lines changed

synapse/handlers/sliding_sync/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ async def current_sync_for_user(
201201
202202
Args:
203203
sync_config: Sync configuration
204-
to_token: The point in the stream to sync up to.
204+
to_token: The latest point in the stream to sync up to.
205205
from_token: The point in the stream to sync from. Token of the end of the
206206
previous batch. May be `None` if this is the initial sync request.
207207
"""
@@ -283,7 +283,7 @@ async def handle_room(room_id: str) -> None:
283283
with start_active_span("sliding_sync.generate_room_entries"):
284284
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 20)
285285

286-
extensions = await self.extensions.get_extensions_response(
286+
extensions, to_token = await self.extensions.get_extensions_response(
287287
sync_config=sync_config,
288288
actual_lists=lists,
289289
previous_connection_state=previous_connection_state,

synapse/handlers/sliding_sync/extensions.py

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
Optional,
2525
Sequence,
2626
Set,
27+
Tuple,
2728
cast,
2829
)
2930

@@ -39,6 +40,7 @@
3940
MultiWriterStreamToken,
4041
SlidingSyncStreamToken,
4142
StrCollection,
43+
StreamKeyType,
4244
StreamToken,
4345
)
4446
from synapse.types.handlers.sliding_sync import (
@@ -80,9 +82,12 @@ async def get_extensions_response(
8082
actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult],
8183
to_token: StreamToken,
8284
from_token: Optional[SlidingSyncStreamToken],
83-
) -> SlidingSyncResult.Extensions:
85+
) -> Tuple[SlidingSyncResult.Extensions, StreamToken]:
8486
"""Handle extension requests.
8587
88+
May clamp the `to_token` and return a new, earlier, one to replace it, if some
89+
of the extensions are limited to sending less data down sync.
90+
8691
Args:
8792
sync_config: Sync configuration
8893
new_connection_state: Snapshot of the current per-connection state
@@ -93,12 +98,13 @@ async def get_extensions_response(
9398
actual_room_ids: The actual room IDs in the the Sliding Sync response.
9499
actual_room_response_map: A map of room ID to room results in the the
95100
Sliding Sync response.
96-
to_token: The point in the stream to sync up to.
101+
to_token: The latest point in the stream to sync up to.
102+
Extensions may limit themselves to earlier than this.
97103
from_token: The point in the stream to sync from.
98104
"""
99105

100106
if sync_config.extensions is None:
101-
return SlidingSyncResult.Extensions()
107+
return SlidingSyncResult.Extensions(), to_token
102108

103109
to_device_coro = None
104110
if sync_config.extensions.to_device is not None:
@@ -156,27 +162,51 @@ async def get_extensions_response(
156162
from_token=from_token,
157163
)
158164

165+
thread_subs_coro = None
166+
if sync_config.extensions.thread_subscriptions is not None:
167+
thread_subs_coro = self.get_thread_subscriptions_extension_response(
168+
sync_config=sync_config,
169+
thread_subscriptions_request=sync_config.extensions.thread_subscriptions,
170+
to_token=to_token,
171+
from_token=from_token,
172+
)
173+
159174
(
160175
to_device_response,
161176
e2ee_response,
162177
account_data_response,
163178
receipts_response,
164179
typing_response,
180+
thread_subs_result,
165181
) = await gather_optional_coroutines(
166182
to_device_coro,
167183
e2ee_coro,
168184
account_data_coro,
169185
receipts_coro,
170186
typing_coro,
187+
thread_subs_coro,
171188
)
172189

190+
if thread_subs_result is not None:
191+
thread_subs_response, limited_thread_subs_stream_position = (
192+
thread_subs_result
193+
)
194+
if limited_thread_subs_stream_position is not None:
195+
to_token = to_token.copy_and_replace(
196+
StreamKeyType.THREAD_SUBSCRIPTIONS,
197+
limited_thread_subs_stream_position,
198+
)
199+
else:
200+
thread_subs_response = None
201+
173202
return SlidingSyncResult.Extensions(
174203
to_device=to_device_response,
175204
e2ee=e2ee_response,
176205
account_data=account_data_response,
177206
receipts=receipts_response,
178207
typing=typing_response,
179-
)
208+
thread_subscriptions=thread_subs_response,
209+
), to_token
180210

181211
def find_relevant_room_ids_for_extension(
182212
self,
@@ -877,3 +907,69 @@ async def get_typing_extension_response(
877907
return SlidingSyncResult.Extensions.TypingExtension(
878908
room_id_to_typing_map=room_id_to_typing_map,
879909
)
910+
911+
async def get_thread_subscriptions_extension_response(
912+
self,
913+
sync_config: SlidingSyncConfig,
914+
thread_subscriptions_request: SlidingSyncConfig.Extensions.ThreadSubscriptionsExtension,
915+
to_token: StreamToken,
916+
from_token: Optional[SlidingSyncStreamToken],
917+
) -> Tuple[
918+
Optional[SlidingSyncResult.Extensions.ThreadSubscriptionsExtension],
919+
Optional[int],
920+
]:
921+
"""Handle Thread Subscriptions extension (MSC4308)
922+
923+
Args:
924+
sync_config: Sync configuration
925+
thread_subscriptions_request: The thread_subscriptions extension from the request
926+
to_token: The point in the stream to sync up to.
927+
from_token: The point in the stream to sync from.
928+
929+
Returns:
930+
- the response (or None if empty)
931+
- optionally, a new thread_subscriptions stream position to use as the new position in
932+
the `to_token` (because we generated a limited response)
933+
"""
934+
if not thread_subscriptions_request.enabled:
935+
return None, None
936+
937+
limit = thread_subscriptions_request.limit
938+
939+
if from_token:
940+
from_stream_id = from_token.stream_token.thread_subscriptions_key
941+
else:
942+
from_stream_id = StreamToken.START.thread_subscriptions_key
943+
944+
to_stream_id = to_token.thread_subscriptions_key
945+
946+
updates = await self.store.get_updated_thread_subscriptions_for_user(
947+
user_id=sync_config.user.to_string(),
948+
from_id=from_stream_id,
949+
to_id=to_stream_id,
950+
limit=limit,
951+
)
952+
953+
if len(updates) == 0:
954+
return None, None
955+
956+
changes = [
957+
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadSubscriptionChange(
958+
room_id=room_id,
959+
root_event_id=thread_root_id,
960+
subscribed=subscribed,
961+
automatic=automatic,
962+
)
963+
for _stream_id, room_id, thread_root_id, subscribed, automatic in updates
964+
]
965+
966+
limited_max_stream_position = None
967+
if len(updates) == limit:
968+
# Given the limited window, we didn't send down all
969+
# thread subscription changes to the client.
970+
# So don't advance our to_token all the way yet.
971+
limited_max_stream_position = updates[-1][0]
972+
973+
return SlidingSyncResult.Extensions.ThreadSubscriptionsExtension(
974+
changed=changes
975+
), limited_max_stream_position

0 commit comments

Comments
 (0)