Skip to content

Commit 954eb7a

Browse files
committed
Add EventOrderings dataclass
1 parent 17ef8e3 commit 954eb7a

File tree

4 files changed

+78
-43
lines changed

4 files changed

+78
-43
lines changed

synapse/handlers/thread_subscriptions.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
AutomaticSubscriptionConflicted,
1010
ThreadSubscription,
1111
)
12-
from synapse.types import UserID
12+
from synapse.types import EventOrderings, UserID
1313

1414
if TYPE_CHECKING:
1515
from synapse.server import HomeServer
@@ -114,13 +114,7 @@ async def subscribe_user_to_thread(
114114
errcode=Codes.MSC4306_NOT_IN_THREAD,
115115
)
116116

117-
stream_ordering = event.internal_metadata.stream_ordering
118-
assert stream_ordering is not None
119-
automatic_event_orderings = (
120-
stream_ordering,
121-
# depth is topological_ordering
122-
event.depth,
123-
)
117+
automatic_event_orderings = EventOrderings.from_event(event)
124118
else:
125119
automatic_event_orderings = None
126120

synapse/storage/databases/main/thread_subscriptions.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
)
3333
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
3434
from synapse.storage.util.id_generators import MultiWriterIdGenerator
35+
from synapse.types import EventOrderings
3536
from synapse.util.caches.descriptors import cached
3637

3738
if TYPE_CHECKING:
@@ -110,10 +111,9 @@ def process_replication_position(
110111

111112
@staticmethod
112113
def _should_skip_autosubscription_after_unsubscription(
113-
autosub_stream_ordering: int,
114-
autosub_topological_ordering: int,
115-
unsubscribed_at_stream_ordering: int,
116-
unsubscribed_at_topological_ordering: int,
114+
*,
115+
autosub: EventOrderings,
116+
unsubscribed_at: EventOrderings,
117117
) -> bool:
118118
"""
119119
Returns whether an automatic subscription occurring *after* an unsubscription
@@ -123,28 +123,27 @@ def _should_skip_autosubscription_after_unsubscription(
123123
To determine *after*, we use `stream_ordering` unless the event is backfilled (negative `stream_ordering`) and fallback to topological ordering.
124124
125125
Args:
126-
autosub_stream_ordering: the stream_ordering of the cause event
127-
autosub_topological_ordering: the topological_ordering of the cause event
128-
unsubscribed_at_stream_ordering: the maximum stream ordering at the time of unsubscription
129-
unsubscribed_at_topological_ordering: the maximum stream ordering at the time of unsubscription
126+
autosub: the stream_ordering and topological_ordering of the cause event
127+
unsubscribed_at:
128+
the maximum stream ordering and the maximum topological ordering at the time of unsubscription
130129
131130
Returns:
132131
True if the automatic subscription should be skipped
133132
"""
134133
# these two orderings should be positive, because they don't refer to a specific event
135134
# but rather the maximum at the time of unsubscription
136-
assert unsubscribed_at_stream_ordering > 0
137-
assert unsubscribed_at_topological_ordering > 0
135+
assert unsubscribed_at.stream > 0
136+
assert unsubscribed_at.topological > 0
138137

139-
if unsubscribed_at_stream_ordering >= autosub_stream_ordering > 0:
138+
if unsubscribed_at.stream >= autosub.stream > 0:
140139
# non-backfilled events: the unsubscription is later according to
141140
# the stream
142141
return True
143142

144-
if autosub_stream_ordering < 0:
143+
if autosub.stream < 0:
145144
# the auto-subscription cause event was backfilled, so fall back to
146145
# topological ordering
147-
if unsubscribed_at_topological_ordering >= autosub_topological_ordering:
146+
if unsubscribed_at.topological >= autosub.topological:
148147
return True
149148

150149
return False
@@ -155,7 +154,7 @@ async def subscribe_user_to_thread(
155154
room_id: str,
156155
thread_root_event_id: str,
157156
*,
158-
automatic_event_orderings: Optional[Tuple[int, int]],
157+
automatic_event_orderings: Optional[EventOrderings],
159158
) -> Optional[Union[int, AutomaticSubscriptionConflicted]]:
160159
"""Updates a user's subscription settings for a specific thread root.
161160
@@ -165,10 +164,10 @@ async def subscribe_user_to_thread(
165164
user_id: The ID of the user whose settings are being updated.
166165
room_id: The ID of the room the thread root belongs to.
167166
thread_root_event_id: The event ID of the thread root.
168-
automatic_event_id:
167+
automatic_event_orderings:
169168
Value depends on whether the subscription was performed automatically by the user's client.
170169
For manual subscriptions: None.
171-
For automatic subscriptions: (stream_ordering, topological_ordering) of the event.
170+
For automatic subscriptions: the orderings of the event.
172171
173172
Returns:
174173
If a subscription is made: (int) the stream ID for this update.
@@ -242,14 +241,12 @@ def _subscribe_user_to_thread_txn(
242241
assert automatic_event_orderings is not None
243242
# we previously unsubscribed and we are now automatically subscribing
244243
# Check whether the new autosubscription should be skipped
245-
autosub_stream_ordering, autosub_topological_ordering = (
246-
automatic_event_orderings
247-
)
248244
if ThreadSubscriptionsWorkerStore._should_skip_autosubscription_after_unsubscription(
249-
autosub_stream_ordering,
250-
autosub_topological_ordering,
251-
unsubscribed_at_stream_ordering,
252-
unsubscribed_at_topological_ordering,
245+
autosub=automatic_event_orderings,
246+
unsubscribed_at=EventOrderings(
247+
unsubscribed_at_stream_ordering,
248+
unsubscribed_at_topological_ordering,
249+
),
253250
):
254251
# skip the subscription
255252
return AutomaticSubscriptionConflicted()

synapse/types/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
from typing_extensions import Self
7474

7575
from synapse.appservice.api import ApplicationService
76+
from synapse.events import EventBase
7677
from synapse.storage.databases.main import DataStore, PurgeEventsStore
7778
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
7879
from synapse.storage.util.id_generators import MultiWriterIdGenerator
@@ -1464,3 +1465,31 @@ class ScheduledTask:
14641465
result: Optional[JsonMapping]
14651466
# Optional error that should be assigned a value when the status is FAILED
14661467
error: Optional[str]
1468+
1469+
1470+
@attr.s(auto_attribs=True, frozen=True, slots=True)
1471+
class EventOrderings:
1472+
stream: int
1473+
"""
1474+
The stream_ordering of the event.
1475+
Negative numbers mean the event was backfilled.
1476+
"""
1477+
1478+
topological: int
1479+
"""
1480+
The topological_ordering of the event.
1481+
Currently this is equivalent to the `depth` attributes of
1482+
the PDU.
1483+
"""
1484+
1485+
@staticmethod
1486+
def from_event(event: "EventBase") -> "EventOrderings":
1487+
"""
1488+
Get the orderings from an event.
1489+
1490+
Preconditions:
1491+
- the event must have been persisted (otherwise it won't have a stream ordering)
1492+
"""
1493+
stream = event.internal_metadata.stream_ordering
1494+
assert stream is not None
1495+
return EventOrderings(stream, event.depth)

tests/storage/test_thread_subscriptions.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# <https://www.gnu.org/licenses/agpl-3.0.html>.
1313
#
1414

15-
from typing import Optional, Tuple, Union
15+
from typing import Optional, Union
1616

1717
from twisted.internet.testing import MemoryReactor
1818

@@ -23,6 +23,7 @@
2323
ThreadSubscriptionsWorkerStore,
2424
)
2525
from synapse.storage.engines.sqlite import Sqlite3Engine
26+
from synapse.types import EventOrderings
2627
from synapse.util import Clock
2728

2829
from tests import unittest
@@ -101,7 +102,7 @@ def _subscribe(
101102
self,
102103
thread_root_id: str,
103104
*,
104-
automatic: Optional[Tuple[int, int]],
105+
automatic: Optional[EventOrderings],
105106
room_id: Optional[str] = None,
106107
user_id: Optional[str] = None,
107108
) -> Optional[Union[int, AutomaticSubscriptionConflicted]]:
@@ -153,7 +154,7 @@ def test_set_and_get_thread_subscription(self) -> None:
153154
# Subscribe
154155
self._subscribe(
155156
self.thread_root_id,
156-
automatic=(1, 1),
157+
automatic=EventOrderings(1, 1),
157158
)
158159

159160
# Assert subscription went through
@@ -185,7 +186,7 @@ def test_set_and_get_thread_subscription(self) -> None:
185186
def test_purge_thread_subscriptions_for_user(self) -> None:
186187
"""Test purging all thread subscription settings for a user."""
187188
# Set subscription settings for multiple threads
188-
self._subscribe(self.thread_root_id, automatic=(1, 1))
189+
self._subscribe(self.thread_root_id, automatic=EventOrderings(1, 1))
189190
self._subscribe(self.other_thread_root_id, automatic=None)
190191

191192
subscriptions = self.get_success(
@@ -224,8 +225,12 @@ def test_purge_thread_subscriptions_for_user(self) -> None:
224225
def test_get_updated_thread_subscriptions(self) -> None:
225226
"""Test getting updated thread subscriptions since a stream ID."""
226227

227-
stream_id1 = self._subscribe(self.thread_root_id, automatic=(1, 1))
228-
stream_id2 = self._subscribe(self.other_thread_root_id, automatic=(2, 2))
228+
stream_id1 = self._subscribe(
229+
self.thread_root_id, automatic=EventOrderings(1, 1)
230+
)
231+
stream_id2 = self._subscribe(
232+
self.other_thread_root_id, automatic=EventOrderings(2, 2)
233+
)
229234
assert stream_id1 is not None and not isinstance(
230235
stream_id1, AutomaticSubscriptionConflicted
231236
)
@@ -253,15 +258,17 @@ def test_get_updated_thread_subscriptions_for_user(self) -> None:
253258
other_user_id = "@other_user:test"
254259

255260
# Set thread subscription for main user
256-
stream_id1 = self._subscribe(self.thread_root_id, automatic=(1, 1))
261+
stream_id1 = self._subscribe(
262+
self.thread_root_id, automatic=EventOrderings(1, 1)
263+
)
257264
assert stream_id1 is not None and not isinstance(
258265
stream_id1, AutomaticSubscriptionConflicted
259266
)
260267

261268
# Set thread subscription for other user
262269
stream_id2 = self._subscribe(
263270
self.other_thread_root_id,
264-
automatic=(1, 1),
271+
automatic=EventOrderings(1, 1),
265272
user_id=other_user_id,
266273
)
267274
assert stream_id2 is not None and not isinstance(
@@ -299,13 +306,21 @@ def test_should_skip_autosubscription_after_unsubscription(self) -> None:
299306
# unsubscribe maximums: stream order, then tological order
300307

301308
# both orderings agree that the unsub is after the cause event
302-
self.assertTrue(func(1, 1, 2, 2))
309+
self.assertTrue(
310+
func(autosub=EventOrderings(1, 1), unsubscribed_at=EventOrderings(2, 2))
311+
)
303312

304313
# topological ordering is inconsistent with stream ordering,
305314
# in that case favour stream ordering because it's what /sync uses
306-
self.assertTrue(func(1, 2, 2, 1))
315+
self.assertTrue(
316+
func(autosub=EventOrderings(1, 2), unsubscribed_at=EventOrderings(2, 1))
317+
)
307318

308319
# the automatic subscription is caused by a backfilled event here
309320
# unfortunately we must fall back to topological ordering here
310-
self.assertTrue(func(-50, 2, 2, 3))
311-
self.assertFalse(func(-50, 2, 2, 1))
321+
self.assertTrue(
322+
func(autosub=EventOrderings(-50, 2), unsubscribed_at=EventOrderings(2, 3))
323+
)
324+
self.assertFalse(
325+
func(autosub=EventOrderings(-50, 2), unsubscribed_at=EventOrderings(2, 1))
326+
)

0 commit comments

Comments
 (0)