Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 152 additions & 52 deletions example/src/ConversationScreen.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import * as ImagePicker from 'expo-image-picker'
import type { ImagePickerAsset } from 'expo-image-picker'
import { PermissionStatus } from 'expo-modules-core'
import moment from 'moment'
import React, { useCallback, useMemo, useRef, useState } from 'react'
import React, { useCallback, useMemo, useRef, useState, useEffect } from 'react'
import {
Button,
FlatList,
Expand Down Expand Up @@ -72,14 +72,10 @@ export default function ConversationScreen({
const [replyingTo, setReplyingTo] = useState<string | null>(null)
const [text, setText] = useState('')
const [isShowingAttachmentModal, setShowingAttachmentModal] = useState(false)
const [attachment, setAttachment] = useState<Attachment | null>(null)

const [isAttachmentPreviewing, setAttachmentPreviewing] = useState(false)
const [isSending, setSending] = useState(false)
// const { remoteAttachment } = usePrepareRemoteAttachment({
// fileUri: attachment?.image?.uri || attachment?.file?.uri,
// mimeType: attachment?.file?.mimeType,
// })

// Update state to handle multiple attachments
const [attachments, setAttachments] = useState<Attachment[]>([])
const [previewingAttachmentIndex, setPreviewingAttachmentIndex] = useState<
Expand All @@ -100,20 +96,126 @@ export default function ConversationScreen({
}
}),
})
const [streamedMessages, setStreamedMessages] = useState<any[]>([])

const streamCleanupRef = useRef<(() => void) | null>(null)

useEffect(() => {
if (!conversation) return

// If there's already an active stream, clean it up first
if (streamCleanupRef.current) {
console.log('🧹 Cleaning up existing stream before setting up new one')
streamCleanupRef.current()
streamCleanupRef.current = null
}

console.log('🔵 Setting up stream for conversation:', conversation.id)

let isActive = true
let retryTimeout: NodeJS.Timeout | null = null
let retryCount = 0
const maxRetries = 10

const attemptStreamSetup = async () => {
if (!isActive || retryCount >= maxRetries) {
if (retryCount >= maxRetries) {
console.log(
`❌ Max retries (${maxRetries}) reached, giving up on stream`
)
}
return
}

try {
// First, try to sync
console.log(
`🔄 Checking sync... (attempt ${retryCount + 1}/${maxRetries})`
)
await conversation.sync()
console.log(`✅ Conversation sync successful`)

// If sync works, try to start the stream
console.log(
`🔄 Starting stream... (attempt ${retryCount + 1}/${maxRetries})`
)
const cleanup = await conversation.streamMessages(
async (message) => {
console.log(
'📨 Streamed message:',
message.id,
message.contentTypeId,
message.content()
)

if (isActive) {
setStreamedMessages((prev) => {
const messageExists = prev.some((msg) => msg.id === message.id)
if (messageExists) return prev

return [message, ...prev]
})
}
},
// onClose callback - retry the whole setup
() => {
console.log('❌ Stream closed for conversation:', conversation.id)
if (isActive && retryCount < maxRetries) {
retryCount++
console.log(
`🔄 Stream closed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})`
)
retryTimeout = setTimeout(attemptStreamSetup, 10000)
}
}
)

// Reset retry count on successful setup
retryCount = 0
streamCleanupRef.current = cleanup
console.log(
'✅ Stream setup complete for conversation:',
conversation.id
)
} catch (error) {
console.error('💥 Error during setup (sync or stream):', error)

// Retry the whole thing (sync + stream) on any error
if (isActive && retryCount < maxRetries) {
retryCount++
console.log(
`🔄 Setup failed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})`
)
retryTimeout = setTimeout(attemptStreamSetup, 10000)
}
}
}

// Start the first attempt
attemptStreamSetup().catch((error) => {
console.error('Failed to start initial stream setup:', error)
})

// const sendRemoteAttachmentMessages = () => {
// if (remoteAttachments && remoteAttachments.length) {
// Promise.all(
// remoteAttachments.map(attachment =>
// sendMessage({ remoteAttachment: attachment })
// )
// )
// .then(() => setAttachments([]))
// .catch((e) => {
// console.error('Error sending messages: ', e)
// })
// }
// }
return () => {
console.log('🧹 Cleaning up stream for conversation:', conversation.id)
isActive = false

// Clear retry timeout if it exists
if (retryTimeout) {
clearTimeout(retryTimeout)
retryTimeout = null
}

if (streamCleanupRef.current) {
try {
streamCleanupRef.current()
} catch (error: any) {
console.error('💥 Error during cleanup:', error)
}
streamCleanupRef.current = null
}
}
}, [conversation?.id, topic])

const sendMultiRemoteAttachmentMessage = () => {
if (remoteAttachments && remoteAttachments.length) {
Expand All @@ -132,13 +234,27 @@ export default function ConversationScreen({
}
}

const filteredMessages = useMemo(
() =>
(messages ?? [])?.filter(
(message) => !hiddenMessageTypes.includes(message.contentTypeId)
),
[messages]
)
const filteredMessages = useMemo(() => {
// Start with fetched messages
let allMessages = [...(messages ?? [])]

// Only add streamed messages if they're not already in the fetched messages
const streamedToAdd = streamedMessages.filter(
(streamedMsg) =>
!allMessages.some((fetchedMsg) => fetchedMsg.id === streamedMsg.id)
)

// Combine them
allMessages = [...allMessages, ...streamedToAdd]

// Sort by timestamp (newest first since FlatList is inverted)
const sortedMessages = allMessages.sort((a, b) => b.sentNs - a.sentNs)

// Filter out hidden message types
return sortedMessages.filter(
(message) => !hiddenMessageTypes.includes(message.contentTypeId)
)
}, [messages, streamedMessages])

const sendMessage = async (
content: ConversationSendPayload<SupportedContentTypes>
Expand All @@ -155,23 +271,15 @@ export default function ConversationScreen({
} as ConversationSendPayload<SupportedContentTypes>)
: content
await conversation!.send(content)
await refreshMessages()
await handleRefreshMessages()
setReplyingTo(null)
} catch (e) {
console.log('Error sending message', e)
} finally {
setSending(false)
}
}
// const sendRemoteAttachmentMessage = () => {
// if (remoteAttachment) {
// sendMessage({ remoteAttachment })
// .then(() => setAttachment(null))
// .catch((e) => {
// console.error('Error sending message: ', e)
// })
// }
// }

const sendTextMessage = () => sendMessage({ text }).then(() => setText(''))
const scrollToMessageId = useCallback(
(messageId: string) => {
Expand All @@ -189,6 +297,13 @@ export default function ConversationScreen({
[filteredMessages]
)

const handleRefreshMessages = useCallback(async () => {
// Clear streamed messages since refresh will include them
setStreamedMessages([])
// Call the original refresh
return await refreshMessages()
}, [refreshMessages])

return (
<SafeAreaView style={{ flex: 1 }}>
<KeyboardAvoidingView
Expand Down Expand Up @@ -222,7 +337,7 @@ export default function ConversationScreen({
style={{ flexGrow: 1 }}
contentContainerStyle={{ paddingBottom: 100 }}
refreshing={isFetching || isRefetching}
onRefresh={refreshMessages}
onRefresh={handleRefreshMessages}
data={filteredMessages}
inverted
keyboardDismissMode="none"
Expand Down Expand Up @@ -263,21 +378,6 @@ export default function ConversationScreen({
onPress={() => scrollToMessageId(replyingTo!)}
/>
)}
{attachment && (
<>
<AttachmentInputHeader
topic={topic}
attachment={attachment}
onPress={() => setAttachmentPreviewing(true)}
onRemove={() => setAttachment(null)}
/>
<AttachmentPreviewModal
attachment={attachment}
visible={isAttachmentPreviewing}
onRequestClose={() => setAttachmentPreviewing(false)}
/>
</>
)}
{attachments.length > 0 && (
<>
<AttachmentInputHeader
Expand Down
20 changes: 13 additions & 7 deletions example/src/tests/contentTypeTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ function test(name: string, perform: () => Promise<boolean>) {

test('DecodedMessage.from() should throw informative error on null', async () => {
try {
DecodedMessage.from("undefined")
DecodedMessage.from('undefined')
} catch (e: any) {
assert(e.toString().includes('JSON Parse error'), 'Error: ' + e.toString())
}

try {
DecodedMessage.from("")
DecodedMessage.from('')
} catch (e: any) {
assert(e.toString().includes('JSON Parse error'), 'Error: ' + e.toString())
}
Expand All @@ -42,23 +42,29 @@ test('DecodedMessage.from() should throw informative error on null', async () =>
try {
DecodedMessage.from(null)
} catch (e: any) {
assert(e.toString().includes('Tried to parse null as a DecodedMessage'), 'Error: ' + e.toString())
assert(
e.toString().includes('Tried to parse null as a DecodedMessage'),
'Error: ' + e.toString()
)
}

try {
DecodedMessage.from("null")
DecodedMessage.from('null')
} catch (e: any) {
assert(e.toString().includes('Tried to parse null as a DecodedMessage'), 'Error: ' + e.toString())
assert(
e.toString().includes('Tried to parse null as a DecodedMessage'),
'Error: ' + e.toString()
)
}

let json = '{"id": "123", "topic": "123", "contentTypeId": "123", "senderInboxId": "123", "sentNs": 123, "content": "123", "fallback": "123", "deliveryStatus": "123", "childMessages": null}'
const json =
'{"id": "123", "topic": "123", "contentTypeId": "123", "senderInboxId": "123", "sentNs": 123, "content": "123", "fallback": "123", "deliveryStatus": "123", "childMessages": null}'
try {
DecodedMessage.from(json)
} catch (e: any) {
assert(false, 'Error: ' + e.toString())
}
return true

})

test('can fetch messages with reactions', async () => {
Expand Down
2 changes: 1 addition & 1 deletion example/src/tests/dmTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ test('can stream dm messages', async () => {
await alixDm.streamMessages(async () => {
dmMessageCallbacks++
})

await delayToPropogate(1000)
await alixConversation?.send({ text: `first message` })
await alixDm.send({ text: `first message` })
Expand Down
Loading