diff --git a/backend/plugins/frontline_api/package.json b/backend/plugins/frontline_api/package.json index e1abbeaf36..eb7a508468 100644 --- a/backend/plugins/frontline_api/package.json +++ b/backend/plugins/frontline_api/package.json @@ -13,12 +13,17 @@ "license": "ISC", "dependencies": { "aws-sdk": "^2.1692.0", + "base64-stream": "^1.0.0", "debug": "^4.4.0", + "dotenv": "^17.2.1", "erxes-api-shared": "workspace:^", "fast-xml-parser": "^5.2.5", "fbgraph": "^1.4.4", + "ioredis": "^5.7.0", + "mailparser": "^3.7.4", "moment-timezone": "^0.5.48", - "redlock": "5.0.0-beta.2", + "node-imap": "^0.9.6", + "redlock": "4.2.0", "string-strip-html": "^13.4.12", "strip-html": "^1.0.2" } diff --git a/backend/plugins/frontline_api/src/main.ts b/backend/plugins/frontline_api/src/main.ts index b1a319a706..ad5b787155 100644 --- a/backend/plugins/frontline_api/src/main.ts +++ b/backend/plugins/frontline_api/src/main.ts @@ -8,6 +8,7 @@ import { generateModels } from './connectionResolvers'; import { initializeCallQueueMonitoring } from '~/modules/integrations/call/worker/callDashboard'; import automations from './meta/automations'; import initCallApp from '~/modules/integrations/call/initApp'; +import initImapApp from '~/modules/integrations/imap/configs'; startPlugin({ name: 'frontline', @@ -28,8 +29,8 @@ startPlugin({ expressRouter: router, onServerInit: async (app) => { + await initImapApp(app); await initCallApp(app); - try { if (getEnv({ name: 'CALL_DASHBOARD_ENABLED' })) { await initializeCallQueueMonitoring(); diff --git a/backend/plugins/frontline_api/src/modules/inbox/graphql/resolvers/mutations/integrations.ts b/backend/plugins/frontline_api/src/modules/inbox/graphql/resolvers/mutations/integrations.ts index aff7f045ee..2ede4aeced 100644 --- a/backend/plugins/frontline_api/src/modules/inbox/graphql/resolvers/mutations/integrations.ts +++ b/backend/plugins/frontline_api/src/modules/inbox/graphql/resolvers/mutations/integrations.ts @@ -19,6 +19,12 @@ import { facebookRepairIntegrations, facebookCreateIntegrations, } from '@/integrations/facebook/messageBroker'; + +import { + imapUpdateIntegrations, + imapRemoveIntegrations, + imapCreateIntegrations, +} from '@/integrations/imap/messageBroker'; import { callCreateIntegration, callRemoveIntergration, @@ -59,7 +65,8 @@ export const sendCreateIntegration = async ( return await facebookCreateIntegrations({ subdomain, data }); case 'calls': return await callCreateIntegration({ subdomain, data }); - + case 'imap': + return await imapCreateIntegrations({ subdomain, data }); case 'instagram': // TODO: Implement Instagram integration break; @@ -89,6 +96,8 @@ export const sendUpdateIntegration = async ( return await facebookUpdateIntegrations({ subdomain, data }); case 'calls': return await callUpdateIntegration({ subdomain, data }); + case 'imap': + return await imapUpdateIntegrations({ subdomain, data }); case 'instagram': break; @@ -116,6 +125,8 @@ export const sendRemoveIntegration = async ( return await facebookRemoveIntegrations({ subdomain, data }); case 'calls': return await callRemoveIntergration({ subdomain, data }); + case 'imap': + return await imapRemoveIntegrations({ subdomain, data }); case 'instagram': break; diff --git a/backend/plugins/frontline_api/src/modules/integrations/imap/configs.ts b/backend/plugins/frontline_api/src/modules/integrations/imap/configs.ts index eea30f9109..721942a450 100644 --- a/backend/plugins/frontline_api/src/modules/integrations/imap/configs.ts +++ b/backend/plugins/frontline_api/src/modules/integrations/imap/configs.ts @@ -1,145 +1,182 @@ -// import * as fs from 'fs'; -// import { Base64Decode } from 'base64-stream'; -// import typeDefs from './graphql/typeDefs'; -// import resolvers from './graphql/resolvers'; - -// import { setupMessageConsumers } from './messageBroker'; -// import { generateModels } from '~/connectionResolvers'; -// import { getEnv, getSubdomain } from '@erxes/api-utils/src/core'; -// import startDistributingJobs, { -// findAttachmentParts, -// createImap, -// toUpper, -// routeErrorHandling, -// } from './utils'; -// import { getSaasOrganizations } from 'erxes-api-shared/utils'; -// import express from 'express'; -// import { debugError } from '~/modules/inbox/utils'; - -// const app = express(); - -// export default { -// onServerInit: async () => { -// app.get( -// '/read-mail-attachment', -// routeErrorHandling( -// async (req, res, next) => { -// const subdomain = getSubdomain(req); -// const models = await generateModels(subdomain); - -// const { messageId, integrationId, filename } = req.query; - -// const integration = await models.Integrations.findOne({ -// inboxId: integrationId, -// }); - -// if (!integration) { -// throw new Error('Integration not found'); -// } - -// const sentMessage = await models.Messages.findOne({ -// messageId, -// inboxIntegrationId: integrationId, -// type: 'SENT', -// }); - -// let folderType = 'INBOX'; - -// if (sentMessage) { -// folderType = '[Gmail]/Sent Mail'; -// } - -// const imap = createImap(integration); - -// imap.once('ready', () => { -// imap.openBox(folderType, true, async (err, box) => { -// imap.search( -// [['HEADER', 'MESSAGE-ID', messageId]], -// function (err, results) { -// if (err) { -// imap.end(); -// console.log('read-mail-attachment =============', err); -// return next(err); -// } - -// let f; - -// try { -// f = imap.fetch(results, { bodies: '', struct: true }); -// } catch (e) { -// imap.end(); -// debugError('messageId ', messageId); -// return next(e); -// } - -// f.on('message', function (msg) { -// msg.once('attributes', function (attrs) { -// const attachments = findAttachmentParts(attrs.struct); - -// if (attachments.length === 0) { -// imap.end(); -// return res.status(404).send('Not found'); -// } - -// for (let i = 0, len = attachments.length; i < len; ++i) { -// const attachment = attachments[i]; - -// if (attachment.params.name === filename) { -// const f = imap.fetch(attrs.uid, { -// bodies: [attachment.partID], -// struct: true, -// }); - -// f.on('message', (msg) => { -// const filename = attachment.params.name; -// const encoding = attachment.encoding; - -// msg.on('body', function (stream) { -// const writeStream = fs.createWriteStream( -// `${__dirname}/${filename}`, -// ); - -// if (toUpper(encoding) === 'BASE64') { -// stream -// .pipe(new Base64Decode()) -// .pipe(writeStream); -// } else { -// stream.pipe(writeStream); -// } -// }); - -// msg.once('end', function () { -// imap.end(); -// return res.download(`${__dirname}/${filename}`); -// }); -// }); -// } -// } -// }); -// }); -// }, -// ); -// }); -// }); - -// imap.connect(); -// }, -// (res) => res.send('ok'), -// ), -// ); - -// const VERSION = getEnv({ name: 'VERSION' }); - -// if (VERSION && VERSION === 'saas') { -// const organizations = await getOrganizations(); - -// for (const org of organizations) { -// console.log(`Started listening for organization [${org.subdomain}]`); -// await startDistributingJobs(org.subdomain); -// } -// } else { -// startDistributingJobs('os'); -// } -// }, -// setupMessageConsumers, -// }; +import * as fs from 'fs'; +import * as path from 'path'; +import os from 'os'; +import { Base64Decode } from 'base64-stream'; +import express from 'express'; +import { generateModels, IModels } from '~/connectionResolvers'; +import { + getEnv, + getSubdomain, + getSaasOrganizations, +} from 'erxes-api-shared/utils'; +import startDistributingJobs, { + findAttachmentParts, + createImap, + toUpper, + routeErrorHandling, +} from './utils'; +import { debugError } from '~/modules/inbox/utils'; + +// Import IMAP consumers utils +import { + imapCreateIntegrations, + imapUpdateIntegrations, + imapRemoveIntegrations, + ImapListen, + sendImapMessage, +} from './messageBroker'; +import { models } from 'mongoose'; + +// ================= IMAP APP =================== +const initImapApp = async (app: express.Application) => { + app.use(express.json({ limit: '15mb' })); + + // ====== Read Mail Attachment Route ====== + app.get( + '/read-mail-attachment', + routeErrorHandling( + async (req, res, next) => { + const subdomain = getSubdomain(req); + + const models = await generateModels(subdomain); + + const { messageId, integrationId, filename } = req.query as Record< + string, + string + >; + + if (!messageId || !integrationId || !filename) { + return res.status(400).send('Missing required query parameters'); + } + + const integration = await models.ImapIntegrations.findOne({ + inboxId: integrationId, + }); + if (!integration) throw new Error('Integration not found'); + + const sentMessage = await models.ImapMessages.findOne({ + messageId, + inboxIntegrationId: integrationId, + type: 'SENT', + }); + + const folderType = sentMessage ? '[Gmail]/Sent Mail' : 'INBOX'; + const imap = createImap(integration); + + imap.once('ready', () => { + imap.openBox(folderType, true, (err) => { + if (err) return next(err); + + imap.search( + [['HEADER', 'MESSAGE-ID', messageId]], + (err, results) => { + if (err || !results || results.length === 0) { + imap.end(); + return res.status(404).send('Message not found'); + } + + const f = imap.fetch(results, { bodies: '', struct: true }); + + f.on('message', (msg) => { + msg.once('attributes', (attrs) => { + const attachments = findAttachmentParts(attrs.struct); + + if (!attachments || attachments.length === 0) { + imap.end(); + return res.status(404).send('No attachments found'); + } + + const attachment = attachments.find( + (att) => att.params?.name === filename, + ); + if (!attachment) { + imap.end(); + return res.status(404).send('Attachment not found'); + } + + const tempPath = path.join( + os.tmpdir(), + `${Date.now()}-${attachment.params.name}`, + ); + + const fetcher = imap.fetch(attrs.uid, { + bodies: [attachment.partID], + struct: true, + }); + + fetcher.on('message', (msg) => { + msg.on('body', (stream) => { + const writeStream = fs.createWriteStream(tempPath); + + if (toUpper(attachment.encoding) === 'BASE64') { + stream.pipe(new Base64Decode()).pipe(writeStream); + } else { + stream.pipe(writeStream); + } + + writeStream.on('finish', () => { + res.download( + tempPath, + attachment.params.name, + (err) => { + fs.unlink(tempPath, () => {}); + imap.end(); + if (err) return next(err); + }, + ); + }); + }); + }); + }); + }); + + f.once('error', (err) => { + imap.end(); + next(err); + }); + }, + ); + }); + }); + + imap.once('error', (err) => next(err)); + imap.connect(); + }, + (res) => res.send('ok'), + ), + ); + + // ====== Setup IMAP Consumers ====== + + const setupMessageConsumers = async (subdomain: string) => { + const models: IModels = await generateModels(subdomain); + const integrations = await models.ImapIntegrations.find({ + healthStatus: 'healthy', + }); + + for (const integration of integrations) { + await ImapListen({ + subdomain, + data: { _id: (integration._id as any).toString() }, + }); + } + }; + + // ====== SaaS / OS job distribution ====== + const VERSION = getEnv({ name: 'VERSION' }); + + if (VERSION === 'saas') { + const organizations = await getSaasOrganizations(); + for (const org of organizations) { + await setupMessageConsumers(org.subdomain); + await startDistributingJobs(org.subdomain); + } + } else { + await setupMessageConsumers('os'); + await startDistributingJobs('os'); + } + + console.log('IMAP plugin initialized successfully.'); +}; + +export default initImapApp; diff --git a/backend/plugins/frontline_api/src/modules/integrations/imap/messageBroker.ts b/backend/plugins/frontline_api/src/modules/integrations/imap/messageBroker.ts new file mode 100644 index 0000000000..f6318f02be --- /dev/null +++ b/backend/plugins/frontline_api/src/modules/integrations/imap/messageBroker.ts @@ -0,0 +1,145 @@ +import { generateModels } from '~/connectionResolvers'; +import { listenIntegration } from '~/modules/integrations/imap/utils'; + +interface SendImapMessageArgs { + subdomain: string; + action?: string; + data: { + _id: string; + }; +} + +export async function imapCreateIntegrations({ subdomain, data }) { + try { + const models = await generateModels(subdomain); + + const integration = await models.ImapIntegrations.create({ + inboxId: data.integrationId, + healthStatus: 'healthy', + error: '', + ...JSON.parse(data), + }); + + listenIntegration(subdomain, integration, models); + + await models.ImapLogs.createLog({ + type: 'info', + message: `Started syncing ${integration.user}`, + }); + + return { + status: 'success', + }; + } catch (e) { + return { + status: 'error', + errorMessage: `Failed to create integration: ${e.message}`, + }; + } +} + +export async function imapUpdateIntegrations({ + subdomain, + data: { integrationId, doc }, +}) { + try { + const detail = JSON.parse(doc.data); + const models = await generateModels(subdomain); + + const integration = await models.ImapIntegrations.findOne({ + inboxId: integrationId, + }); + + if (!integration) { + return { + status: 'error', + errorMessage: 'Integration not found.', + }; + } + + detail.healthStatus = 'healthy'; + detail.error = ''; + + await models.ImapIntegrations.updateOne( + { inboxId: integrationId }, + { $set: detail }, + ); + + const updatedIntegration = await models.ImapIntegrations.findOne({ + inboxId: integrationId, + }); + + if (updatedIntegration) { + listenIntegration(subdomain, integration, models); + } + + return { + status: 'success', + }; + } catch (e) { + return { + status: 'error', + errorMessage: `Failed to update integration: ${e.message}`, + }; + } +} + +export async function imapRemoveIntegrations({ + subdomain, + data: { integrationId }, +}) { + try { + const models = await generateModels(subdomain); + + await models.ImapMessages.deleteMany({ + inboxIntegrationId: integrationId, + }); + await models.ImapCustomers.deleteMany({ + inboxIntegrationId: integrationId, + }); + await models.ImapIntegrations.deleteMany({ + inboxId: integrationId, + }); + + return { + status: 'success', + }; + } catch (e) { + return { + status: 'error', + errorMessage: `Failed to remove integration: ${e.message}`, + }; + } +} + +export async function sendImapMessage({ + subdomain, + data: { _id }, +}: SendImapMessageArgs) { + const models = await generateModels(subdomain); + + const integration = await models.ImapIntegrations.findById(_id); + + if (!integration) { + console.log(`Queue: imap:listen. Integration not found ${_id}`); + return; + } + + listenIntegration(subdomain, integration, models); + + return { status: 'success' }; +} + +export async function ImapListen({ + subdomain, + data: { _id }, +}: SendImapMessageArgs) { + const models = await generateModels(subdomain); + const integration = await models.ImapIntegrations.findById(_id); + if (!integration) { + console.log(`Queue: imap:listen. Integration not found ${_id}`); + return; + } + + listenIntegration(subdomain, integration, models); +} diff --git a/backend/plugins/frontline_api/src/modules/integrations/imap/models.ts b/backend/plugins/frontline_api/src/modules/integrations/imap/models.ts index c096f5d1d7..5c74461710 100644 --- a/backend/plugins/frontline_api/src/modules/integrations/imap/models.ts +++ b/backend/plugins/frontline_api/src/modules/integrations/imap/models.ts @@ -1,6 +1,7 @@ import { Document, Model, Schema } from 'mongoose'; import { IModels } from '~/connectionResolvers'; -// import { sendCoreMessage, sendInboxMessage } from '../src/messageBroker'; +import { sendTRPCMessage } from 'erxes-api-shared/utils'; + import * as nodemailer from 'nodemailer'; interface IMapMail { @@ -131,25 +132,31 @@ export const loadImapMessageClass = (models) => { ? { _id: customerId } : { status: { $ne: 'deleted' }, emails: { $in: to } }; - // customer = await sendCoreMessage({ - // subdomain, - // action: 'customers.findOne', - // data: selector, - // isRPC: true - // }); + customer = await sendTRPCMessage({ + pluginName: 'core', + method: 'query', + module: 'customers', + action: 'findOne', + input: { + selector, + }, + }); if (!customer) { const [primaryEmail] = to; - // customer = await sendCoreMessage({ - // subdomain, - // action: 'customers.createCustomer', - // data: { - // state: 'lead', - // primaryEmail - // }, - // isRPC: true - // }); + customer = await sendTRPCMessage({ + pluginName: 'core', + method: 'mutation', + module: 'customers', + action: 'createCustomer', + input: { + doc: { + state: 'lead', + primaryEmail, + }, + }, + }); } let integration; @@ -167,15 +174,15 @@ export const loadImapMessageClass = (models) => { } if (!integration && conversationId) { - // const conversation = await sendInboxMessage({ - // subdomain, - // action: 'conversations.findOne', - // data: { _id: conversationId }, - // isRPC: true - // }); - // integration = await models.Integrations.findOne({ - // inboxId: conversation.integrationId - // }); + const conversation = await models.Conversations.findOne({ + _id: conversationId, + }); + + if (conversation) { + integration = await models.ImapIntegrations.findOne({ + inboxId: conversation.integrationId, + }); + } } if (!integration) { @@ -184,20 +191,16 @@ export const loadImapMessageClass = (models) => { if (conversationId) { if (shouldResolve) { - // await sendInboxMessage({ - // subdomain, - // action: 'conversations.changeStatus', - // data: { id: conversationId, status: 'closed' }, - // isRPC: true - // }); + await models.Conversations.updateOne( + { _id: conversationId }, + { status: 'closed' }, + ); } if (shouldOpen) { - // await sendInboxMessage({ - // subdomain, - // action: 'conversations.changeStatus', - // data: { id: conversationId, status: 'new' }, - // isRPC: true - // }); + await models.Conversations.updateOne( + { _id: conversationId }, + { status: 'new' }, + ); } } @@ -230,26 +233,26 @@ export const loadImapMessageClass = (models) => { const info = await transporter.sendMail(mailData); - // models.Messages.create({ - // inboxIntegrationId: integration.inboxId, - // inboxConversationId: conversationId, - // createdAt: new Date(), - // messageId: info.messageId, - // inReplyTo: replyToMessageId, - // references: mailData.references, - // subject: mailData.subject, - // body: mailData.html, - // to: (mailData.to || []).map((to) => ({ name: to, address: to })), - // from: [{ name: mailData.from, address: mailData.from }], - // attachments: attachments - // ? attachments.map(({ name, type, size }) => ({ - // filename: name, - // type, - // size - // })) - // : [], - // type: 'SENT' - // }); + models.ImapMessages.create({ + inboxIntegrationId: integration.inboxId, + inboxConversationId: conversationId, + createdAt: new Date(), + messageId: info.messageId, + inReplyTo: replyToMessageId, + references: mailData.references, + subject: mailData.subject, + body: mailData.html, + to: (mailData.to || []).map((to) => ({ name: to, address: to })), + from: [{ name: mailData.from, address: mailData.from }], + attachments: attachments + ? attachments.map(({ name, type, size }) => ({ + filename: name, + type, + size, + })) + : [], + type: 'SENT', + }); return { info: info, }; diff --git a/backend/plugins/frontline_api/src/modules/integrations/imap/redlock.ts b/backend/plugins/frontline_api/src/modules/integrations/imap/redlock.ts index f0d52719e0..21d1e33080 100644 --- a/backend/plugins/frontline_api/src/modules/integrations/imap/redlock.ts +++ b/backend/plugins/frontline_api/src/modules/integrations/imap/redlock.ts @@ -8,7 +8,7 @@ const { REDIS_HOST, REDIS_PORT, REDIS_PASSWORD } = process.env; const redis = new Redis({ host: REDIS_HOST, port: parseInt(REDIS_PORT || '6379', 10), - password: REDIS_PASSWORD + password: REDIS_PASSWORD, }); export const redlock = new Redlock([redis]); diff --git a/backend/plugins/frontline_api/src/modules/integrations/imap/utils.ts b/backend/plugins/frontline_api/src/modules/integrations/imap/utils.ts index c442b31286..d2c0602d46 100644 --- a/backend/plugins/frontline_api/src/modules/integrations/imap/utils.ts +++ b/backend/plugins/frontline_api/src/modules/integrations/imap/utils.ts @@ -1,502 +1,479 @@ -// import * as dotenv from 'dotenv'; -// dotenv.config(); -// import * as Imap from 'node-imap'; -// import { simpleParser } from 'mailparser'; -// import { IModels, generateModels } from '~/connectionResolvers'; -// import { -// sendCoreMessage, -// sendImapMessage, -// sendInboxMessage, -// } from './messageBroker'; -// import { IIntegrationImapDocument } from '~/modules/integrations/imap/models'; -// import { throttle } from 'lodash'; -// import { redlock } from './redlock'; -// const { NODE_ENV } = process.env; - -// export const toUpper = (thing) => { -// return thing && thing.toUpperCase ? thing.toUpperCase() : thing; -// }; - -// export const findAttachmentParts = (struct, attachments?) => { -// attachments = attachments || []; - -// for (let i = 0, len = struct.length, _r: any; i < len; ++i) { -// if (Array.isArray(struct[i])) { -// findAttachmentParts(struct[i], attachments); -// } else { -// if ( -// struct[i].disposition && -// ['INLINE', 'ATTACHMENT'].indexOf(toUpper(struct[i].disposition.type)) > -// -1 -// ) { -// attachments.push(struct[i]); -// } -// } -// } -// return attachments; -// }; - -// export const createImap = (integration: IIntegrationImapDocument): Imap => { -// return new Imap({ -// user: integration.mainUser || integration.user, -// password: integration.password, -// host: integration.host, -// keepalive: { forceNoop: true }, -// port: 993, -// tls: true, -// }); -// }; - -// const searchMessages = (imap: Imap, criteria) => { -// return new Promise((resolve, reject) => { -// const messages: string[] = []; - -// imap.search(criteria, (err, results) => { -// if (err) { -// throw err; -// } - -// let f: Imap.ImapFetch; - -// try { -// f = imap.fetch(results, { bodies: '', struct: true }); -// f.on('error', (error: any) => { -// throw error; -// }); -// } catch (e) { -// if (e.message?.includes('Nothing to fetch')) { -// return resolve([]); -// } -// throw e; -// } - -// f.on('message', (msg) => { -// msg.on('body', async (stream) => { -// let buffers: Buffer[] = []; - -// stream.on('data', (buffer) => { -// buffers.push(buffer); -// }); - -// stream.once('end', async () => { -// messages.push(Buffer.concat(buffers).toString('utf8')); -// }); -// }); -// }); - -// f.once('end', async () => { -// const data: any = []; - -// for (const message of messages) { -// const parsed = await simpleParser(message); -// data.push(parsed); -// } - -// resolve(data); -// }); -// }); -// }); -// }; - -// const saveMessages = async ( -// subdomain: string, -// imap: Imap, -// integration: IIntegrationImapDocument, -// criteria, -// models: IModels, -// ) => { -// const msgs: any = await searchMessages(imap, criteria); - -// console.log(`======== found ${msgs.length} messages`); - -// for (const msg of msgs) { -// if ( -// msg.to && -// msg.to.value && -// msg.to.value[0] && -// msg.to.value[0].address !== integration.user -// ) { -// continue; -// } - -// const message = await models.Messages.findOne({ -// messageId: msg.messageId, -// }); - -// if (message) { -// continue; -// } - -// const from = msg.from.value[0].address; -// const prev = await models.Customers.findOne({ email: from }); - -// let customerId; - -// if (!prev) { -// const customer = await sendCoreMessage({ -// subdomain, -// action: 'customers.findOne', -// data: { -// customerPrimaryEmail: from, -// }, -// isRPC: true, -// }); - -// if (customer) { -// customerId = customer._id; -// } else { -// const apiCustomerResponse = await sendCoreMessage({ -// subdomain, -// action: 'customers.createCustomer', -// data: { -// integrationId: integration.inboxId, -// primaryEmail: from, -// }, -// isRPC: true, -// }); - -// customerId = apiCustomerResponse._id; -// } - -// await models.Customers.create({ -// inboxIntegrationId: integration.inboxId, -// contactsId: customerId, -// email: from, -// }); -// } else { -// customerId = prev.contactsId; -// } - -// let conversationId; - -// const $or: any[] = [ -// { references: { $in: [msg.messageId] } }, -// { messageId: { $in: msg.references || [] } }, -// ]; - -// if (msg.inReplyTo) { -// $or.push({ messageId: msg.inReplyTo }); -// $or.push({ references: { $in: [msg.inReplyTo] } }); -// } - -// const relatedMessage = await models.Messages.findOne({ -// $or, -// }); - -// if (relatedMessage) { -// conversationId = relatedMessage.inboxConversationId; -// } else { -// const { _id } = await sendInboxMessage({ -// subdomain, -// action: 'integrations.receive', -// data: { -// action: 'create-or-update-conversation', -// payload: JSON.stringify({ -// integrationId: integration.inboxId, -// customerId, -// createdAt: msg.date, -// content: msg.subject, -// }), -// }, -// isRPC: true, -// }); - -// conversationId = _id; -// } - -// const conversationMessage = await models.Messages.create({ -// inboxIntegrationId: integration.inboxId, -// inboxConversationId: conversationId, -// createdAt: msg.date, -// messageId: msg.messageId, -// inReplyTo: msg.inReplyTo, -// references: msg.references, -// subject: msg.subject, -// body: msg.html, -// to: msg.to && msg.to.value, -// cc: msg.cc && msg.cc.value, -// bcc: msg.bcc && msg.bcc.value, -// from: msg.from && msg.from.value, -// attachments: msg.attachments.map(({ filename, contentType, size }) => ({ -// filename, -// type: contentType, -// size, -// })), -// type: 'INBOX', -// }); - -// await sendInboxMessage({ -// subdomain, -// action: 'conversationClientMessageInserted', -// data: { -// _id: conversationMessage._id, -// content: msg.html, -// conversationId, -// }, -// }); -// } -// }; - -// export const listenIntegration = async ( -// subdomain: string, -// integration: IIntegrationImapDocument, -// models: IModels, -// ) => { -// interface ListenResult { -// reconnect: boolean; -// error?: Error; -// result?: any; -// } - -// const listen = async (): Promise => { -// return new Promise(async (resolve) => { -// let lock; -// let reconnect = true; -// let closing = false; -// let error: Error | undefined; -// let result: string | undefined; - -// try { -// lock = await redlock.lock( -// `${subdomain}:imap:integration:${integration._id}`, -// 60000, -// ); -// } catch (e) { -// // 1 other pod or container is already listening on it -// return resolve({ -// reconnect: false, -// result: `Integration ${integration._id} is already being listened to`, -// }); -// } - -// await lock.extend(60000); - -// const updatedIntegration = await models.Integrations.findById( -// integration._id, -// ); - -// if (!updatedIntegration) { -// return resolve({ -// reconnect: false, -// error: new Error(`Integration ${integration._id} not found`), -// }); -// } - -// let lastFetchDate = updatedIntegration.lastFetchDate -// ? new Date(updatedIntegration.lastFetchDate) -// : new Date(Date.now() - 3 * 24 * 60 * 60 * 1000); - -// const imap = createImap(updatedIntegration); - -// const syncEmail = async () => { -// if (closing) { -// return; -// } -// try { -// const criteria: any = [ -// 'UNSEEN', -// ['SINCE', lastFetchDate.toISOString()], -// ]; -// const nextLastFetchDate = new Date(); -// await saveMessages( -// subdomain, -// imap, -// updatedIntegration, -// criteria, -// models, -// ); -// lastFetchDate = nextLastFetchDate; - -// await models.Integrations.updateOne( -// { _id: updatedIntegration._id }, -// { $set: { lastFetchDate } }, -// ); -// } catch (e) { -// error = e; -// reconnect = false; -// await models.Logs.createLog({ -// type: 'error', -// message: 'syncEmail error:' + e.message, -// errorStack: e.stack, -// }); -// imap.end(); -// } -// }; - -// imap.once('ready', (_response) => { -// imap.openBox('INBOX', true, async (e, box) => { -// if (e) { -// // if we can't open the inbox, we can't sync emails -// error = e; -// reconnect = false; -// closing = true; -// await models.Logs.createLog({ -// type: 'error', -// message: 'openBox error:' + e.message, -// errorStack: e.stack, -// }); -// return imap.end(); -// } -// return syncEmail(); -// }); -// }); - -// imap.on('mail', throttle(syncEmail, 30000, { leading: true })); - -// imap.on('error', async (e) => { -// if (closing) { -// return; -// } -// error = e; -// closing = true; -// if (e.message.includes('Invalid credentials')) { -// // We shouldn't try to reconnect, since it's impossible to reconnect when the credentials are wrong. -// reconnect = false; -// await models.Integrations.updateOne( -// { _id: updatedIntegration._id }, -// { -// $set: { -// healthStatus: 'unHealthy', -// error: `${e.message}`, -// }, -// }, -// ); -// } -// await models.Logs.createLog({ -// type: 'error', -// message: 'error event: ' + e.message, -// errorStack: e.stack, -// }); -// imap.end(); -// }); - -// const closeEndHandler = async () => { -// closing = true; - -// try { -// await imap.end(); -// } catch (e) {} - -// try { -// imap.removeAllListeners(); -// } catch (e) {} - -// await cleanupLock(); - -// return resolve({ -// reconnect, -// error, -// result, -// }); -// }; - -// imap.once('close', closeEndHandler); -// imap.once('end', closeEndHandler); - -// imap.connect(); - -// let lockExtendInterval = setInterval(async () => { -// try { -// await lock.extend(60000); -// } catch (e) { -// reconnect = false; -// result = `Integration ${integration._id} is already being listened to`; -// await cleanupLock(); -// imap.end(); -// } -// }, 30_000); - -// const cleanupLock = async () => { -// try { -// clearInterval(lockExtendInterval); -// } catch (e) {} -// try { -// await lock.unlock(); -// } catch (e) {} -// }; -// }); -// }; - -// while (true) { -// try { -// const result = await listen(); -// result.error && console.error(result.error); - -// if (!result.reconnect) { -// break; -// } -// await new Promise((resolve) => setTimeout(resolve, 10_000)); -// } catch (e) { -// console.error(e); -// break; -// } -// } -// }; - -// const startDistributingJobs = async (subdomain: string) => { -// const models = await generateModels(subdomain); - -// const distributeJob = async () => { -// let lock; -// try { -// lock = await redlock.lock(`${subdomain}:imap:work_distributor`, 60000); -// } catch (e) { -// // 1 other pod or container is already working on job distribution -// return; -// } - -// try { -// await models.Logs.createLog({ -// type: 'info', -// message: `Distributing imap sync jobs`, -// }); - -// const integrations = await models.Integrations.find({ -// healthStatus: 'healthy', -// }); - -// for (const integration of integrations) { -// sendImapMessage({ -// subdomain, -// action: 'listen', -// data: { -// _id: integration._id, -// }, -// }); -// } -// } catch (e) { -// await lock.unlock(); -// } -// }; -// // wait for other containers to start up -// NODE_ENV === 'production' && -// (await new Promise((resolve) => setTimeout(resolve, 60000))); - -// while (true) { -// try { -// await distributeJob(); -// // try doing it every 10 minutes -// await new Promise((resolve) => setTimeout(resolve, 10 * 60 * 1000)); -// } catch (e) { -// console.log('distributeWork error', e); -// } -// } -// }; - -// export default startDistributingJobs; - -// export const routeErrorHandling = (fn, callback?: any) => { -// return async (req, res, next) => { -// try { -// await fn(req, res, next); -// } catch (e) { -// console.log(e.message); - -// if (callback) { -// return callback(res, e, next); -// } - -// return next(e); -// } -// }; -// }; +import * as dotenv from 'dotenv'; +dotenv.config(); + +import Imap from 'node-imap'; + +import { simpleParser } from 'mailparser'; +import { IModels, generateModels } from '~/connectionResolvers'; +import { IIntegrationImapDocument } from '~/modules/integrations/imap/models'; +import { throttle } from 'lodash'; +import { redlock } from '~/modules/integrations/imap/redlock'; +import { sendTRPCMessage } from 'erxes-api-shared/utils'; +const { NODE_ENV } = process.env; +import { receiveInboxMessage } from '@/inbox/receiveMessage'; +import { pConversationClientMessageInserted } from '@/inbox/graphql/resolvers/mutations/widget'; +import { sendImapMessage } from '~/modules/integrations/imap/messageBroker'; +export const toUpper = (thing) => { + return thing && thing.toUpperCase ? thing.toUpperCase() : thing; +}; + +export const findAttachmentParts = (struct, attachments?) => { + attachments = attachments || []; + + for (let i = 0, len = struct.length, _r: any; i < len; ++i) { + if (Array.isArray(struct[i])) { + findAttachmentParts(struct[i], attachments); + } else { + if ( + struct[i].disposition && + ['INLINE', 'ATTACHMENT'].indexOf(toUpper(struct[i].disposition.type)) > + -1 + ) { + attachments.push(struct[i]); + } + } + } + return attachments; +}; + +export const createImap = (integration: IIntegrationImapDocument): Imap => { + return new Imap({ + user: integration.mainUser || integration.user, + password: integration.password, + host: integration.host, + keepalive: { forceNoop: true }, + port: 993, + tls: true, + }); +}; + +const searchMessages = (imap: Imap, criteria) => { + return new Promise((resolve, reject) => { + const messages: string[] = []; + + imap.search(criteria, (err, results) => { + if (err) { + throw err; + } + + let f: Imap.ImapFetch; + + try { + f = imap.fetch(results, { bodies: '', struct: true }); + f.on('error', (error: any) => { + throw error; + }); + } catch (e) { + if (e.message?.includes('Nothing to fetch')) { + return resolve([]); + } + throw e; + } + + f.on('message', (msg) => { + msg.on('body', async (stream) => { + let buffers: Buffer[] = []; + + stream.on('data', (buffer) => { + buffers.push(buffer); + }); + + stream.once('end', async () => { + messages.push(Buffer.concat(buffers).toString('utf8')); + }); + }); + }); + + f.once('end', async () => { + const data: any = []; + + for (const message of messages) { + const parsed = await simpleParser(message); + data.push(parsed); + } + + resolve(data); + }); + }); + }); +}; + +const saveMessages = async ( + subdomain: string, + imap: Imap, + integration: IIntegrationImapDocument, + criteria, + models: IModels, +) => { + const msgs: any = await searchMessages(imap, criteria); + + console.log(`======== found ${msgs.length} messages`); + + for (const msg of msgs) { + if ( + msg.to && + msg.to.value && + msg.to.value[0] && + msg.to.value[0].address !== integration.user + ) { + continue; + } + + const message = await models.ImapMessages.findOne({ + messageId: msg.messageId, + }); + + if (message) { + continue; + } + + const from = msg.from.value[0].address; + const prev = await models.ImapCustomers.findOne({ email: from }); + + let customerId; + + if (!prev) { + const customer = await sendTRPCMessage({ + pluginName: 'core', + method: 'query', + module: 'customers', + action: 'findOne', + input: { + customerPrimaryEmail: from, + }, + }); + + if (customer) { + customerId = customer._id; + } else { + const apiCustomerResponse = await sendTRPCMessage({ + pluginName: 'core', + method: 'mutation', + module: 'customers', + action: 'createCustomer', + input: { + integrationId: integration.inboxId, + primaryEmail: from, + }, + }); + customerId = apiCustomerResponse._id; + } + + await models.ImapCustomers.create({ + inboxIntegrationId: integration.inboxId, + contactsId: customerId, + email: from, + }); + } else { + customerId = prev.contactsId; + } + + let conversationId; + + const $or: any[] = [ + { references: { $in: [msg.messageId] } }, + { messageId: { $in: msg.references || [] } }, + ]; + + if (msg.inReplyTo) { + $or.push({ messageId: msg.inReplyTo }); + $or.push({ references: { $in: [msg.inReplyTo] } }); + } + + const relatedMessage = await models.ImapMessages.findOne({ + $or, + }); + + if (relatedMessage) { + conversationId = relatedMessage.inboxConversationId; + } else { + const data = { + action: 'create-or-update-conversation', + payload: JSON.stringify({ + integrationId: integration.inboxId, + customerId, + createdAt: msg.date, + content: msg.subject, + }), + }; + + const apiConversationResponse = await receiveInboxMessage( + subdomain, + data, + ); + + if (apiConversationResponse.status === 'success') { + conversationId = apiConversationResponse.data._id; + } else { + throw new Error( + `Conversation creation failed: ${JSON.stringify( + apiConversationResponse, + )}`, + ); + } + } + + const conversationMessage = await models.ImapMessages.create({ + inboxIntegrationId: integration.inboxId, + inboxConversationId: conversationId, + createdAt: msg.date, + messageId: msg.messageId, + inReplyTo: msg.inReplyTo, + references: msg.references, + subject: msg.subject, + body: msg.html, + to: msg.to && msg.to.value, + cc: msg.cc && msg.cc.value, + bcc: msg.bcc && msg.bcc.value, + from: msg.from && msg.from.value, + attachments: msg.attachments.map(({ filename, contentType, size }) => ({ + filename, + type: contentType, + size, + })), + type: 'INBOX', + }); + + await pConversationClientMessageInserted(subdomain, { + _id: conversationMessage._id as string, + content: msg.html, + conversationId, + }); + } +}; + +export const listenIntegration = async ( + subdomain: string, + integration: IIntegrationImapDocument, + models: IModels, +) => { + const listen = async () => { + let lock; + let reconnect = true; + let closing = false; + let error: Error | undefined; + + try { + lock = await redlock.lock( + `${subdomain}:imap:integration:${integration._id}`, + 60000, + ); + } catch (e) { + return { reconnect: false, result: 'Already locked' }; + } + + await lock.extend(60000); + + const updatedIntegration = await models.ImapIntegrations.findById( + integration._id, + ); + if (!updatedIntegration) { + console.error('Integration not found:', integration._id); + return { reconnect: false, error: new Error('Integration not found') }; + } + + let lastFetchDate = updatedIntegration.lastFetchDate + ? new Date(updatedIntegration.lastFetchDate) + : new Date(Date.now() - 3 * 24 * 60 * 60 * 1000); + + const imap = createImap(updatedIntegration); + + const syncEmail = async () => { + if (closing) return; + try { + const criteria: any = [ + 'UNSEEN', + ['SINCE', lastFetchDate.toISOString()], + ]; + const nextLastFetchDate = new Date(); + await saveMessages( + subdomain, + imap, + updatedIntegration, + criteria, + models, + ); + lastFetchDate = nextLastFetchDate; + + await models.ImapIntegrations.updateOne( + { _id: updatedIntegration._id }, + { $set: { lastFetchDate } }, + ); + + console.log(`Integration ${integration._id} synced emails`); + } catch (e: any) { + error = e; + reconnect = false; + await models.ImapLogs.createLog({ + type: 'error', + message: 'syncEmail error:' + e.message, + errorStack: e.stack, + }); + imap.end(); + } + }; + + // IMAP events + imap.once('ready', () => { + console.log('IMAP ready for integration:', integration._id); + imap.openBox('INBOX', true, async (e) => { + if (e) { + error = e; + reconnect = false; + closing = true; + await models.ImapLogs.createLog({ + type: 'error', + message: 'openBox error:' + e.message, + errorStack: e.stack, + }); + return imap.end(); + } + await syncEmail(); + }); + }); + + imap.on('mail', throttle(syncEmail, 30000, { leading: true })); + imap.on('error', async (e: any) => { + console.error('IMAP error:', e.message); + if (closing) return; + error = e; + closing = true; + + if (e.message.includes('Invalid credentials')) { + reconnect = false; + await models.ImapIntegrations.updateOne( + { _id: updatedIntegration._id }, + { $set: { healthStatus: 'unHealthy', error: e.message } }, + ); + } + + await models.ImapLogs.createLog({ + type: 'error', + message: 'error event: ' + e.message, + errorStack: e.stack, + }); + imap.end(); + }); + + const cleanupLock = async () => { + try { + clearInterval(lockExtendInterval); + } catch {} + try { + await lock.unlock(); + } catch {} + }; + + const closeEndHandler = async () => { + closing = true; + try { + imap.end(); + } catch {} + try { + imap.removeAllListeners(); + } catch {} + await cleanupLock(); + return { reconnect, error }; + }; + + imap.once('close', closeEndHandler); + imap.once('end', closeEndHandler); + + imap.connect(); + + const lockExtendInterval = setInterval(async () => { + try { + await lock.extend(60000); + } catch (e) { + reconnect = false; + await cleanupLock(); + imap.end(); + } + }, 30_000); + + return new Promise<{ reconnect: boolean; error?: Error }>((resolve) => { + imap.once('end', () => resolve({ reconnect, error })); + }); + }; + + // Main loop + while (true) { + try { + const result = await listen(); + if ('error' in result && result.error) { + console.error(result.error); + } + + if (!result.reconnect) break; + await new Promise((r) => setTimeout(r, 10_000)); + } catch (e) { + console.error(e); + break; + } + } +}; + +const startDistributingJobs = async (subdomain: string) => { + const models = await generateModels(subdomain); + const distributeJob = async () => { + let lock; + try { + lock = await redlock.lock(`${subdomain}:imap:work_distributor`, 60000); + } catch (e) { + // 1 other pod or container is already working on job distribution + return; + } + try { + await models.ImapLogs.createLog({ + type: 'info', + message: `Distributing imap sync jobs`, + }); + + const integrations = await models.ImapIntegrations.find({ + healthStatus: 'healthy', + }); + for (const integration of integrations) { + sendImapMessage({ + subdomain, + action: 'listen', + data: { + _id: integration._id as string, + }, + }); + } + } catch (e) { + await lock.unlock(); + } + }; + // wait for other containers to start up + NODE_ENV === 'production' && + (await new Promise((resolve) => setTimeout(resolve, 60000))); + + while (true) { + try { + await distributeJob(); + // try doing it every 10 minutes + await new Promise((resolve) => setTimeout(resolve, 10 * 60 * 1000)); + } catch (e) { + console.log('distributeWork error', e); + } + } +}; + +export default startDistributingJobs; + +export const routeErrorHandling = (fn, callback?: any) => { + return async (req, res, next) => { + try { + await fn(req, res, next); + } catch (e) { + console.log(e.message); + + if (callback) { + return callback(res, e, next); + } + + return next(e); + } + }; +};