|
| 1 | +import 'dart:async'; |
| 2 | + |
| 3 | +import 'package:collection/collection.dart'; |
1 | 4 | import 'package:flutter/foundation.dart';
|
2 | 5 |
|
3 | 6 | import '../api/model/events.dart';
|
4 | 7 | import '../api/model/initial_snapshot.dart';
|
5 | 8 | import '../api/model/model.dart';
|
| 9 | +import '../api/route/channels.dart'; |
| 10 | +import 'store.dart'; |
6 | 11 |
|
7 | 12 | /// The portion of [PerAccountStore] for channels, topics, and stuff about them.
|
8 | 13 | ///
|
@@ -34,6 +39,22 @@ mixin ChannelStore {
|
34 | 39 | /// and [streamsByName].
|
35 | 40 | Map<int, Subscription> get subscriptions;
|
36 | 41 |
|
| 42 | + /// Fetch topics in a stream from the server. |
| 43 | + /// |
| 44 | + /// The results from the last successful fetch |
| 45 | + /// can be retrieved with [topicMaxIdsInStream]. |
| 46 | + Future<void> fetchTopics(int streamId); |
| 47 | + |
| 48 | + /// Pairs of the known topics and its latest message ID, in the given stream. |
| 49 | + /// |
| 50 | + /// The result is guaranteed to be sorted by maxId, and the topics are |
| 51 | + /// guaranteed to be distinct. In some cases when message moves occur, |
| 52 | + /// the same maxId can be present in multiple topics. |
| 53 | + /// |
| 54 | + /// Returns null if the data has never been fetched yet. |
| 55 | + /// To fetch it from the server, use [fetchTopics]. |
| 56 | + List<TopicMaxId>? topicMaxIdsInStream(int streamId); |
| 57 | + |
37 | 58 | /// The visibility policy that the self-user has for the given topic.
|
38 | 59 | ///
|
39 | 60 | /// This does not incorporate the user's channel-level policy,
|
@@ -208,6 +229,31 @@ class ChannelStoreImpl extends PerAccountStoreBase with ChannelStore {
|
208 | 229 | @override
|
209 | 230 | final Map<int, Subscription> subscriptions;
|
210 | 231 |
|
| 232 | + /// Maps indexed by stream IDs, of the the known latest message IDs in each |
| 233 | + /// topic. |
| 234 | + /// |
| 235 | + /// For example: `_topicMaxIdsByStream[stream.id][topic] = maxId`. |
| 236 | + final Map<int, Map<TopicName, int>> _topicMaxIdsByStream = {}; |
| 237 | + |
| 238 | + @override |
| 239 | + Future<void> fetchTopics(int streamId) async { |
| 240 | + final result = await getStreamTopics(connection, streamId: streamId); |
| 241 | + _topicMaxIdsByStream[streamId] = { |
| 242 | + for (final GetStreamTopicsEntry(:name, :maxId) in result.topics) |
| 243 | + name: maxId, |
| 244 | + }; |
| 245 | + } |
| 246 | + |
| 247 | + @override |
| 248 | + List<TopicMaxId>? topicMaxIdsInStream(int streamId) { |
| 249 | + final topicMaxIdsInStream = _topicMaxIdsByStream[streamId]; |
| 250 | + if (topicMaxIdsInStream == null) return null; |
| 251 | + return [ |
| 252 | + for (final MapEntry(:key, :value) in topicMaxIdsInStream.entries) |
| 253 | + (topic: key, maxId: value) |
| 254 | + ].sortedBy((value) => -value.maxId); |
| 255 | + } |
| 256 | + |
211 | 257 | @override
|
212 | 258 | Map<int, Map<TopicName, UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility;
|
213 | 259 |
|
@@ -374,4 +420,48 @@ class ChannelStoreImpl extends PerAccountStoreBase with ChannelStore {
|
374 | 420 | forStream[event.topicName] = visibilityPolicy;
|
375 | 421 | }
|
376 | 422 | }
|
| 423 | + |
| 424 | + /// Handle a [MessageEvent], returning whether listeners should be notified. |
| 425 | + bool handleMessageEvent(MessageEvent event) { |
| 426 | + if (event.message is! StreamMessage) return false; |
| 427 | + final StreamMessage(:streamId, :topic) = event.message as StreamMessage; |
| 428 | + |
| 429 | + final topicMaxIdsInStream = _topicMaxIdsByStream[streamId]; |
| 430 | + if (topicMaxIdsInStream == null) return false; |
| 431 | + assert(!topicMaxIdsInStream.containsKey(topic) |
| 432 | + || topicMaxIdsInStream[topic]! < event.message.id); |
| 433 | + topicMaxIdsInStream[topic] = event.message.id; |
| 434 | + return true; |
| 435 | + } |
| 436 | + |
| 437 | + /// Handle a [UpdateMessageEvent], returning whether listeners should be |
| 438 | + /// notified. |
| 439 | + bool handleUpdateMessageEvent(UpdateMessageEvent event) { |
| 440 | + if (event.moveData == null) return false; |
| 441 | + final UpdateMessageMoveData( |
| 442 | + :origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode, |
| 443 | + ) = event.moveData!; |
| 444 | + bool shouldNotify = false; |
| 445 | + |
| 446 | + final topicMaxIdsInOrigStream = _topicMaxIdsByStream[origStreamId]; |
| 447 | + if (propagateMode == PropagateMode.changeAll |
| 448 | + && topicMaxIdsInOrigStream != null) { |
| 449 | + shouldNotify = topicMaxIdsInOrigStream.remove(origTopic) != null; |
| 450 | + } |
| 451 | + |
| 452 | + final topicMaxIdsInNewStream = _topicMaxIdsByStream[newStreamId]; |
| 453 | + if (topicMaxIdsInNewStream != null) { |
| 454 | + final movedMaxId = event.messageIds.max; |
| 455 | + if (!topicMaxIdsInNewStream.containsKey(newTopic) |
| 456 | + || topicMaxIdsInNewStream[newTopic]! < movedMaxId) { |
| 457 | + topicMaxIdsInNewStream[newTopic] = movedMaxId; |
| 458 | + shouldNotify = true; |
| 459 | + } |
| 460 | + } |
| 461 | + |
| 462 | + return shouldNotify; |
| 463 | + } |
377 | 464 | }
|
| 465 | + |
| 466 | +/// A topic and its known latest message ID. |
| 467 | +typedef TopicMaxId = ({TopicName topic, int maxId}); |
0 commit comments