Skip to content

Commit b4aa726

Browse files
committed
Merge branch 'rei/t2_msc4306_conflict' into rei/t2_pushrules
2 parents 47afadb + 29c1ae4 commit b4aa726

File tree

9 files changed

+160
-88
lines changed

9 files changed

+160
-88
lines changed

synapse/api/errors.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,10 @@ class Codes(str, Enum):
141141
INVITE_BLOCKED = "ORG.MATRIX.MSC4155.M_INVITE_BLOCKED"
142142

143143
# Part of MSC4306: Thread Subscriptions
144-
SKIPPED = "M_SKIPPED"
145-
NOT_IN_THREAD = "M_NOT_IN_THREAD"
144+
MSC4306_CONFLICTING_UNSUBSCRIPTION = (
145+
"IO.ELEMENT.MSC4306.M_CONFLICTING_UNSUBSCRIPTION"
146+
)
147+
MSC4306_NOT_IN_THREAD = "IO.ELEMENT.MSC4306.M_NOT_IN_THREAD"
146148

147149

148150
class CodeMessageException(RuntimeError):

synapse/handlers/thread_subscriptions.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
AutomaticSubscriptionConflicted,
1010
ThreadSubscription,
1111
)
12-
from synapse.types import UserID
12+
from synapse.types import EventOrderings, UserID
1313

1414
if TYPE_CHECKING:
1515
from synapse.server import HomeServer
@@ -87,46 +87,40 @@ async def subscribe_user_to_thread(
8787
# First check that the user can access the thread root event
8888
# and that it exists
8989
try:
90-
event = await self.event_handler.get_event(
90+
thread_root_event = await self.event_handler.get_event(
9191
user_id, room_id, thread_root_event_id
9292
)
93-
if event is None:
93+
if thread_root_event is None:
9494
raise NotFoundError("No such thread root")
9595
except AuthError:
9696
logger.info("rejecting thread subscriptions change (thread not accessible)")
9797
raise NotFoundError("No such thread root")
9898

9999
if automatic_event_id:
100-
event = await self.event_handler.get_event(
100+
autosub_cause_event = await self.event_handler.get_event(
101101
user_id, room_id, automatic_event_id
102102
)
103-
if event is None:
104-
raise NotFoundError("No such automatic subscription cause event")
105-
relation = relation_from_event(event)
103+
if autosub_cause_event is None:
104+
raise NotFoundError("Automatic subscription event not found")
105+
relation = relation_from_event(autosub_cause_event)
106106
if (
107107
relation is None
108108
or relation.rel_type != RelationTypes.THREAD
109109
or relation.parent_id != thread_root_event_id
110110
):
111111
raise SynapseError(
112112
HTTPStatus.BAD_REQUEST,
113-
"Automatic subscription must be caused by an event in the thread",
114-
errcode=Codes.NOT_IN_THREAD,
113+
"Automatic subscription must use an event in the thread",
114+
errcode=Codes.MSC4306_NOT_IN_THREAD,
115115
)
116116

117-
stream_ordering = event.internal_metadata.stream_ordering
118-
assert stream_ordering is not None
119-
automatic_event_orderings = (
120-
stream_ordering,
121-
# depth is topological_ordering
122-
event.depth,
123-
)
117+
automatic_event_orderings = EventOrderings.from_event(autosub_cause_event)
124118
else:
125119
automatic_event_orderings = None
126120

127121
outcome = await self.store.subscribe_user_to_thread(
128122
user_id.to_string(),
129-
event.room_id,
123+
room_id,
130124
thread_root_event_id,
131125
automatic_event_orderings=automatic_event_orderings,
132126
)
@@ -135,7 +129,7 @@ async def subscribe_user_to_thread(
135129
raise SynapseError(
136130
HTTPStatus.CONFLICT,
137131
"Automatic subscription obsoleted by an unsubscription request.",
138-
errcode=Codes.SKIPPED,
132+
errcode=Codes.MSC4306_CONFLICTING_UNSUBSCRIPTION,
139133
)
140134

141135
return outcome

synapse/storage/databases/main/thread_subscriptions.py

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
)
3434
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
3535
from synapse.storage.util.id_generators import MultiWriterIdGenerator
36+
from synapse.types import EventOrderings
3637
from synapse.util.caches.descriptors import cached
3738

3839
if TYPE_CHECKING:
@@ -112,39 +113,48 @@ def process_replication_position(
112113

113114
@staticmethod
114115
def _should_skip_autosubscription_after_unsubscription(
115-
autosub_stream_ordering: int,
116-
autosub_topological_ordering: int,
117-
unsubscribed_at_stream_ordering: int,
118-
unsubscribed_at_topological_ordering: int,
116+
*,
117+
autosub: EventOrderings,
118+
unsubscribed_at: EventOrderings,
119119
) -> bool:
120120
"""
121-
Returns whether an automatic subscription occurring following an unsubscription
121+
Returns whether an automatic subscription occurring *after* an unsubscription
122122
should be skipped, because the unsubscription already 'acknowledges' the event
123123
causing the automatic subscription (the cause event).
124124
125+
To determine *after*, we use `stream_ordering` unless the event is backfilled
126+
(negative `stream_ordering`) and fallback to topological ordering.
127+
125128
Args:
126-
autosub_stream_ordering: the stream_ordering of the cause event
127-
autosub_topological_ordering: the topological_ordering of the cause event
128-
unsubscribed_at_stream_ordering: the maximum stream ordering at the time of unsubscription
129-
unsubscribed_at_topological_ordering: the maximum stream ordering at the time of unsubscription
129+
autosub: the stream_ordering and topological_ordering of the cause event
130+
unsubscribed_at:
131+
the maximum stream ordering and the maximum topological ordering at the time of unsubscription
130132
131133
Returns:
132134
True if the automatic subscription should be skipped
133135
"""
134-
# these two orderings should be positive, because they don't refer to a specific event
135-
# but rather the maximum at the time of unsubscription
136-
assert unsubscribed_at_stream_ordering > 0
137-
assert unsubscribed_at_topological_ordering > 0
138-
139-
if unsubscribed_at_stream_ordering >= autosub_stream_ordering > 0:
136+
# For normal rooms, these two orderings should be positive, because
137+
# they don't refer to a specific event but rather the maximum at the
138+
# time of unsubscription.
139+
#
140+
# However, for rooms that have never been joined and that are being peeked at,
141+
# we might not have a single non-backfilled event and therefore the stream
142+
# ordering might be negative, so we don't assert this case.
143+
assert unsubscribed_at.topological > 0
144+
145+
unsubscribed_at_backfilled = unsubscribed_at.stream < 0
146+
if (
147+
not unsubscribed_at_backfilled
148+
and unsubscribed_at.stream >= autosub.stream > 0
149+
):
140150
# non-backfilled events: the unsubscription is later according to
141151
# the stream
142152
return True
143153

144-
if autosub_stream_ordering < 0:
154+
if autosub.stream < 0:
145155
# the auto-subscription cause event was backfilled, so fall back to
146156
# topological ordering
147-
if unsubscribed_at_topological_ordering >= autosub_topological_ordering:
157+
if unsubscribed_at.topological >= autosub.topological:
148158
return True
149159

150160
return False
@@ -155,20 +165,29 @@ async def subscribe_user_to_thread(
155165
room_id: str,
156166
thread_root_event_id: str,
157167
*,
158-
automatic_event_orderings: Optional[Tuple[int, int]],
168+
automatic_event_orderings: Optional[EventOrderings],
159169
) -> Optional[Union[int, AutomaticSubscriptionConflicted]]:
160170
"""Updates a user's subscription settings for a specific thread root.
161171
162172
If no change would be made to the subscription, does not produce any database change.
163173
174+
Case-by-case:
175+
- if we already have an automatic subscription:
176+
- new automatic subscriptions will be no-ops (no database write),
177+
- new manual subscriptions will overwrite the automatic subscription
178+
- if we already have a manual subscription:
179+
we don't update (no database write) in either case, because:
180+
- the existing manual subscription wins over a new automatic subscription request
181+
- there would be no need to write a manual subscription because we already have one
182+
164183
Args:
165184
user_id: The ID of the user whose settings are being updated.
166185
room_id: The ID of the room the thread root belongs to.
167186
thread_root_event_id: The event ID of the thread root.
168-
automatic_event_id:
187+
automatic_event_orderings:
169188
Value depends on whether the subscription was performed automatically by the user's client.
170189
For manual subscriptions: None.
171-
For automatic subscriptions: (stream_ordering, topological_ordering) of the event.
190+
For automatic subscriptions: the orderings of the event.
172191
173192
Returns:
174193
If a subscription is made: (int) the stream ID for this update.
@@ -243,20 +262,19 @@ def _subscribe_user_to_thread_txn(
243262
# is good enough (either we already have a manual subscription,
244263
# or we requested an automatic subscription)
245264
# In that case, nothing to change here.
265+
# (See docstring for case-by-case explanation)
246266
return None
247267

248268
if not subscribed and requested_automatic:
249269
assert automatic_event_orderings is not None
250270
# we previously unsubscribed and we are now automatically subscribing
251271
# Check whether the new autosubscription should be skipped
252-
autosub_stream_ordering, autosub_topological_ordering = (
253-
automatic_event_orderings
254-
)
255272
if ThreadSubscriptionsWorkerStore._should_skip_autosubscription_after_unsubscription(
256-
autosub_stream_ordering,
257-
autosub_topological_ordering,
258-
unsubscribed_at_stream_ordering,
259-
unsubscribed_at_topological_ordering,
273+
autosub=automatic_event_orderings,
274+
unsubscribed_at=EventOrderings(
275+
unsubscribed_at_stream_ordering,
276+
unsubscribed_at_topological_ordering,
277+
),
260278
):
261279
# skip the subscription
262280
return AutomaticSubscriptionConflicted()
@@ -473,7 +491,7 @@ def get_max_thread_subscriptions_stream_id(self) -> int:
473491
return self._thread_subscriptions_id_gen.get_current_token()
474492

475493
async def get_updated_thread_subscriptions(
476-
self, from_id: int, to_id: int, limit: int
494+
self, *, from_id: int, to_id: int, limit: int
477495
) -> List[Tuple[int, str, str, str]]:
478496
"""Get updates to thread subscriptions between two stream IDs.
479497
@@ -506,7 +524,7 @@ def get_updated_thread_subscriptions_txn(
506524
)
507525

508526
async def get_updated_thread_subscriptions_for_user(
509-
self, user_id: str, from_id: int, to_id: int, limit: int
527+
self, user_id: str, *, from_id: int, to_id: int, limit: int
510528
) -> List[Tuple[int, str, str]]:
511529
"""Get updates to thread subscriptions for a specific user.
512530

synapse/types/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
from typing_extensions import Self
7474

7575
from synapse.appservice.api import ApplicationService
76+
from synapse.events import EventBase
7677
from synapse.storage.databases.main import DataStore, PurgeEventsStore
7778
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
7879
from synapse.storage.util.id_generators import MultiWriterIdGenerator
@@ -1464,3 +1465,31 @@ class ScheduledTask:
14641465
result: Optional[JsonMapping]
14651466
# Optional error that should be assigned a value when the status is FAILED
14661467
error: Optional[str]
1468+
1469+
1470+
@attr.s(auto_attribs=True, frozen=True, slots=True)
1471+
class EventOrderings:
1472+
stream: int
1473+
"""
1474+
The stream_ordering of the event.
1475+
Negative numbers mean the event was backfilled.
1476+
"""
1477+
1478+
topological: int
1479+
"""
1480+
The topological_ordering of the event.
1481+
Currently this is equivalent to the `depth` attributes of
1482+
the PDU.
1483+
"""
1484+
1485+
@staticmethod
1486+
def from_event(event: "EventBase") -> "EventOrderings":
1487+
"""
1488+
Get the orderings from an event.
1489+
1490+
Preconditions:
1491+
- the event must have been persisted (otherwise it won't have a stream ordering)
1492+
"""
1493+
stream = event.internal_metadata.stream_ordering
1494+
assert stream is not None
1495+
return EventOrderings(stream, event.depth)

synapse/util/pydantic_models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ def validate_event_id(cls, value: str) -> str:
7272
# Room version 3 is base64, 4+ are base64Url
7373
# In both cases, the base64 is unpadded.
7474
# refs:
75-
# - https://spec.matrix.org/v1.15/rooms/v3/
76-
# - https://spec.matrix.org/v1.15/rooms/v4/
75+
# - https://spec.matrix.org/v1.15/rooms/v3/ e.g. $acR1l0raoZnm60CBwAVgqbZqoO/mYU81xysh1u7XcJk
76+
# - https://spec.matrix.org/v1.15/rooms/v4/ e.g. $Rqnc-F-dvnEYJTyHq_iKxU2bZ1CI92-kuZq3a5lr5Zg
7777
b64_hash = value[1:]
7878
if cls.EVENT_ID_HASH_ROOM_VERSION_3_PLUS.fullmatch(b64_hash) is None:
7979
raise ValueError(

tests/push/test_bulk_push_rule_evaluator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ def test_suppress_edits(self) -> None:
453453
@override_config({"experimental_features": {"msc4306_enabled": True}})
454454
def test_thread_subscriptions(self) -> None:
455455
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
456-
(thread_root_id,) = self.helper.send_events(self.room_id, 1, tok=self.token)
456+
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
457457

458458
self.assertFalse(
459459
self._create_and_process(
@@ -501,7 +501,7 @@ def test_with_disabled_thread_subscriptions(self) -> None:
501501
FUTURE: If MSC4306 becomes enabled-by-default/accepted, this test is to be removed.
502502
"""
503503
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
504-
(thread_root_id,) = self.helper.send_events(self.room_id, 1, tok=self.token)
504+
(thread_root_id,) = self.helper.send_messages(self.room_id, 1, tok=self.token)
505505

506506
# When MSC4306 is not enabled, a threaded message generates a notification
507507
# by default.

tests/rest/client/test_thread_subscriptions.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from twisted.internet.testing import MemoryReactor
1717

18+
from synapse.api.errors import Codes
1819
from synapse.rest import admin
1920
from synapse.rest.client import login, profile, room, thread_subscriptions
2021
from synapse.server import HomeServer
@@ -49,10 +50,12 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
4950
# Create a room and send a message to use as a thread root
5051
self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
5152
self.helper.join(self.room_id, self.other_user_id, tok=self.other_token)
52-
(self.root_event_id,) = self.helper.send_events(self.room_id, 1, tok=self.token)
53+
(self.root_event_id,) = self.helper.send_messages(
54+
self.room_id, 1, tok=self.token
55+
)
5356

5457
# Send a message in the thread
55-
self.threaded_events = self.helper.send_events(
58+
self.threaded_events = self.helper.send_messages(
5659
self.room_id,
5760
2,
5861
content_fn=lambda idx: {
@@ -265,15 +268,17 @@ def test_auto_subscribe_cause_event_not_in_thread(self) -> None:
265268
actually in the thread.
266269
This is an error.
267270
"""
268-
(unrelated_event_id,) = self.helper.send_events(self.room_id, 1, tok=self.token)
271+
(unrelated_event_id,) = self.helper.send_messages(
272+
self.room_id, 1, tok=self.token
273+
)
269274
channel = self.make_request(
270275
"PUT",
271276
f"{PREFIX}/{self.room_id}/thread/{self.root_event_id}/subscription",
272277
{"automatic": unrelated_event_id},
273278
access_token=self.token,
274279
)
275280
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.text_body)
276-
self.assertEqual(channel.json_body["errcode"], "M_NOT_IN_THREAD")
281+
self.assertEqual(channel.json_body["errcode"], Codes.MSC4306_NOT_IN_THREAD)
277282

278283
def test_auto_resubscription_conflict(self) -> None:
279284
"""
@@ -297,7 +302,9 @@ def test_auto_resubscription_conflict(self) -> None:
297302
)
298303
self.assertEqual(channel.code, HTTPStatus.CONFLICT, channel.text_body)
299304
self.assertEqual(
300-
channel.json_body["errcode"], "M_SKIPPED", channel.text_body
305+
channel.json_body["errcode"],
306+
Codes.MSC4306_CONFLICTING_UNSUBSCRIPTION,
307+
channel.text_body,
301308
)
302309

303310
# Check the subscription was not made
@@ -310,7 +317,7 @@ def test_auto_resubscription_conflict(self) -> None:
310317

311318
# But if a new event is sent after the unsubscription took place,
312319
# that one can be used for an automatic subscription
313-
(later_event_id,) = self.helper.send_events(
320+
(later_event_id,) = self.helper.send_messages(
314321
self.room_id,
315322
1,
316323
content_fn=lambda _: {

tests/rest/client/utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
from twisted.internet.testing import MemoryReactorClock
4848
from twisted.web.server import Site
4949

50-
from synapse.api.constants import Membership, ReceiptTypes
50+
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
5151
from synapse.api.errors import Codes
5252
from synapse.server import HomeServer
5353
from synapse.types import JsonDict
@@ -396,7 +396,7 @@ def send(
396396
custom_headers=custom_headers,
397397
)
398398

399-
def send_events(
399+
def send_messages(
400400
self,
401401
room_id: str,
402402
num_events: int,
@@ -414,7 +414,7 @@ def send_events(
414414
for event_index in range(num_events):
415415
response = self.send_event(
416416
room_id,
417-
"m.room.message",
417+
EventTypes.Message,
418418
content_fn(event_index),
419419
tok=tok,
420420
)

0 commit comments

Comments
 (0)