@@ -16,11 +16,7 @@ use std::{collections::BTreeSet, sync::Arc};
16
16
17
17
use eyeball:: SharedObservable ;
18
18
use futures_util:: { pin_mut, StreamExt } ;
19
- use matrix_sdk:: {
20
- event_cache:: { self , RoomEventCacheUpdate } ,
21
- executor:: spawn,
22
- Room ,
23
- } ;
19
+ use matrix_sdk:: { event_cache:: RoomEventCacheUpdate , executor:: spawn, Room } ;
24
20
use ruma:: { events:: AnySyncTimelineEvent , RoomVersionId } ;
25
21
use tokio:: sync:: { broadcast, mpsc} ;
26
22
use tracing:: { info, info_span, trace, warn, Instrument , Span } ;
@@ -30,9 +26,12 @@ use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event};
30
26
use super :: {
31
27
inner:: { TimelineInner , TimelineInnerSettings } ,
32
28
queue:: send_queued_messages,
33
- BackPaginationStatus , Timeline , TimelineDropHandle ,
29
+ Error , Timeline , TimelineDropHandle , TimelineFocus ,
30
+ } ;
31
+ use crate :: {
32
+ timeline:: { event_item:: RemoteEventOrigin , PaginationStatus } ,
33
+ unable_to_decrypt_hook:: UtdHookManager ,
34
34
} ;
35
- use crate :: unable_to_decrypt_hook:: UtdHookManager ;
36
35
37
36
/// Builder that allows creating and configuring various parts of a
38
37
/// [`Timeline`].
@@ -41,6 +40,7 @@ use crate::unable_to_decrypt_hook::UtdHookManager;
41
40
pub struct TimelineBuilder {
42
41
room : Room ,
43
42
settings : TimelineInnerSettings ,
43
+ focus : TimelineFocus ,
44
44
45
45
/// An optional hook to call whenever we run into an unable-to-decrypt or a
46
46
/// late-decryption event.
@@ -56,10 +56,19 @@ impl TimelineBuilder {
56
56
room : room. clone ( ) ,
57
57
settings : TimelineInnerSettings :: default ( ) ,
58
58
unable_to_decrypt_hook : None ,
59
+ focus : TimelineFocus :: Live ,
59
60
internal_id_prefix : None ,
60
61
}
61
62
}
62
63
64
+ /// Sets up the initial focus for this timeline.
65
+ ///
66
+ /// This can be changed later on while the timeline is alive.
67
+ pub fn with_focus ( mut self , focus : TimelineFocus ) -> Self {
68
+ self . focus = focus;
69
+ self
70
+ }
71
+
63
72
/// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
64
73
/// we're building.
65
74
///
@@ -134,8 +143,8 @@ impl TimelineBuilder {
134
143
track_read_receipts = self . settings. track_read_receipts,
135
144
)
136
145
) ]
137
- pub async fn build ( self ) -> event_cache :: Result < Timeline > {
138
- let Self { room, settings, unable_to_decrypt_hook, internal_id_prefix } = self ;
146
+ pub async fn build ( self ) -> Result < Timeline , Error > {
147
+ let Self { room, settings, unable_to_decrypt_hook, focus , internal_id_prefix } = self ;
139
148
140
149
let client = room. client ( ) ;
141
150
let event_cache = client. event_cache ( ) ;
@@ -144,14 +153,12 @@ impl TimelineBuilder {
144
153
event_cache. subscribe ( ) ?;
145
154
146
155
let ( room_event_cache, event_cache_drop) = room. event_cache ( ) . await ?;
147
- let ( events, mut event_subscriber) = room_event_cache. subscribe ( ) . await ?;
148
-
149
- let has_events = !events. is_empty ( ) ;
156
+ let ( _, mut event_subscriber) = room_event_cache. subscribe ( ) . await ?;
150
157
151
- let inner = TimelineInner :: new ( room, internal_id_prefix, unable_to_decrypt_hook)
158
+ let inner = TimelineInner :: new ( room, focus , internal_id_prefix, unable_to_decrypt_hook)
152
159
. with_settings ( settings) ;
153
160
154
- inner. replace_with_initial_events ( events ) . await ;
161
+ let has_events = inner. init_focus ( & room_event_cache ) . await ? ;
155
162
156
163
let room = inner. room ( ) ;
157
164
let client = room. client ( ) ;
@@ -165,10 +172,10 @@ impl TimelineBuilder {
165
172
span. follows_from ( Span :: current ( ) ) ;
166
173
167
174
async move {
168
- trace ! ( "Spawned the event subscriber task" ) ;
175
+ trace ! ( "Spawned the event subscriber task. " ) ;
169
176
170
177
loop {
171
- trace ! ( "Waiting for an event" ) ;
178
+ trace ! ( "Waiting for an event. " ) ;
172
179
173
180
let update = match event_subscriber. recv ( ) . await {
174
181
Ok ( up) => up,
@@ -187,7 +194,7 @@ impl TimelineBuilder {
187
194
// current timeline.
188
195
match room_event_cache. subscribe ( ) . await {
189
196
Ok ( ( events, _) ) => {
190
- inner. replace_with_initial_events ( events) . await ;
197
+ inner. replace_with_initial_events ( events, RemoteEventOrigin :: Sync ) . await ;
191
198
}
192
199
Err ( err) => {
193
200
warn ! ( "Error when re-inserting initial events into the timeline: {err}" ) ;
@@ -200,18 +207,25 @@ impl TimelineBuilder {
200
207
} ;
201
208
202
209
match update {
203
- RoomEventCacheUpdate :: Clear => {
204
- trace ! ( "Clearing the timeline." ) ;
205
- inner. clear ( ) . await ;
206
- }
207
-
208
210
RoomEventCacheUpdate :: UpdateReadMarker { event_id } => {
209
211
trace ! ( target = %event_id, "Handling fully read marker." ) ;
210
212
inner. handle_fully_read_marker ( event_id) . await ;
211
213
}
212
214
215
+ RoomEventCacheUpdate :: Clear => {
216
+ if !inner. is_live ( ) . await {
217
+ // Ignore a clear for a timeline not in the live mode; the
218
+ // focused-on-event mode doesn't add any new items to the timeline
219
+ // anyways.
220
+ continue ;
221
+ }
222
+
223
+ trace ! ( "Clearing the timeline." ) ;
224
+ inner. clear ( ) . await ;
225
+ }
226
+
213
227
RoomEventCacheUpdate :: Append { events, ephemeral, ambiguity_changes } => {
214
- trace ! ( "Received new events" ) ;
228
+ trace ! ( "Received new events from sync. " ) ;
215
229
216
230
// TODO: (bnjbvr) ephemeral should be handled by the event cache, and
217
231
// we should replace this with a simple `add_events_at`.
@@ -300,7 +314,7 @@ impl TimelineBuilder {
300
314
301
315
let timeline = Timeline {
302
316
inner,
303
- back_pagination_status : SharedObservable :: new ( BackPaginationStatus :: Idle ) ,
317
+ back_pagination_status : SharedObservable :: new ( PaginationStatus :: Idle ) ,
304
318
msg_sender,
305
319
event_cache : room_event_cache,
306
320
drop_handle : Arc :: new ( TimelineDropHandle {
0 commit comments