diff --git a/.gitignore b/.gitignore index 14d720f8..18812609 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,7 @@ test/ # Build output out release +native/**/target/ # Database *.db @@ -77,4 +78,4 @@ wechat-research-site weflow-web-offical /Wedecrypt /scripts/syncwcdb.py -/scripts/syncWedecrypt.py \ No newline at end of file +/scripts/syncWedecrypt.py diff --git a/electron/exportWorker.ts b/electron/exportWorker.ts index 60f896e6..ccd54d7c 100644 --- a/electron/exportWorker.ts +++ b/electron/exportWorker.ts @@ -9,6 +9,7 @@ interface ExportWorkerConfig { options?: any taskId?: string dbPath?: string + accountDir?: string decryptKey?: string myWxid?: string imageXorKey?: unknown @@ -137,10 +138,31 @@ if (config.userDataPath) { } process.env.WEFLOW_PROJECT_NAME = process.env.WEFLOW_PROJECT_NAME || 'WeFlow' +function isExportControlInterruption(error: unknown): boolean { + const text = error instanceof Error + ? `${(error as Error & { code?: string }).code || ''} ${error.message}` + : String(error || '') + return ( + text.includes('WEFLOW_EXPORT_STOP_REQUESTED') || + text.includes('WEFLOW_EXPORT_PAUSE_REQUESTED') || + text.includes('导出任务已停止') || + text.includes('导出任务已暂停') + ) +} + async function run() { - const [{ wcdbService }, { exportService }] = await Promise.all([ + const [ + { wcdbService }, + { exportService }, + { chooseExportEngine, getRustExportDisabledReason }, + { exportSessionsWithRustStreaming }, + { canUseTypeScriptStreamingExport, exportSessionsWithTypeScriptStreaming } + ] = await Promise.all([ import('./services/wcdbService'), - import('./services/exportService') + import('./services/exportService'), + import('./services/export/exportEngineRouter'), + import('./services/export/rustStreamingExporter'), + import('./services/export/typescriptStreamingExporter') ]) wcdbService.setPaths(config.resourcesPath || '', config.userDataPath || '') @@ -188,13 +210,115 @@ async function run() { taskControl ) } else { - result = await exportService.exportSessions( + const options = config.options || { format: 'json' } + const requestedEngine = String(options.engine || 'auto') + const resolvedEngine = chooseExportEngine(options) + const rustDisabledReason = getRustExportDisabledReason(options) + const rustProgress = (progress: any) => onProgress({ + ...progress, + exportEngine: 'rust', + exportEngineLabel: 'Rust' + }) + let typeScriptEngineLabel = requestedEngine === 'typescript' + ? 'TypeScript · 手动指定' + : rustDisabledReason + ? `TypeScript · Rust未启用:${rustDisabledReason}` + : 'TypeScript' + const typeScriptProgress = (progress: any) => onProgress({ + ...progress, + exportEngine: 'typescript', + exportEngineLabel: typeScriptEngineLabel + }) + const runTypeScriptExport = async () => exportService.exportSessions( Array.isArray(config.sessionIds) ? config.sessionIds : [], String(config.outputDir || ''), - config.options || { format: 'json' }, - onProgress, + options, + typeScriptProgress, taskControl ) + const runTypeScriptStreamingExport = async () => exportSessionsWithTypeScriptStreaming({ + source: wcdbService, + sessionIds: Array.isArray(config.sessionIds) ? config.sessionIds : [], + outputDir: String(config.outputDir || ''), + options, + accountDir: String(config.accountDir || config.dbPath || ''), + decryptKey: String(config.decryptKey || ''), + cleanedMyWxid: String(config.myWxid || ''), + onProgress: typeScriptProgress, + control: taskControl + }) + const runRustStreamingExport = async () => exportSessionsWithRustStreaming({ + source: wcdbService, + sessionIds: Array.isArray(config.sessionIds) ? config.sessionIds : [], + outputDir: String(config.outputDir || ''), + options, + accountDir: String(config.accountDir || config.dbPath || ''), + decryptKey: String(config.decryptKey || ''), + cleanedMyWxid: String(config.myWxid || ''), + resourcesPath: String(config.resourcesPath || ''), + onProgress: rustProgress, + control: taskControl + }) + + if (resolvedEngine === 'rust') { + try { + onProgress({ + current: 0, + total: 100, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseLabel: 'Rust 引擎准备导出', + exportEngine: 'rust', + exportEngineLabel: 'Rust' + }) + result = await runRustStreamingExport() + } catch (error) { + if (requestedEngine === 'rust' || isExportControlInterruption(error)) { + throw error + } + const fallbackReason = error instanceof Error ? error.message : String(error) + typeScriptEngineLabel = `TypeScript · Rust回退:${fallbackReason.slice(0, 160)}` + console.warn(`[exportWorker] Rust exporter unavailable, falling back to TypeScript: ${fallbackReason}`) + onProgress({ + current: 0, + total: 100, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseLabel: `Rust 引擎不可用,已回退 TypeScript:${fallbackReason.slice(0, 160)}`, + exportEngine: 'typescript', + exportEngineLabel: typeScriptEngineLabel + }) + result = await runTypeScriptExport() + } + } else { + if (requestedEngine === 'auto' && rustDisabledReason) { + onProgress({ + current: 0, + total: 100, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseLabel: `TypeScript 引擎导出(Rust 未启用:${rustDisabledReason})` + }) + } + if (requestedEngine === 'typescript') { + onProgress({ + current: 0, + total: 100, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseLabel: canUseTypeScriptStreamingExport(options) + ? 'TypeScript 流式引擎准备导出' + : 'TypeScript 引擎准备导出' + }) + } + result = requestedEngine === 'typescript' && canUseTypeScriptStreamingExport(options) + ? await runTypeScriptStreamingExport() + : await runTypeScriptExport() + } } flushProgress() diff --git a/electron/main.ts b/electron/main.ts index cf80daf9..4c51f002 100644 --- a/electron/main.ts +++ b/electron/main.ts @@ -3191,6 +3191,7 @@ function registerIpcHandlers() { const dbPath = String(cfg.get('dbPath') || '').trim() const decryptKey = String(cfg.get('decryptKey') || '').trim() const myWxid = String(cfg.getMyWxidCleaned() || '').trim() + const accountDir = cfg.getAccountDir(dbPath, myWxid) || '' const imageKeys = cfg.getImageKeysForCurrentWxid() const resourcesPath = app.isPackaged ? join(process.resourcesPath, 'resources') @@ -3207,6 +3208,7 @@ function registerIpcHandlers() { options, taskId, dbPath, + accountDir, decryptKey, myWxid, imageXorKey: imageKeys.xorKey, diff --git a/electron/preload.ts b/electron/preload.ts index bb175c04..423332ef 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -494,6 +494,8 @@ contextBridge.exposeInMainWorld('electronAPI', { total: number currentSession: string currentSessionId?: string + exportEngine?: 'rust' | 'typescript' + exportEngineLabel?: string phase: string phaseProgress?: number phaseTotal?: number diff --git a/electron/services/chatService.ts b/electron/services/chatService.ts index b827d416..cccb9ff4 100644 --- a/electron/services/chatService.ts +++ b/electron/services/chatService.ts @@ -18,6 +18,10 @@ import { voiceTranscribeService } from './voiceTranscribeService' import { ImageDecryptService } from './imageDecryptService' import { CONTACT_REGION_LOOKUP_DATA } from './contactRegionLookupData' import { LRUCache } from '../utils/LRUCache.js' +import { + cleanSystemMessageContent, + extractReadableSystemMessageText as extractReadableSystemMessageTextValue +} from './systemMessageFormatter' export interface ChatSession { username: string @@ -6812,33 +6816,11 @@ class ChatService { } private cleanSystemMessage(content: string): string { - if (!content) return '[系统消息]' - - const normalized = this.cleanUtf16(this.decodeHtmlEntities(String(content))) - const readableSysmsg = this.extractReadableSystemMessageText(normalized) - if (readableSysmsg) { - return readableSysmsg - } - - // 移除 XML 声明 - let cleaned = normalized.replace(/<\?xml[^?]*\?>/gi, '') - // 移除所有 XML/HTML 标签 - cleaned = cleaned.replace(/<[^>]+>/g, '') - // 移除尾部的数字(如撤回消息后的时间戳) - cleaned = cleaned.replace(/\d+\s*$/, '') - // 清理多余空白 - cleaned = this.stripSenderPrefix(cleaned).replace(/\s+/g, ' ').trim() - return cleaned || '[系统消息]' + return cleanSystemMessageContent(content) } private extractReadableSystemMessageText(content: string): string { - const sysmsgMatch = /]*>([\s\S]*?)<\/sysmsg>/i.exec(content) - const source = sysmsgMatch?.[1] || content - const text = - this.extractXmlValue(source, 'plain') || - this.extractXmlValue(source, 'text') || - '' - return this.stripSenderPrefix(text).replace(/\s+/g, ' ').trim() + return extractReadableSystemMessageTextValue(content) } private stripSenderPrefix(content: string): string { diff --git a/electron/services/config.ts b/electron/services/config.ts index 618d9085..3c0bbae7 100644 --- a/electron/services/config.ts +++ b/electron/services/config.ts @@ -58,6 +58,7 @@ interface ConfigSchema { autoTranscribeVoice: boolean transcribeLanguages: string[] exportDefaultConcurrency: number + exportEngine: 'auto' | 'rust' | 'typescript' analyticsExcludedUsernames: string[] // 安全相关 @@ -198,6 +199,7 @@ export class ConfigService { autoTranscribeVoice: false, transcribeLanguages: ['zh'], exportDefaultConcurrency: 4, + exportEngine: 'auto', analyticsExcludedUsernames: [], authEnabled: false, authPassword: '', diff --git a/electron/services/export/exportEngineRouter.test.ts b/electron/services/export/exportEngineRouter.test.ts new file mode 100644 index 00000000..3e0f8957 --- /dev/null +++ b/electron/services/export/exportEngineRouter.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it } from 'vitest' +import { chooseExportEngine, getRustExportDisabledReason, isRustSupportedFormat, isTextOnlyExport } from './exportEngineRouter' + +describe('export engine routing', () => { + it('routes auto text-only supported formats to rust', () => { + expect(chooseExportEngine({ format: 'txt' })).toBe('rust') + expect(chooseExportEngine({ format: 'html', contentType: 'text' })).toBe('rust') + expect(chooseExportEngine({ format: 'json' })).toBe('rust') + expect(chooseExportEngine({ format: 'weclone' })).toBe('rust') + expect(chooseExportEngine({ format: 'chatlab-jsonl' })).toBe('rust') + }) + + it('routes unsupported formats and media-heavy options to typescript in auto mode', () => { + expect(chooseExportEngine({ format: 'excel' })).toBe('typescript') + expect(chooseExportEngine({ format: 'txt', exportMedia: true, exportImages: true })).toBe('typescript') + expect(chooseExportEngine({ format: 'json', exportMedia: true, exportImages: true })).toBe('typescript') + expect(chooseExportEngine({ format: 'html', exportAvatars: true })).toBe('typescript') + expect(chooseExportEngine({ format: 'txt', exportVoiceAsText: true })).toBe('typescript') + expect(chooseExportEngine({ format: 'txt', contentType: 'image' })).toBe('typescript') + }) + + it('honors explicit engine requests', () => { + expect(chooseExportEngine({ format: 'excel', engine: 'rust' })).toBe('rust') + expect(chooseExportEngine({ format: 'txt', engine: 'typescript' })).toBe('typescript') + expect(chooseExportEngine({ format: 'txt', engine: 'auto' })).toBe('rust') + }) + + it('exposes narrow predicates for bridge fallback decisions', () => { + expect(isRustSupportedFormat('txt')).toBe(true) + expect(isRustSupportedFormat('json')).toBe(true) + expect(isRustSupportedFormat('chatlab')).toBe(false) + expect(isTextOnlyExport({ format: 'txt' })).toBe(true) + expect(isTextOnlyExport({ format: 'txt', exportFiles: true })).toBe(false) + }) + + it('explains why rust is disabled', () => { + expect(getRustExportDisabledReason({ format: 'chatlab' })).toContain('暂不支持 Rust') + expect(getRustExportDisabledReason({ format: 'json', exportMedia: true })).toBe('媒体导出已开启') + expect(getRustExportDisabledReason({ format: 'txt', exportAvatars: true })).toBe('头像导出已开启') + expect(getRustExportDisabledReason({ format: 'txt' })).toBeNull() + }) +}) diff --git a/electron/services/export/exportEngineRouter.ts b/electron/services/export/exportEngineRouter.ts new file mode 100644 index 00000000..226fb67c --- /dev/null +++ b/electron/services/export/exportEngineRouter.ts @@ -0,0 +1,91 @@ +export type ExportEngine = 'auto' | 'typescript' | 'rust' + +export type ExportFormat = + | 'chatlab' + | 'chatlab-jsonl' + | 'json' + | 'arkme-json' + | 'html' + | 'txt' + | 'excel' + | 'weclone' + | 'sql' + +export interface ExportEngineOptions { + format: ExportFormat | string + engine?: ExportEngine + contentType?: 'text' | 'voice' | 'image' | 'video' | 'emoji' | 'file' | string + exportMedia?: boolean + exportAvatars?: boolean + exportImages?: boolean + exportVoices?: boolean + exportVideos?: boolean + exportEmojis?: boolean + exportFiles?: boolean + exportVoiceAsText?: boolean +} + +const RUST_SUPPORTED_FORMATS = new Set([ + 'json', + 'txt', + 'html', + 'weclone', + 'chatlab-jsonl' +]) + +const TYPESCRIPT_STREAMING_SUPPORTED_FORMATS = new Set([ + 'txt', + 'html', + 'chatlab-jsonl' +]) + +export type ResolvedExportEngine = Exclude + +export function isRustSupportedFormat(format: ExportFormat | string): boolean { + return RUST_SUPPORTED_FORMATS.has(format) +} + +export function isTypeScriptStreamingSupportedFormat(format: ExportFormat | string): boolean { + return TYPESCRIPT_STREAMING_SUPPORTED_FORMATS.has(format) +} + +export function isTextOnlyExport(options: ExportEngineOptions): boolean { + if (options.contentType && options.contentType !== 'text') return false + if (options.exportMedia === true) return false + if (options.exportAvatars === true) return false + if (options.exportImages === true) return false + if (options.exportVoices === true) return false + if (options.exportVideos === true) return false + if (options.exportEmojis === true) return false + if (options.exportFiles === true) return false + if (options.exportVoiceAsText === true) return false + return true +} + +export function canUseRustExportEngine(options: ExportEngineOptions): boolean { + return isRustSupportedFormat(options.format) && isTextOnlyExport(options) +} + +export function canUseTypeScriptStreamingEngine(options: ExportEngineOptions): boolean { + return isTypeScriptStreamingSupportedFormat(options.format) && isTextOnlyExport(options) +} + +export function getRustExportDisabledReason(options: ExportEngineOptions): string | null { + if (!isRustSupportedFormat(options.format)) return `格式 ${options.format} 暂不支持 Rust` + if (options.contentType && options.contentType !== 'text') return `内容类型 ${options.contentType} 不是纯文本` + if (options.exportMedia === true) return '媒体导出已开启' + if (options.exportAvatars === true) return '头像导出已开启' + if (options.exportImages === true) return '图片导出已开启' + if (options.exportVoices === true) return '语音导出已开启' + if (options.exportVideos === true) return '视频导出已开启' + if (options.exportEmojis === true) return '表情导出已开启' + if (options.exportFiles === true) return '文件导出已开启' + if (options.exportVoiceAsText === true) return '语音转文字已开启' + return null +} + +export function chooseExportEngine(options: ExportEngineOptions): ResolvedExportEngine { + if (options.engine === 'rust') return 'rust' + if (options.engine === 'typescript') return 'typescript' + return canUseRustExportEngine(options) ? 'rust' : 'typescript' +} diff --git a/electron/services/export/messageStream.test.ts b/electron/services/export/messageStream.test.ts new file mode 100644 index 00000000..222c675b --- /dev/null +++ b/electron/services/export/messageStream.test.ts @@ -0,0 +1,98 @@ +import { describe, expect, it } from 'vitest' +import { createMessageStream } from './messageStream' + +describe('message stream', () => { + it('streams normalized rows across batches and closes the cursor', async () => { + const closed: number[] = [] + const source = { + openMessageCursorLite: async () => ({ success: true, cursor: 42 }), + openMessageCursor: async () => ({ success: true, cursor: 99 }), + fetchMessageBatch: async (cursor: number) => { + expect(cursor).toBe(42) + const calls = closed.length + if (calls === 0) { + closed.push(-1) + return { + success: true, + rows: [ + { local_id: 1, create_time: 10, local_type: 1, message_content: 'old', is_send: 0, sender_username: 'a' }, + { local_id: 2, create_time: 20, local_type: 1, message_content: 'hello', is_send: 1, sender_username: 'ignored' } + ], + hasMore: true + } + } + return { + success: true, + rows: [ + { local_id: 3, create_time: 30, local_type: 1, message_content: 'world', is_send: 0, sender_username: 'b' } + ], + hasMore: false + } + }, + closeMessageCursor: async (cursor: number) => { + closed.push(cursor) + } + } + + const rows = [] + for await (const row of createMessageStream({ + source, + sessionId: 'room', + cleanedMyWxid: 'me', + dateRange: { start: 15, end: 35 }, + batchSize: 2, + useLiteCursor: true, + decodeContent: (row) => String(row.message_content || '') + })) { + rows.push(row) + } + + expect(rows).toEqual([ + { localId: 2, serverId: 0, createTime: 20, localType: 1, content: 'hello', senderUsername: 'me', isSend: true }, + { localId: 3, serverId: 0, createTime: 30, localType: 1, content: 'world', senderUsername: 'b', isSend: false } + ]) + expect(closed).toContain(42) + }) + + it('throws when cancellation is requested during streaming', async () => { + const source = { + openMessageCursorLite: async () => ({ success: true, cursor: 7 }), + openMessageCursor: async () => ({ success: true, cursor: 7 }), + fetchMessageBatch: async () => ({ success: true, rows: [{ create_time: 1, message_content: 'x' }], hasMore: true }), + closeMessageCursor: async () => {} + } + let shouldStop = false + const iterator = createMessageStream({ + source, + sessionId: 's', + cleanedMyWxid: 'me', + decodeContent: () => 'x', + control: { shouldStop: () => shouldStop } + })[Symbol.asyncIterator]() + + await iterator.next() + shouldStop = true + await expect(iterator.next()).rejects.toThrow(/导出任务已停止/) + }) + + it('marks pause requests with the shared pause code', async () => { + const source = { + openMessageCursorLite: async () => ({ success: true, cursor: 7 }), + openMessageCursor: async () => ({ success: true, cursor: 7 }), + fetchMessageBatch: async () => ({ success: true, rows: [{ create_time: 1, message_content: 'x' }], hasMore: false }), + closeMessageCursor: async () => {} + } + + const iterator = createMessageStream({ + source, + sessionId: 's', + cleanedMyWxid: 'me', + decodeContent: () => 'x', + control: { shouldPause: () => true } + })[Symbol.asyncIterator]() + + await expect(iterator.next()).rejects.toMatchObject({ + code: 'WEFLOW_EXPORT_PAUSE_REQUESTED' + }) + }) +}) diff --git a/electron/services/export/messageStream.ts b/electron/services/export/messageStream.ts new file mode 100644 index 00000000..b1888930 --- /dev/null +++ b/electron/services/export/messageStream.ts @@ -0,0 +1,248 @@ +export interface MessageStreamRow { + localId: number + serverId: number + serverIdRaw?: string + createTime: number + localType: number + content: string + senderUsername: string + isSend: boolean + emojiMd5?: string + emojiCdnUrl?: string + emojiCaption?: string + locationLat?: number + locationLng?: number + locationPoiname?: string + locationLabel?: string +} + +export interface MessageCursorSource { + openMessageCursor: ( + sessionId: string, + batchSize: number, + ascending: boolean, + beginTimestamp: number, + endTimestamp: number + ) => Promise<{ success: boolean; cursor?: number; error?: string }> + openMessageCursorLite?: ( + sessionId: string, + batchSize: number, + ascending: boolean, + beginTimestamp: number, + endTimestamp: number + ) => Promise<{ success: boolean; cursor?: number; error?: string }> + fetchMessageBatch: (cursor: number) => Promise<{ success: boolean; rows?: any[]; hasMore?: boolean; error?: string }> + closeMessageCursor: (cursor: number) => Promise +} + +export interface MessageStreamControl { + shouldStop?: () => boolean + shouldPause?: () => boolean +} + +export const MESSAGE_STREAM_STOP_CODE = 'WEFLOW_EXPORT_STOP_REQUESTED' +export const MESSAGE_STREAM_PAUSE_CODE = 'WEFLOW_EXPORT_PAUSE_REQUESTED' + +export interface MessageStreamOptions { + source: MessageCursorSource + sessionId: string + cleanedMyWxid: string + dateRange?: { start: number; end: number } | null + senderUsername?: string + batchSize?: number + ascending?: boolean + useLiteCursor?: boolean + control?: MessageStreamControl + decodeContent?: (row: any, localType: number) => string +} + +export function normalizeTimestampSeconds(value: unknown): number { + const raw = Number(value) + if (!Number.isFinite(raw) || raw <= 0) return 0 + let normalized = Math.floor(raw) + while (normalized > 10000000000) { + normalized = Math.floor(normalized / 1000) + } + return normalized +} + +export function normalizeMessageStreamRow( + row: any, + options: Pick +): MessageStreamRow { + const localType = getIntFromRow(row, ['local_type', 'localType', 'type', 'msg_type', 'msgType', 'WCDB_CT_local_type'], 1) + const createTime = normalizeTimestampSeconds(getRowField(row, ['create_time', 'createTime', 'timestamp', 'msgCreateTime', 'WCDB_CT_create_time'])) + const localId = getIntFromRow(row, [ + 'local_id', 'localId', 'LocalId', + 'msg_local_id', 'msgLocalId', 'MsgLocalId', + 'msg_id', 'msgId', 'MsgId', 'id', + 'WCDB_CT_local_id' + ], 0) + const rawServerIdValue = getRowField(row, [ + 'server_id', 'serverId', 'ServerId', + 'msg_server_id', 'msgServerId', 'MsgServerId', + 'svr_id', 'svrId', 'msg_svr_id', 'msgSvrId', 'MsgSvrId', + 'WCDB_CT_server_id' + ]) + const serverIdRaw = normalizeUnsignedIntToken(rawServerIdValue) + const serverId = getIntFromRow(row, [ + 'server_id', 'serverId', 'ServerId', + 'msg_server_id', 'msgServerId', 'MsgServerId', + 'svr_id', 'svrId', 'msg_svr_id', 'msgSvrId', 'MsgSvrId', + 'WCDB_CT_server_id' + ], 0) + const isSend = Number.parseInt(String(row?.computed_is_send ?? row?.is_send ?? row?.isSend ?? '0'), 10) === 1 + const senderUsername = isSend + ? options.cleanedMyWxid + : String(row?.sender_username || row?.senderUsername || options.sessionId || '').trim() + const content = options.decodeContent + ? options.decodeContent(row, localType) + : String(row?.content || row?.message_content || row?.messageContent || '') + const emojiMd5 = getStringFromRow(row, ['emoji_md5', 'emojiMd5']) + const emojiCdnUrl = getStringFromRow(row, ['emoji_cdn_url', 'emojiCdnUrl']) + const emojiCaption = getStringFromRow(row, ['emoji_caption', 'emojiCaption']) + const locationLat = getNumberFromRow(row, ['location_lat', 'locationLat']) + const locationLng = getNumberFromRow(row, ['location_lng', 'locationLng']) + const locationPoiname = getStringFromRow(row, ['location_poiname', 'locationPoiname']) + const locationLabel = getStringFromRow(row, ['location_label', 'locationLabel']) + + const normalized: MessageStreamRow = { + localId, + serverId, + serverIdRaw: serverIdRaw !== '0' ? serverIdRaw : undefined, + createTime, + localType, + content, + senderUsername, + isSend + } + if (emojiMd5) normalized.emojiMd5 = emojiMd5 + if (emojiCdnUrl) normalized.emojiCdnUrl = emojiCdnUrl + if (emojiCaption) normalized.emojiCaption = emojiCaption + if (typeof locationLat === 'number') normalized.locationLat = locationLat + if (typeof locationLng === 'number') normalized.locationLng = locationLng + if (locationPoiname) normalized.locationPoiname = locationPoiname + if (locationLabel) normalized.locationLabel = locationLabel + return normalized +} + +export async function* createMessageStream(options: MessageStreamOptions): AsyncGenerator { + const batchSize = Math.max(1, Math.floor(options.batchSize || 2000)) + const ascending = options.ascending !== false + const range = normalizeDateRange(options.dateRange) + const begin = range?.start || 0 + const end = range?.end || 0 + const useLite = options.useLiteCursor === true && Boolean(options.source.openMessageCursorLite) + throwIfMessageStreamControlRequested(options.control) + const opened = useLite + ? await options.source.openMessageCursorLite!(options.sessionId, batchSize, ascending, begin, end) + : await options.source.openMessageCursor(options.sessionId, batchSize, ascending, begin, end) + if (!opened.success || !opened.cursor) { + throw new Error(opened.error || 'open message cursor failed') + } + + try { + let hasMore = true + while (hasMore) { + throwIfMessageStreamControlRequested(options.control) + const batch = await options.source.fetchMessageBatch(opened.cursor) + if (!batch.success) { + throw new Error(batch.error || 'fetch message batch failed') + } + + for (const rawRow of batch.rows || []) { + throwIfMessageStreamControlRequested(options.control) + const row = normalizeMessageStreamRow(rawRow, options) + if (range) { + if (row.createTime > 0 && range.start > 0 && row.createTime < range.start) continue + if (row.createTime > 0 && range.end > 0 && row.createTime > range.end) continue + } + if (options.senderUsername && row.senderUsername !== options.senderUsername) continue + yield row + } + + hasMore = batch.hasMore === true + } + } finally { + await options.source.closeMessageCursor(opened.cursor) + } +} + +function normalizeDateRange(dateRange?: { start: number; end: number } | null): { start: number; end: number } | null { + if (!dateRange) return null + let start = normalizeTimestampSeconds(dateRange.start) + let end = normalizeTimestampSeconds(dateRange.end) + if (start > 0 && end > 0 && start > end) { + const tmp = start + start = end + end = tmp + } + if (start <= 0 && end <= 0) return null + return { start, end } +} + +export function throwIfMessageStreamControlRequested(control?: MessageStreamControl): void { + if (control?.shouldStop?.()) throw createMessageStreamControlError('stop') + if (control?.shouldPause?.()) throw createMessageStreamControlError('pause') +} + +export function isMessageStreamStopError(error: unknown): boolean { + return hasMessageStreamControlError(error, MESSAGE_STREAM_STOP_CODE, '导出任务已停止') +} + +export function isMessageStreamPauseError(error: unknown): boolean { + return hasMessageStreamControlError(error, MESSAGE_STREAM_PAUSE_CODE, '导出任务已暂停') +} + +function createMessageStreamControlError(type: 'stop' | 'pause'): Error { + const error = new Error(type === 'stop' ? '导出任务已停止' : '导出任务已暂停') + ;(error as Error & { code?: string }).code = type === 'stop' + ? MESSAGE_STREAM_STOP_CODE + : MESSAGE_STREAM_PAUSE_CODE + return error +} + +function hasMessageStreamControlError(error: unknown, code: string, message: string): boolean { + if (!error) return false + if (typeof error === 'string') return error.includes(code) || error.includes(message) + if (error instanceof Error) { + const errorCode = (error as Error & { code?: string }).code + return errorCode === code || error.message.includes(code) || error.message.includes(message) + } + return false +} + +function getRowField(row: any, names: string[]): unknown { + for (const name of names) { + if (row && row[name] !== undefined && row[name] !== null) return row[name] + } + return undefined +} + +function getIntFromRow(row: any, names: string[], fallback: number): number { + const value = getRowField(row, names) + const parsed = Number(value) + if (!Number.isFinite(parsed)) return fallback + return Math.floor(parsed) +} + +function getNumberFromRow(row: any, names: string[]): number | undefined { + const value = getRowField(row, names) + const parsed = Number(value) + return Number.isFinite(parsed) ? parsed : undefined +} + +function getStringFromRow(row: any, names: string[]): string | undefined { + const value = getRowField(row, names) + const text = String(value ?? '').trim() + return text || undefined +} + +function normalizeUnsignedIntToken(value: unknown): string { + const text = String(value ?? '').trim() + if (!text) return '0' + if (/^\d+$/.test(text)) return text.replace(/^0+(?=\d)/, '') || '0' + const numberValue = Number(text) + if (!Number.isFinite(numberValue) || numberValue < 0) return '0' + return String(Math.floor(numberValue)) +} diff --git a/electron/services/export/rustExportBridge.test.ts b/electron/services/export/rustExportBridge.test.ts new file mode 100644 index 00000000..b98aef4e --- /dev/null +++ b/electron/services/export/rustExportBridge.test.ts @@ -0,0 +1,30 @@ +import { describe, expect, it } from 'vitest' +import { + parseRustExportEventLine, + resolveRustExporterExecutableName +} from './rustExportBridge' + +describe('rust export bridge protocol helpers', () => { + it('parses known NDJSON events', () => { + expect(parseRustExportEventLine('{"type":"createdFile","path":"C:/tmp/a.txt"}')).toEqual({ + type: 'createdFile', + path: 'C:/tmp/a.txt' + }) + expect(parseRustExportEventLine('{"type":"result","success":true,"successCount":1,"failCount":0}')).toEqual({ + type: 'result', + success: true, + successCount: 1, + failCount: 0 + }) + }) + + it('rejects invalid and unknown event lines with a readable error', () => { + expect(() => parseRustExportEventLine('not-json')).toThrow(/Invalid Rust exporter event/) + expect(() => parseRustExportEventLine('{"type":"surprise"}')).toThrow(/Unknown Rust exporter event/) + }) + + it('uses platform executable naming without leaking request fields into args', () => { + expect(resolveRustExporterExecutableName('win32')).toBe('weflow-exporter.exe') + expect(resolveRustExporterExecutableName('linux')).toBe('weflow-exporter') + }) +}) diff --git a/electron/services/export/rustExportBridge.ts b/electron/services/export/rustExportBridge.ts new file mode 100644 index 00000000..0ac0de74 --- /dev/null +++ b/electron/services/export/rustExportBridge.ts @@ -0,0 +1,72 @@ +import * as path from 'path' + +export type RustExportEvent = + | { type: 'progress'; data?: Record; [key: string]: unknown } + | { type: 'createdFile'; path: string } + | { type: 'createdDir'; path: string } + | { type: 'result'; success: boolean; successCount?: number; failCount?: number; [key: string]: unknown } + | { type: 'error'; error: string } + +export interface RustExporterPathConfig { + resourcesPath: string + platform?: NodeJS.Platform + arch?: string + executablePath?: string +} + +export function resolveRustExporterExecutableName(platform: NodeJS.Platform = process.platform): string { + return platform === 'win32' ? 'weflow-exporter.exe' : 'weflow-exporter' +} + +export function resolveRustExporterPlatformDir(platform: NodeJS.Platform = process.platform): string { + if (platform === 'darwin') return 'macos' + return platform +} + +export function resolveRustExporterArchDir(platform: NodeJS.Platform = process.platform, arch: string = process.arch): string { + if (platform === 'darwin') return 'universal' + return arch +} + +export function resolveRustExporterPath(config: RustExporterPathConfig): string { + if (config.executablePath) return config.executablePath + return path.join( + config.resourcesPath, + 'exporter', + resolveRustExporterPlatformDir(config.platform), + resolveRustExporterArchDir(config.platform, config.arch), + resolveRustExporterExecutableName(config.platform) + ) +} + +export function parseRustExportEventLine(line: string): RustExportEvent { + let parsed: unknown + try { + parsed = JSON.parse(line) + } catch (error) { + throw new Error(`Invalid Rust exporter event: ${error instanceof Error ? error.message : String(error)}`) + } + + if (!parsed || typeof parsed !== 'object') { + throw new Error('Invalid Rust exporter event: expected object') + } + + const event = parsed as Record + const type = String(event.type || '') + if (type === 'progress') return event as RustExportEvent + if (type === 'createdFile') { + const filePath = String(event.path || '').trim() + if (!filePath) throw new Error('Invalid Rust exporter event: createdFile.path is required') + return { type, path: filePath } + } + if (type === 'createdDir') { + const dirPath = String(event.path || '').trim() + if (!dirPath) throw new Error('Invalid Rust exporter event: createdDir.path is required') + return { type, path: dirPath } + } + if (type === 'result') return event as RustExportEvent + if (type === 'error') { + return { type, error: String(event.error || 'Rust exporter failed') } + } + throw new Error(`Unknown Rust exporter event: ${type || ''}`) +} diff --git a/electron/services/export/rustStreamingExporter.ts b/electron/services/export/rustStreamingExporter.ts new file mode 100644 index 00000000..efb7ef58 --- /dev/null +++ b/electron/services/export/rustStreamingExporter.ts @@ -0,0 +1,656 @@ +import { spawn, type ChildProcessWithoutNullStreams } from 'child_process' +import * as readline from 'readline' +import { canUseRustExportEngine } from './exportEngineRouter' +import { + createMessageStream, + isMessageStreamPauseError, + isMessageStreamStopError, + throwIfMessageStreamControlRequested, + type MessageCursorSource, + type MessageStreamControl, + type MessageStreamRow +} from './messageStream' +import { parseRustExportEventLine, resolveRustExporterPath, type RustExportEvent } from './rustExportBridge' +import { exportService } from '../exportService' +import { extractReadableSystemMessageText } from '../systemMessageFormatter' + +export interface RustStreamingExportOptions { + format: 'txt' | 'html' | 'chatlab-jsonl' | 'weclone' | 'json' | string + dateRange?: { start: number; end: number } | null + senderUsername?: string + fileNameSuffix?: string + contentType?: string + exportMedia?: boolean + exportAvatars?: boolean + exportImages?: boolean + exportVoices?: boolean + exportVideos?: boolean + exportEmojis?: boolean + exportFiles?: boolean + exportVoiceAsText?: boolean + displayNamePreference?: 'group-nickname' | 'remark' | 'nickname' +} + +export interface RustStreamingExportRequest { + source: MessageCursorSource & { + open: (accountDir: string, decryptKey: string) => Promise + getDisplayNames: (usernames: string[]) => Promise<{ success: boolean; map?: Record; error?: string }> + getContact?: (username: string) => Promise<{ success: boolean; contact?: any; error?: string }> + getGroupNicknames?: (chatroomId: string) => Promise<{ success: boolean; nicknames?: Record; error?: string }> + } + sessionIds: string[] + outputDir: string + options: RustStreamingExportOptions + accountDir: string + decryptKey: string + cleanedMyWxid: string + resourcesPath: string + onProgress?: (progress: Record) => void + control?: MessageStreamControl & { + recordCreatedFile?: (filePath: string) => void + recordCreatedDir?: (dirPath: string) => void + } +} + +class RustWriterProcess { + private child: ChildProcessWithoutNullStreams | null = null + private resultPromise: Promise> | null = null + private settleResult: ((value: Record) => void) | null = null + private rejectResult: ((error: unknown) => void) | null = null + private stderr = '' + + constructor( + private readonly executablePath: string, + private readonly callbacks: { + onProgress?: (progress: Record) => void + onCreatedFile?: (filePath: string) => void + onCreatedDir?: (dirPath: string) => void + } + ) {} + + async start(request: Pick): Promise { + this.child = spawn(this.executablePath, [], { + stdio: ['pipe', 'pipe', 'pipe'], + windowsHide: true + }) + + this.resultPromise = new Promise((resolve, reject) => { + this.settleResult = resolve + this.rejectResult = reject + }) + + const rl = readline.createInterface({ input: this.child.stdout }) + rl.on('line', (line) => { + const trimmed = line.trim() + if (!trimmed) return + let event: RustExportEvent + try { + event = parseRustExportEventLine(trimmed) + } catch (error) { + this.rejectResult?.(error) + return + } + + if (event.type === 'progress') { + this.callbacks.onProgress?.((event.data ?? event) as Record) + } else if (event.type === 'createdFile') { + this.callbacks.onCreatedFile?.(event.path) + } else if (event.type === 'createdDir') { + this.callbacks.onCreatedDir?.(event.path) + } else if (event.type === 'result') { + this.settleResult?.(event as Record) + } else if (event.type === 'error') { + this.rejectResult?.(new Error(event.error)) + } + }) + + this.child.stderr.on('data', (chunk) => { + this.stderr += String(chunk || '') + }) + this.child.on('error', (error) => this.rejectResult?.(error)) + this.child.on('exit', (code) => { + if (code === 0) return + const suffix = this.stderr.trim() ? `: ${this.stderr.trim().slice(0, 500)}` : '' + this.rejectResult?.(new Error(`Rust writer exited before result (code ${code})${suffix}`)) + }) + + await this.writeEvent({ + type: 'writerRequest', + outputDir: request.outputDir, + options: request.options + }) + } + + async beginSession(sessionId: string, displayName: string, session?: Record): Promise { + await this.writeEvent({ type: 'beginSession', sessionId, displayName, session }) + } + + async writeMessage(row: MessageStreamRow, senderName: string, jsonMessage?: Record): Promise { + await this.writeEvent({ type: 'message', row, senderName, jsonMessage }) + } + + async endSession(): Promise { + await this.writeEvent({ type: 'endSession' }) + } + + async finish(): Promise> { + await this.writeEvent({ type: 'finish' }) + this.child?.stdin.end() + return await this.resultPromise! + } + + cancel(): void { + try { + if (this.child && !this.child.stdin.destroyed) { + this.child.stdin.write(`${JSON.stringify({ type: 'cancel' })}\n`) + } + } catch {} + try { + this.child?.kill() + } catch {} + } + + private async writeEvent(event: Record): Promise { + if (!this.child || this.child.stdin.destroyed) { + throw new Error('Rust writer is not running') + } + const line = `${JSON.stringify(event)}\n` + await new Promise((resolve, reject) => { + const onError = (error: Error) => { + cleanup() + reject(error) + } + const onDrain = () => { + cleanup() + resolve() + } + const cleanup = () => { + this.child?.stdin.off('error', onError) + this.child?.stdin.off('drain', onDrain) + } + this.child!.stdin.once('error', onError) + if (!this.child!.stdin.write(line)) { + this.child!.stdin.once('drain', onDrain) + } else { + cleanup() + resolve() + } + }) + } +} + +export function canUseRustStreamingExport(options: RustStreamingExportOptions): boolean { + return canUseRustExportEngine(options) +} + +export async function exportSessionsWithRustStreaming(request: RustStreamingExportRequest): Promise> { + if (!canUseRustStreamingExport(request.options)) { + return { success: false, successCount: 0, failCount: request.sessionIds.length, error: `Rust streaming exporter does not support format: ${request.options.format}` } + } + + const opened = await request.source.open(request.accountDir, request.decryptKey) + if (!opened) { + return { success: false, successCount: 0, failCount: request.sessionIds.length, error: 'WCDB 打开失败' } + } + + const executablePath = resolveRustExporterPath({ resourcesPath: request.resourcesPath }) + const writer = new RustWriterProcess(executablePath, { + onProgress: request.onProgress, + onCreatedFile: request.control?.recordCreatedFile, + onCreatedDir: request.control?.recordCreatedDir + }) + const successSessionIds: string[] = [] + let activeSessionIndex = 0 + + try { + await writer.start({ outputDir: request.outputDir, options: request.options }) + const sessionNames = await getDisplayNameMap(request.source, request.sessionIds) + const senderNameCache = new Map() + + for (let index = 0; index < request.sessionIds.length; index++) { + activeSessionIndex = index + throwIfMessageStreamControlRequested(request.control) + const sessionId = request.sessionIds[index] + const sessionName = sessionNames.get(sessionId) || sessionId + request.onProgress?.({ + current: index, + total: request.sessionIds.length, + currentSession: sessionName, + currentSessionId: sessionId, + phase: 'preparing', + phaseLabel: 'Rust 写入器准备导出' + }) + const detailedJsonContext = request.options.format === 'json' + ? await createDetailedJsonContext(request.source, sessionId, sessionName, request.cleanedMyWxid, request.options.displayNamePreference) + : null + await writer.beginSession(sessionId, sessionName, detailedJsonContext?.sessionPayload) + + const stream = createMessageStream({ + source: request.source, + sessionId, + cleanedMyWxid: request.cleanedMyWxid, + dateRange: request.options.dateRange, + senderUsername: request.options.senderUsername, + control: request.control, + decodeContent: decodeMessageContent + }) + + let exportedMessages = 0 + for await (const row of stream) { + throwIfMessageStreamControlRequested(request.control) + const senderName = await resolveSenderName(request.source, row, sessionId, sessionName, senderNameCache) + const messageIndex = exportedMessages + 1 + const jsonMessage = detailedJsonContext + ? await buildDetailedJsonMessage(row, messageIndex, detailedJsonContext) + : undefined + await writer.writeMessage(formatRustWriterRow(row), senderName, jsonMessage) + exportedMessages = messageIndex + if (exportedMessages % 1000 === 0) { + request.onProgress?.({ + current: index, + total: request.sessionIds.length, + currentSession: sessionName, + currentSessionId: sessionId, + phase: 'exporting', + exportedMessages + }) + } + } + + await writer.endSession() + successSessionIds.push(sessionId) + request.onProgress?.({ + current: index + 1, + total: request.sessionIds.length, + currentSession: sessionName, + currentSessionId: sessionId, + phase: 'complete', + exportedMessages, + writtenFiles: 1 + }) + } + + return await writer.finish() + } catch (error) { + writer.cancel() + if (isMessageStreamStopError(error) || isMessageStreamPauseError(error)) { + const stopped = isMessageStreamStopError(error) + const paused = isMessageStreamPauseError(error) + return { + success: true, + successCount: successSessionIds.length, + failCount: 0, + stopped: stopped || undefined, + paused: paused || undefined, + pendingSessionIds: request.sessionIds.slice(activeSessionIndex), + successSessionIds, + failedSessionIds: [], + failedSessionErrors: {}, + sessionOutputPaths: {} + } + } + throw error + } +} + +interface DetailedJsonContext { + sessionId: string + sessionName: string + isGroup: boolean + cleanedMyWxid: string + displayNamePreference: 'group-nickname' | 'remark' | 'nickname' + groupNicknamesMap: Map + contactCache: Map> + source: RustStreamingExportRequest['source'] + sessionPayload: Record +} + +async function createDetailedJsonContext( + source: RustStreamingExportRequest['source'], + sessionId: string, + sessionName: string, + cleanedMyWxid: string, + displayNamePreference?: RustStreamingExportOptions['displayNamePreference'] +): Promise { + const preference = displayNamePreference || 'remark' + const isGroup = sessionId.includes('@chatroom') + const contactCache = new Map>() + const groupNicknamesMap = isGroup ? await getGroupNicknamesMap(source, sessionId) : new Map() + const sessionContact = await getContactCached(source, contactCache, sessionId) + const sessionNickname = getContactNickname(sessionContact.contact) || sessionName + const sessionRemark = getContactRemark(sessionContact.contact) + const sessionGroupNickname = isGroup + ? callExportHelper('resolveGroupNicknameByCandidates', groupNicknamesMap, [sessionId]) || '' + : '' + const sessionDisplayName = getPreferredDisplayName(sessionId, sessionNickname, sessionRemark, sessionGroupNickname, preference) + + return { + sessionId, + sessionName, + isGroup, + cleanedMyWxid, + displayNamePreference: preference, + groupNicknamesMap, + contactCache, + source, + sessionPayload: { + wxid: sessionId, + nickname: sessionNickname, + remark: sessionRemark, + displayName: sessionDisplayName, + type: isGroup ? '群聊' : '私聊', + lastTimestamp: null, + messageCount: 0 + } + } +} + +async function buildDetailedJsonMessage( + row: MessageStreamRow, + localId: number, + context: DetailedJsonContext +): Promise> { + const sourceMatch = /[\s\S]*?<\/msgsource>/i.exec(row.content || '') + const source = sourceMatch ? sourceMatch[0] : '' + let content: string | null = parseMessageContent(row, context) + if (callExportHelper('isReadableSystemMessage', row.localType, row.content)) { + content = callExportHelper('extractReadableSystemMessageText', row.content) || content + } + + const quotedReplyDisplay = await resolveQuotedReplyDisplay(row, context) + if (quotedReplyDisplay) { + content = callExportHelper('buildQuotedReplyText', quotedReplyDisplay) || content + } + const appendedLinkContent = quotedReplyDisplay + ? null + : callExportHelper('formatLinkCardExportText', row.content, row.localType, 'append-url') + if (appendedLinkContent) { + content = appendedLinkContent + } + + const senderDisplayName = await resolveDetailedSenderDisplayName(row, context) + const message: Record = { + localId, + createTime: row.createTime, + formattedTime: formatTimestamp(row.createTime), + type: getMessageTypeName(row.localType, row.content), + localType: row.localType, + content, + isSend: row.isSend ? 1 : 0, + senderUsername: row.senderUsername, + senderDisplayName, + source, + senderAvatarKey: row.senderUsername + } + + if (row.localType === 47) { + if (row.emojiMd5) message.emojiMd5 = row.emojiMd5 + if (row.emojiCdnUrl) message.emojiCdnUrl = row.emojiCdnUrl + if (row.emojiCaption) message.emojiCaption = row.emojiCaption + } + + const platformMessageId = normalizeUnsignedIntToken(row.serverIdRaw ?? row.serverId) + if (platformMessageId !== '0') message.platformMessageId = platformMessageId + + const replyToMessageId = callExportHelper('getExportReplyToMessageId', row.content) + if (replyToMessageId) message.replyToMessageId = replyToMessageId + + const appMsgMeta = callExportHelper | null>('extractArkmeAppMessageMeta', row.content, row.localType) + if (appMsgMeta && (appMsgMeta.appMsgKind === 'quote' || appMsgMeta.appMsgKind === 'link')) { + Object.assign(message, appMsgMeta) + } + if (quotedReplyDisplay) { + if (quotedReplyDisplay.quotedSender) message.quotedSender = quotedReplyDisplay.quotedSender + if (quotedReplyDisplay.quotedPreview) message.quotedContent = quotedReplyDisplay.quotedPreview + } + + if (typeof message.content === 'string' && callExportHelper('isTransferExportContent', message.content) && row.content) { + const transferDesc = await resolveTransferDesc(row.content, context) + if (transferDesc) { + message.content = callExportHelper('appendTransferDesc', message.content, transferDesc) || message.content + } + } + + if (row.localType === 48) { + if (row.locationLat != null) message.locationLat = row.locationLat + if (row.locationLng != null) message.locationLng = row.locationLng + if (row.locationPoiname) message.locationPoiname = row.locationPoiname + if (row.locationLabel) message.locationLabel = row.locationLabel + } + + return message +} + +function parseMessageContent(row: MessageStreamRow, context: DetailedJsonContext): string | null { + const parsed = callExportHelper( + 'parseMessageContent', + row.content, + row.localType, + undefined, + undefined, + context.cleanedMyWxid, + row.senderUsername, + row.isSend, + row.emojiCaption + ) + return parsed ?? row.content ?? '' +} + +async function resolveQuotedReplyDisplay(row: MessageStreamRow, context: DetailedJsonContext): Promise { + if (!row.content || !/(|<refermsg>|('resolveQuotedReplyDisplayWithNames', { + content: row.content, + isGroup: context.isGroup, + displayNamePreference: context.displayNamePreference, + getContact: (username: string) => getContactCached(context.source, context.contactCache, username), + groupNicknamesMap: context.groupNicknamesMap, + cleanedMyWxid: context.cleanedMyWxid, + rawMyWxid: context.cleanedMyWxid, + myDisplayName: context.cleanedMyWxid + }) +} + +async function resolveTransferDesc(content: string, context: DetailedJsonContext): Promise { + return await callExportHelperAsync('resolveTransferDesc', content, context.cleanedMyWxid, context.groupNicknamesMap, async (username: string) => { + const contactResult = await getContactCached(context.source, context.contactCache, username) + return getContactRemark(contactResult.contact) || getContactNickname(contactResult.contact) || username + }) || '' +} + +async function resolveDetailedSenderDisplayName(row: MessageStreamRow, context: DetailedJsonContext): Promise { + const senderWxid = row.senderUsername || '' + const contactResult = senderWxid + ? await getContactCached(context.source, context.contactCache, senderWxid) + : { success: false as const } + const senderNickname = getContactNickname(contactResult.contact) || senderWxid + const senderRemark = getContactRemark(contactResult.contact) + const senderGroupNickname = context.isGroup + ? callExportHelper('resolveGroupNicknameByCandidates', context.groupNicknamesMap, [senderWxid]) || '' + : '' + return getPreferredDisplayName(senderWxid, senderNickname, senderRemark, senderGroupNickname, context.displayNamePreference) +} + +function getPreferredDisplayName( + wxid: string, + nickname: string, + remark: string, + groupNickname: string, + preference: 'group-nickname' | 'remark' | 'nickname' +): string { + return callExportHelper('getPreferredDisplayName', wxid, nickname, remark, groupNickname, preference) + || groupNickname + || remark + || nickname + || wxid +} + +async function getContactCached( + source: RustStreamingExportRequest['source'], + cache: Map>, + username: string +): Promise<{ success: boolean; contact?: any; error?: string }> { + const normalized = String(username || '').trim() + if (!normalized || !source.getContact) return { success: false } + const cached = cache.get(normalized) + if (cached) return await cached + const pending = source.getContact(normalized).catch((error) => ({ success: false as const, error: String(error) })) + cache.set(normalized, pending) + return await pending +} + +async function getGroupNicknamesMap( + source: RustStreamingExportRequest['source'], + sessionId: string +): Promise> { + if (!source.getGroupNicknames) return new Map() + try { + const result = await source.getGroupNicknames(sessionId) + if (!result.success || !result.nicknames) return new Map() + return new Map(Object.entries(result.nicknames).map(([key, value]) => [key, String(value || '')]).filter(([, value]) => value)) + } catch { + return new Map() + } +} + +function getContactNickname(contact: any): string { + return String(contact?.nickName || contact?.nick_name || contact?.nickname || contact?.displayName || '').trim() +} + +function getContactRemark(contact: any): string { + return String(contact?.remark || '').trim() +} + +function getMessageTypeName(localType: number, content: string): string { + return callExportHelper('getMessageTypeName', localType, content) || fallbackMessageTypeName(localType) +} + +function formatTimestamp(timestamp: number): string { + return callExportHelper('formatTimestamp', timestamp) || fallbackFormatTimestamp(timestamp) +} + +function normalizeUnsignedIntToken(value: unknown): string { + const text = String(value ?? '').trim() + if (!text) return '0' + if (/^\d+$/.test(text)) return text.replace(/^0+(?=\d)/, '') || '0' + const parsed = Number(text) + if (!Number.isFinite(parsed) || parsed < 0) return '0' + return String(Math.floor(parsed)) +} + +function fallbackMessageTypeName(localType: number): string { + const names: Record = { + 1: '文本消息', + 3: '图片消息', + 34: '语音消息', + 42: '名片消息', + 43: '视频消息', + 47: '动画表情', + 48: '位置消息', + 49: '链接消息', + 50: '通话消息', + 10000: '系统消息', + 244813135921: '引用消息' + } + return names[localType] || '其他消息' +} + +function fallbackFormatTimestamp(timestamp: number): string { + const date = new Date(timestamp * 1000) + const pad = (value: number) => String(value).padStart(2, '0') + return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}` +} + +function callExportHelper(name: string, ...args: any[]): T | undefined { + const helper = (exportService as any)[name] + if (typeof helper !== 'function') return undefined + try { + return helper.apply(exportService, args) as T + } catch { + return undefined + } +} + +async function callExportHelperAsync(name: string, ...args: any[]): Promise { + const helper = (exportService as any)[name] + if (typeof helper !== 'function') return undefined + try { + return await helper.apply(exportService, args) as T + } catch { + return undefined + } +} + +async function resolveSenderName( + source: RustStreamingExportRequest['source'], + row: MessageStreamRow, + sessionId: string, + sessionName: string, + senderNameCache: Map +): Promise { + if (row.isSend) return '我' + if (!sessionId.includes('@chatroom')) return sessionName + if (senderNameCache.has(row.senderUsername)) return senderNameCache.get(row.senderUsername)! + const nameMap = await getDisplayNameMap(source, [row.senderUsername]) + const name = nameMap.get(row.senderUsername) || row.senderUsername + senderNameCache.set(row.senderUsername, name) + return name +} + +function formatRustWriterRow(row: MessageStreamRow): MessageStreamRow { + if (row.localType !== 10000 && !/> { + const result = await source.getDisplayNames(usernames) + const map = new Map() + for (const username of usernames) { + map.set(username, result.success && result.map ? (result.map[username] || username) : username) + } + return map +} + +function decodeMessageContent(row: any): string { + const compressed = decodeMaybeEncoded(row?.compress_content ?? row?.compressContent) + if (compressed) return compressed + return decodeMaybeEncoded(row?.message_content ?? row?.messageContent ?? row?.content ?? '') +} + +function decodeMaybeEncoded(raw: unknown): string { + if (!raw || typeof raw !== 'string') return '' + const value = raw.trim() + if (!value) return '' + if (/^[0-9]+$/.test(value)) return value + if (value.length > 16 && /^[0-9a-fA-F]+$/.test(value) && value.length % 2 === 0) { + try { + return decodeBinaryContent(Buffer.from(value, 'hex')) + } catch { + return '' + } + } + if (value.length > 16 && /^[A-Za-z0-9+/]+={0,2}$/.test(value) && value.length % 4 === 0) { + try { + return decodeBinaryContent(Buffer.from(value, 'base64')) + } catch { + return value + } + } + return value +} + +function decodeBinaryContent(data: Buffer): string { + if (data.length >= 4 && data.readUInt32LE(0) === 0xFD2FB528) { + try { + const fzstd = require('fzstd') + const decompressed = fzstd.decompress(data) + return Buffer.from(decompressed).toString('utf-8') + } catch { + return '' + } + } + return data.toString('utf-8').replace(/\uFFFD/g, '') +} diff --git a/electron/services/export/streamingWriters.test.ts b/electron/services/export/streamingWriters.test.ts new file mode 100644 index 00000000..41a7b048 --- /dev/null +++ b/electron/services/export/streamingWriters.test.ts @@ -0,0 +1,109 @@ +import { describe, expect, it } from 'vitest' +import { writeChatLabJsonlStream, writeHtmlStream, writeTxtStream } from './streamingWriters' +import type { MessageStreamRow } from './messageStream' + +class MemorySink { + chunks: string[] = [] + async write(chunk: string): Promise { + this.chunks.push(chunk) + } + async end(): Promise {} + text(): string { + return this.chunks.join('') + } +} + +async function* rows(): AsyncGenerator { + yield { localId: 1, serverId: 11, createTime: 1, localType: 1, content: 'hello', senderUsername: 'me', isSend: true } + yield { localId: 2, serverId: 12, createTime: 2, localType: 1, content: '&"', senderUsername: 'you', isSend: false } +} + +async function* systemRows(): AsyncGenerator { + yield { + localId: 3, + serverId: 13, + createTime: 3, + localType: 10000, + content: '', + senderUsername: 'room', + isSend: false + } +} + +async function* qrcodeSystemRows(): AsyncGenerator { + yield { + localId: 4, + serverId: 14, + createTime: 4, + localType: 10000, + content: '', + senderUsername: 'room', + isSend: false + } +} + +describe('streaming writers', () => { + it('writes txt without buffering the full stream', async () => { + const sink = new MemorySink() + await writeTxtStream(rows(), sink, { + getSenderName: (row) => row.isSend ? '我' : '对方', + formatTimestamp: (ts) => `t${ts}`, + flushEvery: 1 + }) + expect(sink.chunks.length).toBeGreaterThan(1) + expect(sink.text()).toContain("t1 '我'\nhello\n\n") + expect(sink.text()).toContain("t2 '对方'\n&\"\n\n") + }) + + it('escapes html while writing message chunks', async () => { + const sink = new MemorySink() + await writeHtmlStream(rows(), sink, { + sessionName: 'A&B', + getSenderName: (row) => row.senderUsername, + formatTimestamp: (ts) => `t${ts}`, + flushEvery: 1 + }) + const text = sink.text() + expect(text).toContain('A&B') + expect(text).toContain('<b>&"') + expect(text).not.toContain('&"') + }) + + it('writes valid jsonl records split across chunks', async () => { + const sink = new MemorySink() + await writeChatLabJsonlStream(rows(), sink, { + sessionName: 'room', + getSenderName: (row) => row.senderUsername, + flushEvery: 1 + }) + const lines = sink.text().trim().split(/\r?\n/) + expect(lines.length).toBe(4) + expect(JSON.parse(lines[0])._type).toBe('chatlab') + expect(JSON.parse(lines[2]).content).toBe('hello') + expect(JSON.parse(lines[3]).content).toBe('&"') + }) + + it('expands system message templates while streaming', async () => { + const sink = new MemorySink() + await writeChatLabJsonlStream(systemRows(), sink, { + sessionName: 'room', + getSenderName: (row) => row.senderUsername, + flushEvery: 1 + }) + const message = JSON.parse(sink.text().trim().split(/\r?\n/)[2]) + expect(message.content).toBe('"张三"邀请"李四、王五"加入了群聊') + expect(message.content).not.toContain('$username$') + expect(message.content).not.toContain('$names$') + }) + + it('expands QR code join templates while streaming', async () => { + const sink = new MemorySink() + await writeTxtStream(qrcodeSystemRows(), sink, { + getSenderName: (row) => row.senderUsername, + flushEvery: 1 + }) + expect(sink.text()).toContain('"新成员"通过扫描"分享者"分享的二维码加入群聊') + expect(sink.text()).not.toContain('$adder$') + expect(sink.text()).not.toContain('$from$') + }) +}) diff --git a/electron/services/export/streamingWriters.ts b/electron/services/export/streamingWriters.ts new file mode 100644 index 00000000..cbebef60 --- /dev/null +++ b/electron/services/export/streamingWriters.ts @@ -0,0 +1,159 @@ +import type { MessageStreamRow } from './messageStream' +import { extractReadableSystemMessageText } from '../systemMessageFormatter' + +export interface TextSink { + write: (chunk: string) => Promise + end?: () => Promise +} + +export interface StreamingWriterOptions { + flushEvery?: number + getSenderName: (row: MessageStreamRow) => string | Promise + formatTimestamp?: (timestamp: number) => string +} + +export interface HtmlStreamingWriterOptions extends StreamingWriterOptions { + sessionName: string +} + +export interface ChatLabJsonlStreamingWriterOptions extends StreamingWriterOptions { + sessionName: string +} + +export async function writeTxtStream( + rows: AsyncIterable, + sink: TextSink, + options: StreamingWriterOptions +): Promise<{ messageCount: number }> { + let buffer: string[] = [] + let messageCount = 0 + const flushEvery = Math.max(1, Math.floor(options.flushEvery || 120)) + const flush = async () => { + if (buffer.length === 0) return + await sink.write(buffer.join('')) + buffer = [] + } + + for await (const row of rows) { + const senderName = await options.getSenderName(row) + const timestamp = formatTimestamp(row.createTime, options) + const content = formatStreamingContent(row) + buffer.push(`${timestamp} '${senderName}'\n${content}\n\n`) + messageCount++ + if (buffer.length >= flushEvery) { + await flush() + } + } + + await flush() + await sink.end?.() + return { messageCount } +} + +export async function writeHtmlStream( + rows: AsyncIterable, + sink: TextSink, + options: HtmlStreamingWriterOptions +): Promise<{ messageCount: number }> { + const flushEvery = Math.max(1, Math.floor(options.flushEvery || 100)) + let buffer: string[] = [] + let messageCount = 0 + const flush = async () => { + if (buffer.length === 0) return + await sink.write(buffer.join('')) + buffer = [] + } + + await sink.write(`${escapeHtml(options.sessionName)}
\n`) + for await (const row of rows) { + const senderName = await options.getSenderName(row) + const timestamp = formatTimestamp(row.createTime, options) + const content = formatStreamingContent(row) + buffer.push( + `
` + + `` + + `${escapeHtml(senderName)}` + + `

${escapeHtml(content).replace(/\r?\n/g, '
')}

` + + `
\n` + ) + messageCount++ + if (buffer.length >= flushEvery) { + await flush() + } + } + await flush() + await sink.write('
\n') + await sink.end?.() + return { messageCount } +} + +export async function writeChatLabJsonlStream( + rows: AsyncIterable, + sink: TextSink, + options: ChatLabJsonlStreamingWriterOptions +): Promise<{ messageCount: number }> { + const flushEvery = Math.max(1, Math.floor(options.flushEvery || 200)) + let buffer: string[] = [] + let messageCount = 0 + const flush = async () => { + if (buffer.length === 0) return + await sink.write(buffer.join('')) + buffer = [] + } + + await sink.write(`${JSON.stringify({ _type: 'chatlab', version: '1.0', generator: 'WeFlow' })}\n`) + await sink.write(`${JSON.stringify({ _type: 'meta', name: options.sessionName, platform: 'wechat' })}\n`) + + for await (const row of rows) { + const accountName = await options.getSenderName(row) + const content = formatStreamingContent(row) + buffer.push(`${JSON.stringify({ + _type: 'message', + sender: row.senderUsername, + accountName, + timestamp: row.createTime, + type: toChatLabType(row.localType), + content, + platformMessageId: row.serverIdRaw || String(row.serverId || row.localId) + })}\n`) + messageCount++ + if (buffer.length >= flushEvery) { + await flush() + } + } + + await flush() + await sink.end?.() + return { messageCount } +} + +function formatTimestamp(timestamp: number, options: Pick): string { + return options.formatTimestamp ? options.formatTimestamp(timestamp) : String(timestamp) +} + +function formatStreamingContent(row: MessageStreamRow): string { + if (row.localType === 10000 || //g, '>') + .replace(/"/g, '"') + .replace(/'/g, ''') +} + +function toChatLabType(localType: number): number { + if (localType === 1) return 0 + if (localType === 3) return 1 + if (localType === 34) return 2 + if (localType === 43) return 3 + if (localType === 47) return 5 + if (localType === 48) return 8 + if (localType === 10000) return 80 + return 99 +} diff --git a/electron/services/export/syntheticLargeExport.test.ts b/electron/services/export/syntheticLargeExport.test.ts new file mode 100644 index 00000000..471586db --- /dev/null +++ b/electron/services/export/syntheticLargeExport.test.ts @@ -0,0 +1,86 @@ +import * as fs from 'fs' +import * as os from 'os' +import * as path from 'path' +import { afterEach, describe, expect, it } from 'vitest' +import type { MessageStreamRow } from './messageStream' +import { writeTxtStream } from './streamingWriters' + +class FileSink { + private readonly stream: fs.WriteStream + + constructor(filePath: string) { + this.stream = fs.createWriteStream(filePath, { encoding: 'utf-8' }) + this.stream.setMaxListeners(0) + } + + async write(chunk: string): Promise { + await new Promise((resolve, reject) => { + this.stream.once('error', reject) + if (!this.stream.write(chunk)) { + this.stream.once('drain', resolve) + } else { + resolve() + } + }) + } + + async end(): Promise { + await new Promise((resolve, reject) => { + this.stream.once('error', reject) + this.stream.end(() => resolve()) + }) + } +} + +const createdFiles: string[] = [] + +afterEach(() => { + for (const filePath of createdFiles.splice(0)) { + try { fs.rmSync(filePath, { force: true }) } catch {} + } +}) + +describe('synthetic large streaming export', () => { + it('writes 550k txt messages with bounded heap growth', async () => { + const total = 550_000 + const outputPath = path.join(os.tmpdir(), `weflow-stream-${Date.now()}-${process.pid}.txt`) + createdFiles.push(outputPath) + + let peakHeap = process.memoryUsage().heapUsed + async function* generateRows(): AsyncGenerator { + for (let i = 1; i <= total; i++) { + if ((i % 5000) === 0) { + peakHeap = Math.max(peakHeap, process.memoryUsage().heapUsed) + await new Promise(resolve => setImmediate(resolve)) + } + yield { + localId: i, + serverId: i, + createTime: 1700000000 + i, + localType: 1, + content: `message ${i}`, + senderUsername: i % 2 === 0 ? 'me' : 'friend', + isSend: i % 2 === 0 + } + } + } + + const startHeap = process.memoryUsage().heapUsed + const startedAt = Date.now() + const result = await writeTxtStream(generateRows(), new FileSink(outputPath), { + getSenderName: row => row.isSend ? '我' : 'friend', + formatTimestamp: ts => String(ts), + flushEvery: 512 + }) + peakHeap = Math.max(peakHeap, process.memoryUsage().heapUsed) + + const stat = fs.statSync(outputPath) + const heapGrowthMb = (peakHeap - startHeap) / 1024 / 1024 + const durationMs = Date.now() - startedAt + console.info(`[syntheticLargeExport] messages=${result.messageCount} bytes=${stat.size} durationMs=${durationMs} heapGrowthMb=${heapGrowthMb.toFixed(1)}`) + + expect(result.messageCount).toBe(total) + expect(stat.size).toBeGreaterThan(10 * 1024 * 1024) + expect(heapGrowthMb).toBeLessThan(128) + }, 120_000) +}) diff --git a/electron/services/export/typescriptStreamingExporter.test.ts b/electron/services/export/typescriptStreamingExporter.test.ts new file mode 100644 index 00000000..81645ede --- /dev/null +++ b/electron/services/export/typescriptStreamingExporter.test.ts @@ -0,0 +1,113 @@ +import * as fs from 'fs' +import * as os from 'os' +import * as path from 'path' +import { afterEach, describe, expect, it } from 'vitest' +import { canUseTypeScriptStreamingExport, exportSessionsWithTypeScriptStreaming } from './typescriptStreamingExporter' + +const cleanupPaths: string[] = [] + +afterEach(() => { + for (const item of cleanupPaths.splice(0).sort((a, b) => b.length - a.length)) { + try { fs.rmSync(item, { recursive: true, force: true }) } catch {} + } +}) + +describe('typescript streaming exporter', () => { + it('gates streaming to text-only supported formats', () => { + expect(canUseTypeScriptStreamingExport({ format: 'txt' })).toBe(true) + expect(canUseTypeScriptStreamingExport({ format: 'html', contentType: 'text' })).toBe(true) + expect(canUseTypeScriptStreamingExport({ format: 'weclone' })).toBe(false) + expect(canUseTypeScriptStreamingExport({ format: 'txt', exportMedia: true })).toBe(false) + }) + + it('exports a session through cursor batches and records created paths', async () => { + const outputDir = path.join(os.tmpdir(), `weflow-ts-stream-${Date.now()}-${process.pid}`) + cleanupPaths.push(outputDir) + const createdFiles: string[] = [] + const createdDirs: string[] = [] + let fetchCount = 0 + let closedCursor = 0 + const source = { + open: async () => true, + getDisplayNames: async (usernames: string[]) => ({ + success: true, + map: Object.fromEntries(usernames.map(username => [username, username === 'room' ? '测试会话' : `name-${username}`])) + }), + openMessageCursor: async () => ({ success: true, cursor: 1 }), + openMessageCursorLite: async () => ({ success: true, cursor: 1 }), + fetchMessageBatch: async () => { + fetchCount++ + if (fetchCount === 1) { + return { + success: true, + rows: [ + { local_id: 1, server_id: 11, create_time: 1, local_type: 1, message_content: 'hello', is_send: 1 }, + { local_id: 2, server_id: 12, create_time: 2, local_type: 1, message_content: 'world', is_send: 0, sender_username: 'friend' } + ], + hasMore: false + } + } + return { success: true, rows: [], hasMore: false } + }, + closeMessageCursor: async (cursor: number) => { + closedCursor = cursor + } + } + + const result = await exportSessionsWithTypeScriptStreaming({ + source, + sessionIds: ['room'], + outputDir, + options: { format: 'txt' }, + accountDir: 'account', + decryptKey: 'key', + cleanedMyWxid: 'me', + control: { + recordCreatedFile: filePath => createdFiles.push(filePath), + recordCreatedDir: dirPath => createdDirs.push(dirPath) + } + }) + + expect(result.success).toBe(true) + expect(fetchCount).toBe(1) + expect(closedCursor).toBe(1) + expect(createdDirs).toEqual([outputDir]) + expect(createdFiles.length).toBe(1) + expect(fs.readFileSync(createdFiles[0], 'utf-8')).toContain('hello') + expect(fs.readFileSync(createdFiles[0], 'utf-8')).toContain('world') + }) + + it('returns a resumable paused result instead of failing the session', async () => { + const outputDir = path.join(os.tmpdir(), `weflow-ts-stream-paused-${Date.now()}-${process.pid}`) + cleanupPaths.push(outputDir) + const source = { + open: async () => true, + getDisplayNames: async (usernames: string[]) => ({ + success: true, + map: Object.fromEntries(usernames.map(username => [username, username])) + }), + openMessageCursor: async () => ({ success: true, cursor: 1 }), + openMessageCursorLite: async () => ({ success: true, cursor: 1 }), + fetchMessageBatch: async () => ({ success: true, rows: [], hasMore: false }), + closeMessageCursor: async () => {} + } + + const result = await exportSessionsWithTypeScriptStreaming({ + source, + sessionIds: ['room', 'next'], + outputDir, + options: { format: 'txt' }, + accountDir: 'account', + decryptKey: 'key', + cleanedMyWxid: 'me', + control: { + shouldPause: () => true + } + }) + + expect(result.success).toBe(true) + expect(result.paused).toBe(true) + expect(result.failedSessionIds).toEqual([]) + expect(result.pendingSessionIds).toEqual(['room', 'next']) + }) +}) diff --git a/electron/services/export/typescriptStreamingExporter.ts b/electron/services/export/typescriptStreamingExporter.ts new file mode 100644 index 00000000..4fd5089f --- /dev/null +++ b/electron/services/export/typescriptStreamingExporter.ts @@ -0,0 +1,309 @@ +import * as fs from 'fs' +import * as path from 'path' +import { canUseTypeScriptStreamingEngine } from './exportEngineRouter' +import { + createMessageStream, + isMessageStreamPauseError, + isMessageStreamStopError, + throwIfMessageStreamControlRequested, + type MessageCursorSource, + type MessageStreamControl, + type MessageStreamRow +} from './messageStream' +import { writeChatLabJsonlStream, writeHtmlStream, writeTxtStream, type TextSink } from './streamingWriters' + +export interface TypeScriptStreamingExportOptions { + format: 'txt' | 'html' | 'chatlab-jsonl' | string + dateRange?: { start: number; end: number } | null + senderUsername?: string + fileNameSuffix?: string + contentType?: string + exportMedia?: boolean + exportAvatars?: boolean + exportImages?: boolean + exportVoices?: boolean + exportVideos?: boolean + exportEmojis?: boolean + exportFiles?: boolean + exportVoiceAsText?: boolean +} + +export interface TypeScriptStreamingExportRequest { + source: MessageCursorSource & { + open: (accountDir: string, decryptKey: string) => Promise + getDisplayNames: (usernames: string[]) => Promise<{ success: boolean; map?: Record; error?: string }> + } + sessionIds: string[] + outputDir: string + options: TypeScriptStreamingExportOptions + accountDir: string + decryptKey: string + cleanedMyWxid: string + onProgress?: (progress: Record) => void + control?: MessageStreamControl & { + recordCreatedFile?: (filePath: string) => void + recordCreatedDir?: (dirPath: string) => void + } +} + +class FileSink implements TextSink { + private readonly stream: fs.WriteStream + + constructor(filePath: string) { + this.stream = fs.createWriteStream(filePath, { encoding: 'utf-8' }) + } + + async write(chunk: string): Promise { + await new Promise((resolve, reject) => { + const onError = (error: Error) => { + cleanup() + reject(error) + } + const onDrain = () => { + cleanup() + resolve() + } + const cleanup = () => { + this.stream.off('error', onError) + this.stream.off('drain', onDrain) + } + this.stream.once('error', onError) + if (!this.stream.write(chunk)) { + this.stream.once('drain', onDrain) + } else { + cleanup() + resolve() + } + }) + } + + async end(): Promise { + await new Promise((resolve, reject) => { + this.stream.once('error', reject) + this.stream.end(() => resolve()) + }) + } +} + +export function canUseTypeScriptStreamingExport(options: TypeScriptStreamingExportOptions): boolean { + return canUseTypeScriptStreamingEngine(options) +} + +export async function exportSessionsWithTypeScriptStreaming(request: TypeScriptStreamingExportRequest): Promise> { + if (!canUseTypeScriptStreamingExport(request.options)) { + return { success: false, successCount: 0, failCount: request.sessionIds.length, error: `streaming exporter does not support format: ${request.options.format}` } + } + + const opened = await request.source.open(request.accountDir, request.decryptKey) + if (!opened) { + return { success: false, successCount: 0, failCount: request.sessionIds.length, error: 'WCDB 打开失败' } + } + + await fs.promises.mkdir(request.outputDir, { recursive: true }) + request.control?.recordCreatedDir?.(request.outputDir) + const sessionNames = await getDisplayNameMap(request.source, request.sessionIds) + const senderNameCache = new Map() + const successSessionIds: string[] = [] + const failedSessionIds: string[] = [] + const failedSessionErrors: Record = {} + const sessionOutputPaths: Record = {} + + for (let index = 0; index < request.sessionIds.length; index++) { + const sessionId = request.sessionIds[index] + const sessionName = sessionNames.get(sessionId) || sessionId + request.onProgress?.({ + current: index, + total: request.sessionIds.length, + currentSession: sessionName, + currentSessionId: sessionId, + phase: 'preparing' + }) + + try { + throwIfMessageStreamControlRequested(request.control) + const outputPath = reserveOutputPath( + request.outputDir, + `${sanitizeFileName(sessionName)}${request.options.fileNameSuffix || ''}`, + extensionForFormat(request.options.format) + ) + request.control?.recordCreatedFile?.(outputPath) + const stream = createMessageStream({ + source: request.source, + sessionId, + cleanedMyWxid: request.cleanedMyWxid, + dateRange: request.options.dateRange, + senderUsername: request.options.senderUsername, + control: request.control, + decodeContent: decodeMessageContent + }) + const getSenderName = async (row: MessageStreamRow) => { + if (row.isSend) return '我' + if (!sessionId.includes('@chatroom')) return sessionName + if (senderNameCache.has(row.senderUsername)) return senderNameCache.get(row.senderUsername)! + const nameMap = await getDisplayNameMap(request.source, [row.senderUsername]) + const name = nameMap.get(row.senderUsername) || row.senderUsername + senderNameCache.set(row.senderUsername, name) + return name + } + const sink = new FileSink(outputPath) + const writerOptions = { + sessionName, + getSenderName, + formatTimestamp: formatTimestamp, + flushEvery: 256 + } + const result = request.options.format === 'txt' + ? await writeTxtStream(stream, sink, writerOptions) + : request.options.format === 'html' + ? await writeHtmlStream(stream, sink, writerOptions) + : await writeChatLabJsonlStream(stream, sink, writerOptions) + + successSessionIds.push(sessionId) + sessionOutputPaths[sessionId] = outputPath + request.onProgress?.({ + current: index + 1, + total: request.sessionIds.length, + currentSession: sessionName, + currentSessionId: sessionId, + phase: 'complete', + exportedMessages: result.messageCount, + writtenFiles: 1 + }) + } catch (error) { + const controlResult = buildControlResult( + error, + request.sessionIds.slice(index), + successSessionIds, + failedSessionIds, + failedSessionErrors, + sessionOutputPaths + ) + if (controlResult) return controlResult + failedSessionIds.push(sessionId) + failedSessionErrors[sessionId] = error instanceof Error ? error.message : String(error) + } + } + + const successCount = successSessionIds.length + const failCount = failedSessionIds.length + return { + success: successCount > 0 || failCount === 0, + successCount, + failCount, + successSessionIds, + failedSessionIds, + failedSessionErrors, + sessionOutputPaths, + error: successCount === 0 && failCount > 0 ? Object.values(failedSessionErrors).slice(0, 3).join(';') : undefined + } +} + +function buildControlResult( + error: unknown, + pendingSessionIds: string[], + successSessionIds: string[], + failedSessionIds: string[], + failedSessionErrors: Record, + sessionOutputPaths: Record +): Record | null { + const stopped = isMessageStreamStopError(error) + const paused = isMessageStreamPauseError(error) + if (!stopped && !paused) return null + + return { + success: true, + successCount: successSessionIds.length, + failCount: failedSessionIds.length, + stopped: stopped || undefined, + paused: paused || undefined, + pendingSessionIds, + successSessionIds, + failedSessionIds, + failedSessionErrors, + sessionOutputPaths + } +} + +async function getDisplayNameMap( + source: TypeScriptStreamingExportRequest['source'], + usernames: string[] +): Promise> { + const result = await source.getDisplayNames(usernames) + const map = new Map() + for (const username of usernames) { + map.set(username, result.success && result.map ? (result.map[username] || username) : username) + } + return map +} + +function extensionForFormat(format: string): string { + if (format === 'html') return '.html' + if (format === 'chatlab-jsonl') return '.jsonl' + return '.txt' +} + +function reserveOutputPath(outputDir: string, baseName: string, ext: string): string { + let candidate = path.join(outputDir, `${baseName}${ext}`) + let index = 1 + while (fs.existsSync(candidate)) { + candidate = path.join(outputDir, `${baseName} (${index})${ext}`) + index++ + } + return candidate +} + +function sanitizeFileName(value: string): string { + const cleaned = String(value || '') + .replace(/[<>:"/\\|?*\x00-\x1F]/g, '_') + .trim() + .replace(/^\.+|\.+$/g, '') + return cleaned || 'session' +} + +function formatTimestamp(timestamp: number): string { + const date = new Date(timestamp * 1000) + const pad = (value: number) => String(value).padStart(2, '0') + return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}` +} + +function decodeMessageContent(row: any): string { + const compressed = decodeMaybeEncoded(row?.compress_content ?? row?.compressContent) + if (compressed) return compressed + return decodeMaybeEncoded(row?.message_content ?? row?.messageContent ?? row?.content ?? '') +} + +function decodeMaybeEncoded(raw: unknown): string { + if (!raw) return '' + if (typeof raw !== 'string') return '' + const value = raw.trim() + if (!value) return '' + if (/^[0-9]+$/.test(value)) return value + if (value.length > 16 && /^[0-9a-fA-F]+$/.test(value) && value.length % 2 === 0) { + try { + return decodeBinaryContent(Buffer.from(value, 'hex')) + } catch { + return '' + } + } + if (value.length > 16 && /^[A-Za-z0-9+/]+={0,2}$/.test(value) && value.length % 4 === 0) { + try { + return decodeBinaryContent(Buffer.from(value, 'base64')) + } catch { + return value + } + } + return value +} + +function decodeBinaryContent(data: Buffer): string { + if (data.length >= 4 && data.readUInt32LE(0) === 0xFD2FB528) { + try { + const fzstd = require('fzstd') + const decompressed = fzstd.decompress(data) + return Buffer.from(decompressed).toString('utf-8') + } catch { + return '' + } + } + return data.toString('utf-8').replace(/\uFFFD/g, '') +} diff --git a/electron/services/exportService.ts b/electron/services/exportService.ts index 72198df9..90dbb004 100644 --- a/electron/services/exportService.ts +++ b/electron/services/exportService.ts @@ -15,6 +15,10 @@ import { voiceTranscribeService } from './voiceTranscribeService' import { exportRecordService } from './exportRecordService' import { EXPORT_HTML_STYLES } from './exportHtmlStyles' import { LRUCache } from '../utils/LRUCache.js' +import { + cleanSystemMessageContent, + extractReadableSystemMessageText as extractReadableSystemMessageTextValue +} from './systemMessageFormatter' // ChatLab 格式类型定义 interface ChatLabHeader { @@ -95,6 +99,7 @@ const FILE_APP_LOCAL_TYPE_SET = new Set(FILE_APP_LOCAL_TYPES) export interface ExportOptions { format: 'chatlab' | 'chatlab-jsonl' | 'json' | 'arkme-json' | 'html' | 'txt' | 'excel' | 'weclone' | 'sql' + engine?: 'auto' | 'typescript' | 'rust' contentType?: 'text' | 'voice' | 'image' | 'video' | 'emoji' | 'file' dateRange?: { start: number; end: number } | null senderUsername?: string @@ -165,6 +170,8 @@ export interface ExportProgress { total: number currentSession: string currentSessionId?: string + exportEngine?: 'rust' | 'typescript' + exportEngineLabel?: string phase: 'preparing' | 'exporting' | 'exporting-media' | 'exporting-voice' | 'writing' | 'complete' phaseProgress?: number phaseTotal?: number @@ -527,6 +534,25 @@ class ExportService { } } + private isExportPerfLogEnabled(): boolean { + return this.configService.get('logEnabled') === true + } + + private getMemorySnapshotMb(): { rss: number; heapUsed: number; external: number } { + const memory = process.memoryUsage() + const toMb = (value: number) => Math.round(value / 1024 / 1024) + return { + rss: toMb(memory.rss), + heapUsed: toMb(memory.heapUsed), + external: toMb(memory.external) + } + } + + private logExportPerf(label: string, fields: Record): void { + if (!this.isExportPerfLogEnabled()) return + console.info(`[ExportPerf] ${label}`, fields) + } + private async ensureExportDir(dirPath: string, control?: ExportTaskControl, dirCache?: Set): Promise { if (dirCache?.has(dirPath)) return const existed = await this.pathExists(dirPath) @@ -3043,64 +3069,11 @@ class ExportService { } private cleanSystemMessage(content: string): string { - if (!content) return '[系统消息]' - - // 先尝试提取特定的系统消息内容 - // 1. 提取 sysmsg 中的文本内容 - const sysmsgTextMatch = /]*>([\s\S]*?)<\/sysmsg>/i.exec(content) - if (sysmsgTextMatch) { - content = sysmsgTextMatch[1] - } - - // 2. 提取 revokemsg 撤回消息 - const revokeMatch = /<\/replacemsg>/i.exec(content) - if (revokeMatch) { - return revokeMatch[1].trim() - } - - // 3. 提取 pat 拍一拍消息(sysmsg 内的 template 格式) - const patMatch = /