@@ -582,7 +582,7 @@ class Channel {
582
582
// Eg. Updating the message while the previous call is in progress.
583
583
_messageAttachmentsUploadCompleter
584
584
.remove (message.id)
585
- ? .completeError ('Message Cancelled' );
585
+ ? .completeError (const StreamChatError ( 'Message cancelled' ) );
586
586
587
587
final quotedMessage = state! .messages.firstWhereOrNull (
588
588
(m) => m.id == message.quotedMessageId,
@@ -592,7 +592,7 @@ class Channel {
592
592
localCreatedAt: DateTime .now (),
593
593
user: _client.state.currentUser,
594
594
quotedMessage: quotedMessage,
595
- status : MessageSendingStatus .sending,
595
+ state : MessageState .sending,
596
596
attachments: message.attachments.map (
597
597
(it) {
598
598
if (it.uploadState.isSuccess) return it;
@@ -630,15 +630,24 @@ class Channel {
630
630
),
631
631
);
632
632
633
- final sentMessage = response.message.syncWith (message);
633
+ final sentMessage = response.message.syncWith (message).copyWith (
634
+ // Update the message state to sent.
635
+ state: MessageState .sent,
636
+ );
634
637
635
638
state! .updateMessage (sentMessage);
636
639
if (cooldown > 0 ) cooldownStartedAt = DateTime .now ();
637
640
return response;
638
641
} catch (e) {
639
642
if (e is StreamChatNetworkError && e.isRetriable) {
640
- state! ._retryQueue.add ([message]);
643
+ state! ._retryQueue.add ([
644
+ message.copyWith (
645
+ // Update the message state to failed.
646
+ state: MessageState .sendingFailed,
647
+ ),
648
+ ]);
641
649
}
650
+
642
651
rethrow ;
643
652
}
644
653
}
@@ -653,17 +662,18 @@ class Channel {
653
662
Message message, {
654
663
bool skipEnrichUrl = false ,
655
664
}) async {
665
+ _checkInitialized ();
656
666
final originalMessage = message;
657
667
658
668
// Cancelling previous completer in case it's called again in the process
659
669
// Eg. Updating the message while the previous call is in progress.
660
670
_messageAttachmentsUploadCompleter
661
671
.remove (message.id)
662
- ? .completeError ('Message Cancelled' );
672
+ ? .completeError (const StreamChatError ( 'Message cancelled' ) );
663
673
664
674
// ignore: parameter_assignments
665
675
message = message.copyWith (
666
- status : MessageSendingStatus .updating,
676
+ state : MessageState .updating,
667
677
localUpdatedAt: DateTime .now (),
668
678
attachments: message.attachments.map (
669
679
(it) {
@@ -699,19 +709,30 @@ class Channel {
699
709
),
700
710
);
701
711
702
- final updatedMessage = response.message
703
- .syncWith (message)
704
- .copyWith (ownReactions: message.ownReactions);
712
+ final updateMessage = response.message.syncWith (message).copyWith (
713
+ // Update the message state to updated.
714
+ state: MessageState .updated,
715
+ ownReactions: message.ownReactions,
716
+ );
705
717
706
- state? .updateMessage (updatedMessage );
718
+ state? .updateMessage (updateMessage );
707
719
708
720
return response;
709
721
} catch (e) {
710
722
if (e is StreamChatNetworkError ) {
711
723
if (e.isRetriable) {
712
- state! ._retryQueue.add ([message]);
724
+ state! ._retryQueue.add ([
725
+ message.copyWith (
726
+ // Update the message state to failed.
727
+ state: MessageState .updatingFailed,
728
+ ),
729
+ ]);
713
730
} else {
714
- state? .updateMessage (originalMessage);
731
+ // Reset the message to original state if the update fails and is not
732
+ // retriable.
733
+ state? .updateMessage (originalMessage.copyWith (
734
+ state: MessageState .updatingFailed,
735
+ ));
715
736
}
716
737
}
717
738
rethrow ;
@@ -729,6 +750,23 @@ class Channel {
729
750
List <String >? unset,
730
751
bool skipEnrichUrl = false ,
731
752
}) async {
753
+ _checkInitialized ();
754
+ final originalMessage = message;
755
+
756
+ // Cancelling previous completer in case it's called again in the process
757
+ // Eg. Updating the message while the previous call is in progress.
758
+ _messageAttachmentsUploadCompleter
759
+ .remove (message.id)
760
+ ? .completeError (const StreamChatError ('Message cancelled' ));
761
+
762
+ // ignore: parameter_assignments
763
+ message = message.copyWith (
764
+ state: MessageState .updating,
765
+ localUpdatedAt: DateTime .now (),
766
+ );
767
+
768
+ state? .updateMessage (message);
769
+
732
770
try {
733
771
// Wait for the previous update call to finish. Otherwise, the order of
734
772
// messages will not be maintained.
@@ -741,78 +779,120 @@ class Channel {
741
779
),
742
780
);
743
781
744
- final updatedMessage = response.message
745
- .syncWith (message)
746
- .copyWith (ownReactions: message.ownReactions);
782
+ final updatedMessage = response.message.syncWith (message).copyWith (
783
+ // Update the message state to updated.
784
+ state: MessageState .updated,
785
+ ownReactions: message.ownReactions,
786
+ );
747
787
748
788
state? .updateMessage (updatedMessage);
749
789
750
790
return response;
751
791
} catch (e) {
752
- if (e is StreamChatNetworkError && e.isRetriable) {
753
- state! ._retryQueue.add ([message]);
792
+ if (e is StreamChatNetworkError ) {
793
+ if (e.isRetriable) {
794
+ state! ._retryQueue.add ([
795
+ message.copyWith (
796
+ // Update the message state to failed.
797
+ state: MessageState .updatingFailed,
798
+ ),
799
+ ]);
800
+ } else {
801
+ // Reset the message to original state if the update fails and is not
802
+ // retriable.
803
+ state? .updateMessage (originalMessage.copyWith (
804
+ state: MessageState .updatingFailed,
805
+ ));
806
+ }
754
807
}
808
+
755
809
rethrow ;
756
810
}
757
811
}
758
812
759
813
final _deleteMessageLock = Lock ();
760
814
761
815
/// Deletes the [message] from the channel.
762
- Future <EmptyResponse > deleteMessage (Message message, {bool ? hard}) async {
763
- final hardDelete = hard ?? false ;
816
+ Future <EmptyResponse > deleteMessage (
817
+ Message message, {
818
+ bool hard = false ,
819
+ }) async {
820
+ _checkInitialized ();
764
821
765
- // Directly deleting the local messages which are not yet sent to server
766
- if (message.status == MessageSendingStatus .sending ||
767
- message.status == MessageSendingStatus .failed) {
822
+ // Directly deleting the local messages which are not yet sent to server.
823
+ if (message.remoteCreatedAt == null ) {
768
824
state! .deleteMessage (
769
825
message.copyWith (
770
826
type: 'deleted' ,
771
827
localDeletedAt: DateTime .now (),
772
- status : MessageSendingStatus .sent ,
828
+ state : MessageState . deleted (hard : hard) ,
773
829
),
774
- hardDelete: hardDelete ,
830
+ hardDelete: hard ,
775
831
);
776
832
777
833
// Removing the attachments upload completer to stop the `sendMessage`
778
834
// waiting for attachments to complete.
779
835
_messageAttachmentsUploadCompleter
780
836
.remove (message.id)
781
- ? .completeError (Exception ('Message deleted' ));
837
+ ? .completeError (const StreamChatError ('Message deleted' ));
838
+
839
+ // Returning empty response to mark the api call as success.
782
840
return EmptyResponse ();
783
841
}
784
842
785
- try {
786
- // ignore: parameter_assignments
787
- message = message.copyWith (
788
- type: 'deleted' ,
789
- status: MessageSendingStatus .deleting,
790
- deletedAt: message.deletedAt ?? DateTime .now (),
791
- );
843
+ // ignore: parameter_assignments
844
+ message = message.copyWith (
845
+ type: 'deleted' ,
846
+ deletedAt: DateTime .now (),
847
+ state: MessageState .deleting (hard: hard),
848
+ );
792
849
793
- state? .deleteMessage (message, hardDelete: hardDelete );
850
+ state? .deleteMessage (message, hardDelete: hard );
794
851
852
+ try {
795
853
// Wait for the previous delete call to finish. Otherwise, the order of
796
854
// messages will not be maintained.
797
855
final response = await _deleteMessageLock.synchronized (
798
856
() => _client.deleteMessage (message.id, hard: hard),
799
857
);
800
858
801
859
final deletedMessage = message.copyWith (
802
- status : MessageSendingStatus .sent ,
860
+ state : MessageState . deleted (hard : hard) ,
803
861
);
804
862
805
- state? .deleteMessage (deletedMessage, hardDelete: hardDelete );
863
+ state? .deleteMessage (deletedMessage, hardDelete: hard );
806
864
807
865
return response;
808
866
} catch (e) {
809
867
if (e is StreamChatNetworkError && e.isRetriable) {
810
- state! ._retryQueue.add ([message]);
868
+ state! ._retryQueue.add ([
869
+ message.copyWith (
870
+ // Update the message state to failed.
871
+ state: MessageState .deletingFailed (hard: hard),
872
+ ),
873
+ ]);
811
874
}
812
875
rethrow ;
813
876
}
814
877
}
815
878
879
+ /// Retry the operation on the message based on the failed state.
880
+ ///
881
+ /// For example, if the message failed to send, it will retry sending the
882
+ /// message and vice-versa.
883
+ Future <Object > retryMessage (Message message) async {
884
+ assert (message.state.isFailed, 'Message state is not failed' );
885
+
886
+ return message.state.maybeWhen (
887
+ failed: (state, _) => state.when (
888
+ sendingFailed: () => sendMessage (message),
889
+ updatingFailed: () => updateMessage (message),
890
+ deletingFailed: (hard) => deleteMessage (message, hard: hard),
891
+ ),
892
+ orElse: () => throw StateError ('Message state is not failed' ),
893
+ );
894
+ }
895
+
816
896
/// Pins provided message
817
897
Future <UpdateMessageResponse > pinMessage (
818
898
Message message, {
@@ -1895,15 +1975,7 @@ class ChannelClientState {
1895
1975
/// Retry failed message.
1896
1976
Future <void > retryFailedMessages () async {
1897
1977
final failedMessages = [...messages, ...threads.values.expand ((v) => v)]
1898
- .where (
1899
- (message) =>
1900
- message.status != MessageSendingStatus .sent &&
1901
- message.createdAt.isBefore (
1902
- DateTime .now ().subtract (const Duration (seconds: 5 )),
1903
- ),
1904
- )
1905
- .toList ();
1906
-
1978
+ .where ((it) => it.state.isFailed);
1907
1979
_retryQueue.add (failedMessages);
1908
1980
}
1909
1981
0 commit comments