|
| 1 | +import 'dart:async'; |
| 2 | + |
| 3 | +import 'package:collection/collection.dart'; |
1 | 4 | import 'package:flutter/foundation.dart';
|
2 | 5 |
|
3 | 6 | import '../api/model/events.dart';
|
4 | 7 | import '../api/model/initial_snapshot.dart';
|
5 | 8 | import '../api/model/model.dart';
|
| 9 | +import '../api/route/channels.dart'; |
| 10 | +import 'store.dart'; |
| 11 | + |
| 12 | +final _apiGetStreamTopics = getStreamTopics; // similar to _apiSendMessage in lib/model/message.dart |
6 | 13 |
|
7 | 14 | /// The portion of [PerAccountStore] for channels, topics, and stuff about them.
|
8 | 15 | ///
|
@@ -34,6 +41,26 @@ mixin ChannelStore {
|
34 | 41 | /// and [streamsByName].
|
35 | 42 | Map<int, Subscription> get subscriptions;
|
36 | 43 |
|
| 44 | + /// Fetch topics in a stream from the server. |
| 45 | + /// |
| 46 | + /// The results from the last successful fetch |
| 47 | + /// can be retrieved with [getStreamTopics]. |
| 48 | + Future<void> fetchTopics(int streamId); |
| 49 | + |
| 50 | + /// Pairs of the known topics and its latest message ID, in the given stream. |
| 51 | + /// |
| 52 | + /// Returns null if the data has never been fetched yet. |
| 53 | + /// To fetch it from the server, use [fetchTopics]. |
| 54 | + /// |
| 55 | + /// The result is guaranteed to be sorted by [GetStreamTopicsEntry.maxId], and the |
| 56 | + /// topics are guaranteed to be distinct. |
| 57 | + /// |
| 58 | + /// In some cases, the same maxId affected by message moves can be present in |
| 59 | + /// multiple [GetStreamTopicsEntry] entries. For this reason, the caller |
| 60 | + /// should not rely on [getStreamTopics] to determine which topic the message |
| 61 | + /// is in. Instead, refer to [PerAccountStore.messages]. |
| 62 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId); |
| 63 | + |
37 | 64 | /// The visibility policy that the self-user has for the given topic.
|
38 | 65 | ///
|
39 | 66 | /// This does not incorporate the user's channel-level policy,
|
@@ -208,6 +235,33 @@ class ChannelStoreImpl extends PerAccountStoreBase with ChannelStore {
|
208 | 235 | @override
|
209 | 236 | final Map<int, Subscription> subscriptions;
|
210 | 237 |
|
| 238 | + /// Maps indexed by stream IDs, of the known latest message IDs in each topic. |
| 239 | + /// |
| 240 | + /// For example: `_latestMessageIdsByStreamTopic[stream.id][topic] = maxId` |
| 241 | + /// |
| 242 | + /// In some cases, the same message IDs, when affected by message moves, can |
| 243 | + /// be present for mutliple stream-topic keys. |
| 244 | + final Map<int, Map<TopicName, int>> _latestMessageIdsByStreamTopic = {}; |
| 245 | + |
| 246 | + @override |
| 247 | + Future<void> fetchTopics(int streamId) async { |
| 248 | + final result = await _apiGetStreamTopics(connection, streamId: streamId); |
| 249 | + _latestMessageIdsByStreamTopic[streamId] = { |
| 250 | + for (final GetStreamTopicsEntry(:name, :maxId) in result.topics) |
| 251 | + name: maxId, |
| 252 | + }; |
| 253 | + } |
| 254 | + |
| 255 | + @override |
| 256 | + List<GetStreamTopicsEntry>? getStreamTopics(int streamId) { |
| 257 | + final latestMessageIdsInStream = _latestMessageIdsByStreamTopic[streamId]; |
| 258 | + if (latestMessageIdsInStream == null) return null; |
| 259 | + return [ |
| 260 | + for (final MapEntry(:key, :value) in latestMessageIdsInStream.entries) |
| 261 | + GetStreamTopicsEntry(maxId: value, name: key), |
| 262 | + ].sortedBy((value) => -value.maxId); |
| 263 | + } |
| 264 | + |
211 | 265 | @override
|
212 | 266 | Map<int, Map<TopicName, UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility;
|
213 | 267 |
|
@@ -374,4 +428,45 @@ class ChannelStoreImpl extends PerAccountStoreBase with ChannelStore {
|
374 | 428 | forStream[event.topicName] = visibilityPolicy;
|
375 | 429 | }
|
376 | 430 | }
|
| 431 | + |
| 432 | + /// Handle a [MessageEvent], returning whether listeners should be notified. |
| 433 | + bool handleMessageEvent(MessageEvent event) { |
| 434 | + if (event.message is! StreamMessage) return false; |
| 435 | + final StreamMessage(:streamId, :topic) = event.message as StreamMessage; |
| 436 | + |
| 437 | + final latestMessageIdsByTopic = _latestMessageIdsByStreamTopic[streamId]; |
| 438 | + if (latestMessageIdsByTopic == null) return false; |
| 439 | + assert(!latestMessageIdsByTopic.containsKey(topic) |
| 440 | + || latestMessageIdsByTopic[topic]! < event.message.id); |
| 441 | + latestMessageIdsByTopic[topic] = event.message.id; |
| 442 | + return true; |
| 443 | + } |
| 444 | + |
| 445 | + /// Handle a [UpdateMessageEvent], returning whether listeners should be |
| 446 | + /// notified. |
| 447 | + bool handleUpdateMessageEvent(UpdateMessageEvent event) { |
| 448 | + if (event.moveData == null) return false; |
| 449 | + final UpdateMessageMoveData( |
| 450 | + :origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode, |
| 451 | + ) = event.moveData!; |
| 452 | + bool shouldNotify = false; |
| 453 | + |
| 454 | + final origLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[origStreamId]; |
| 455 | + if (propagateMode == PropagateMode.changeAll |
| 456 | + && origLatestMessageIdsByTopics != null) { |
| 457 | + shouldNotify = origLatestMessageIdsByTopics.remove(origTopic) != null; |
| 458 | + } |
| 459 | + |
| 460 | + final newLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[newStreamId]; |
| 461 | + if (newLatestMessageIdsByTopics != null) { |
| 462 | + final movedMaxId = event.messageIds.max; |
| 463 | + if (!newLatestMessageIdsByTopics.containsKey(newTopic) |
| 464 | + || newLatestMessageIdsByTopics[newTopic]! < movedMaxId) { |
| 465 | + newLatestMessageIdsByTopics[newTopic] = movedMaxId; |
| 466 | + shouldNotify = true; |
| 467 | + } |
| 468 | + } |
| 469 | + |
| 470 | + return shouldNotify; |
| 471 | + } |
377 | 472 | }
|
0 commit comments