diff --git a/src/indexing/BaseEventIndexManager.ts b/src/indexing/BaseEventIndexManager.ts index 7ec65a5f848..8f0e5a1651f 100644 --- a/src/indexing/BaseEventIndexManager.ts +++ b/src/indexing/BaseEventIndexManager.ts @@ -15,10 +15,23 @@ import { // The following interfaces take their names and member names from seshat and the spec /* eslint-disable camelcase */ + +/** A record of a place to resume crawling events in a given room. */ export interface ICrawlerCheckpoint { + /** The room to be indexed */ roomId: string; - token: string | null; + + /** The pagination index to resume crawling from. */ + token: string; + + /** + * If `fullCrawl` is false (or absent) and we find that we have already indexed the events we find, then we stop crawling. + * + * If `fullCrawl` is true, then we keep going until we reach the end of the room history. + */ fullCrawl?: boolean; + + /** Whether we should crawl in the forward or backward direction. */ direction: Direction; } diff --git a/src/indexing/EventIndex.ts b/src/indexing/EventIndex.ts index 235d68fffdb..ff066026872 100644 --- a/src/indexing/EventIndex.ts +++ b/src/indexing/EventIndex.ts @@ -26,7 +26,7 @@ import { type IMatrixProfile, type IResultRoomEvents, type SyncStateData, - type SyncState, + SyncState, type TimelineIndex, type TimelineWindow, } from "matrix-js-sdk/src/matrix"; @@ -46,6 +46,7 @@ import { type ISearchArgs, } from "./BaseEventIndexManager"; import { asyncFilter } from "../utils/arrays.ts"; +import { logErrorAndShowErrorDialog } from "../utils/ErrorUtils.tsx"; // The time in ms that the crawler will wait loop iterations if there // have not been any checkpoints to consume in the last iteration. @@ -58,20 +59,49 @@ interface ICrawler { cancel(): void; } -/* +/** * Event indexing class that wraps the platform specific event indexing. 
*/ export default class EventIndex extends EventEmitter { - private crawlerCheckpoints: ICrawlerCheckpoint[] = []; private crawler: ICrawler | null = null; + + /** + * A list of checkpoints which are awaiting processing by the crawler, once it has done with `currentCheckpoint`. + */ + private crawlerCheckpoints: ICrawlerCheckpoint[] = []; + + /** + * The current checkpoint that the crawler is working on. + */ private currentCheckpoint: ICrawlerCheckpoint | null = null; + /** + * True if we need to add the initial checkpoints for encrypted rooms, once we've completed a sync. + * This is set if the database is empty when the indexer is first initialized. + */ + private needsInitialCheckpoints = false; + + private readonly logger; + + public constructor() { + super(); + + this.logger = logger.getChild("EventIndex"); + } + public async init(): Promise { const indexManager = PlatformPeg.get()?.getEventIndexingManager(); if (!indexManager) return; + // If the index is empty, set a flag so that we add the initial checkpoints once we sync. + // We do this check here rather than in `onSync` because, by the time `onSync` is called, there will + // have been a few events added to the index. + if (await indexManager.isEventIndexEmpty()) { + this.needsInitialCheckpoints = true; + } + this.crawlerCheckpoints = await indexManager.loadCheckpoints(); - logger.log("EventIndex: Loaded checkpoints", this.crawlerCheckpoints); + this.logger.debug("Loaded checkpoints", JSON.stringify(this.crawlerCheckpoints)); this.registerListeners(); } @@ -102,9 +132,11 @@ export default class EventIndex extends EventEmitter { } /** - * Get crawler checkpoints for the encrypted rooms and store them in the index. + * Add crawler checkpoints for all of the encrypted rooms the user is in. 
*/ public async addInitialCheckpoints(): Promise { + this.needsInitialCheckpoints = false; + const indexManager = PlatformPeg.get()?.getEventIndexingManager(); if (!indexManager) return; const client = MatrixClientPeg.safeGet(); @@ -116,7 +148,7 @@ export default class EventIndex extends EventEmitter { Boolean(await client.getCrypto()?.isEncryptionEnabledInRoom(room.roomId)), ); - logger.log("EventIndex: Adding initial crawler checkpoints"); + this.logger.debug("addInitialCheckpoints: starting"); // Gather the prev_batch tokens and create checkpoints for // our message crawler. @@ -125,6 +157,12 @@ export default class EventIndex extends EventEmitter { const timeline = room.getLiveTimeline(); const token = timeline.getPaginationToken(Direction.Backward); + if (!token) { + this.logger.debug(`addInitialCheckpoints: No back-pagination token for room ${room.roomId}`); + return; + } + this.logger.debug(`addInitialCheckpoints: Adding initial checkpoints for room ${room.roomId}`); + const backCheckpoint: ICrawlerCheckpoint = { roomId: room.roomId, token: token, @@ -139,57 +177,47 @@ export default class EventIndex extends EventEmitter { }; try { - if (backCheckpoint.token) { - await indexManager.addCrawlerCheckpoint(backCheckpoint); - this.crawlerCheckpoints.push(backCheckpoint); - } + await indexManager.addCrawlerCheckpoint(backCheckpoint); + this.crawlerCheckpoints.push(backCheckpoint); - if (forwardCheckpoint.token) { - await indexManager.addCrawlerCheckpoint(forwardCheckpoint); - this.crawlerCheckpoints.push(forwardCheckpoint); - } + await indexManager.addCrawlerCheckpoint(forwardCheckpoint); + this.crawlerCheckpoints.push(forwardCheckpoint); } catch (e) { - logger.log( - "EventIndex: Error adding initial checkpoints for room", - room.roomId, - backCheckpoint, - forwardCheckpoint, + this.logger.warn( + `addInitialCheckpoints: Error adding initial checkpoints for room ${room.roomId}`, e, ); } }), ); + this.logger.debug("addInitialCheckpoints: done"); } - /* + /** * 
The sync event listener. - * - * The listener has two cases: - * - First sync after start up, check if the index is empty, add - * initial checkpoints, if so. Start the crawler background task. - * - Every other sync, tell the event index to commit all the queued up - * live events */ - private onSync = async (state: SyncState, prevState: SyncState | null, data?: SyncStateData): Promise => { - const indexManager = PlatformPeg.get()?.getEventIndexingManager(); - if (!indexManager) return; + private onSync = (state: SyncState, prevState: SyncState | null, data?: SyncStateData): void => { + if (state != SyncState.Syncing) return; - if (prevState === "PREPARED" && state === "SYNCING") { - // If our indexer is empty we're most likely running Element the - // first time with indexing support or running it with an - // initial sync. Add checkpoints to crawl our encrypted rooms. - const eventIndexWasEmpty = await indexManager.isEventIndexEmpty(); - if (eventIndexWasEmpty) await this.addInitialCheckpoints(); + const onSyncInner = async (): Promise => { + const indexManager = PlatformPeg.get()?.getEventIndexingManager(); + if (!indexManager) return; + + // If the index was empty when we first started up, add the initial checkpoints, to back-populate the index. + if (this.needsInitialCheckpoints) { + await this.addInitialCheckpoints(); + } + // Start the crawler if it's not already running. this.startCrawler(); - return; - } - if (prevState === "SYNCING" && state === "SYNCING") { - // A sync was done, presumably we queued up some live events, - // commit them now. 
+ // Commit any queued up live events await indexManager.commitLiveEvents(); - } + }; + + onSyncInner().catch((e) => { + logErrorAndShowErrorDialog("Event indexer threw an unexpected error", e); + }); }; /* @@ -232,7 +260,7 @@ export default class EventIndex extends EventEmitter { if (!MatrixClientPeg.safeGet().isRoomEncrypted(state.roomId)) return; if (ev.getType() === EventType.RoomEncryption && !(await this.isRoomIndexed(state.roomId))) { - logger.log("EventIndex: Adding a checkpoint for a newly encrypted room", state.roomId); + this.logger.debug("Adding a checkpoint for a newly encrypted room", state.roomId); this.addRoomCheckpoint(state.roomId, true); } }; @@ -251,7 +279,7 @@ export default class EventIndex extends EventEmitter { try { await indexManager.deleteEvent(associatedId); } catch (e) { - logger.log("EventIndex: Error deleting event from index", e); + this.logger.warn("Error deleting event from index", e); } }; @@ -265,7 +293,7 @@ export default class EventIndex extends EventEmitter { if (!room) return; if (!MatrixClientPeg.safeGet().isRoomEncrypted(room.roomId)) return; - logger.log("EventIndex: Adding a checkpoint because of a limited timeline", room.roomId); + this.logger.debug("Adding a checkpoint because of a limited timeline", room.roomId); this.addRoomCheckpoint(room.roomId, false); }; @@ -394,12 +422,12 @@ export default class EventIndex extends EventEmitter { direction: Direction.Backward, }; - logger.log("EventIndex: Adding checkpoint", checkpoint); + this.logger.debug("Adding checkpoint", JSON.stringify(checkpoint)); try { await indexManager.addCrawlerCheckpoint(checkpoint); } catch (e) { - logger.log("EventIndex: Error adding new checkpoint for room", room.roomId, checkpoint, e); + this.logger.warn(`Error adding new checkpoint for room ${room.roomId}`, e); } this.crawlerCheckpoints.push(checkpoint); @@ -460,6 +488,7 @@ export default class EventIndex extends EventEmitter { continue; } + this.logger.debug(`Processing checkpoint 
${JSON.stringify(checkpoint)}`); this.currentCheckpoint = checkpoint; this.emitNewCheckpoint(); @@ -481,15 +510,15 @@ export default class EventIndex extends EventEmitter { ); } catch (e) { if (e instanceof HTTPError && e.httpStatus === 403) { - logger.log( - "EventIndex: Removing checkpoint as we don't have ", + this.logger.debug( + "Removing checkpoint as we don't have ", "permissions to fetch messages from this room.", - checkpoint, + JSON.stringify(checkpoint), ); try { await indexManager.removeCrawlerCheckpoint(checkpoint); } catch (e) { - logger.log("EventIndex: Error removing checkpoint", checkpoint, e); + this.logger.warn(`Error removing checkpoint ${JSON.stringify(checkpoint)}:`, e); // We don't push the checkpoint here back, it will // hopefully be removed after a restart. But let us // ignore it for now as we don't want to hammer the @@ -498,7 +527,7 @@ export default class EventIndex extends EventEmitter { continue; } - logger.log("EventIndex: Error crawling using checkpoint:", checkpoint, ",", e); + this.logger.warn(`Error crawling using checkpoint ${JSON.stringify(checkpoint)}:`, e); this.crawlerCheckpoints.push(checkpoint); continue; } @@ -509,13 +538,13 @@ export default class EventIndex extends EventEmitter { } if (res.chunk.length === 0) { - logger.log("EventIndex: Done with the checkpoint", checkpoint); + this.logger.debug("Done with the checkpoint", JSON.stringify(checkpoint)); // We got to the start/end of our timeline, lets just // delete our checkpoint and go back to sleep. 
try { await indexManager.removeCrawlerCheckpoint(checkpoint); } catch (e) { - logger.log("EventIndex: Error removing checkpoint", checkpoint, e); + this.logger.warn("Error removing checkpoint", JSON.stringify(checkpoint), e); } continue; } @@ -593,7 +622,7 @@ export default class EventIndex extends EventEmitter { if (eventId) { await indexManager.deleteEvent(eventId); } else { - logger.warn("EventIndex: Redaction event doesn't contain a valid associated event id", ev); + this.logger.warn("Redaction event doesn't contain a valid associated event id", ev); } } @@ -602,10 +631,9 @@ export default class EventIndex extends EventEmitter { // We didn't get a valid new checkpoint from the server, nothing // to do here anymore. if (!newCheckpoint) { - logger.log( - "EventIndex: The server didn't return a valid ", - "new checkpoint, not continuing the crawl.", - checkpoint, + this.logger.debug( + "The server didn't return a valid new checkpoint, not continuing the crawl.", + JSON.stringify(checkpoint), ); continue; } @@ -615,31 +643,29 @@ export default class EventIndex extends EventEmitter { // Let us delete the checkpoint in that case, otherwise push // the new checkpoint to be used by the crawler. 
if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) { - logger.log( - "EventIndex: Checkpoint had already all events", + this.logger.debug( + "Checkpoint had already all events", "added, stopping the crawl", - checkpoint, + JSON.stringify(checkpoint), ); await indexManager.removeCrawlerCheckpoint(newCheckpoint); } else { if (eventsAlreadyAdded === true) { - logger.log( - "EventIndex: Checkpoint had already all events", + this.logger.debug( + "Checkpoint had already all events", "added, but continuing due to a full crawl", - checkpoint, + JSON.stringify(checkpoint), ); } this.crawlerCheckpoints.push(newCheckpoint); } } catch (e) { - logger.log("EventIndex: Error during a crawl", e); + this.logger.warn("Error during a crawl", e); // An error occurred, put the checkpoint back so we // can retry. this.crawlerCheckpoints.push(checkpoint); } } - - this.crawler = null; } /** @@ -647,7 +673,14 @@ export default class EventIndex extends EventEmitter { */ public startCrawler(): void { if (this.crawler !== null) return; - this.crawlerFunc(); + this.logger.debug("Starting crawler"); + this.crawlerFunc() + .finally(() => { + this.crawler = null; + }) + .catch((e) => { + this.logger.error("Error in crawler function", e); + }); } /** @@ -655,6 +688,7 @@ export default class EventIndex extends EventEmitter { */ public stopCrawler(): void { if (this.crawler === null) return; + this.logger.debug("Stopping crawler"); this.crawler.cancel(); } @@ -732,7 +766,7 @@ export default class EventIndex extends EventEmitter { try { events = await indexManager.loadFileEvents(loadArgs); } catch (e) { - logger.log("EventIndex: Error getting file events", e); + this.logger.debug("Error getting file events", e); return []; } @@ -842,11 +876,8 @@ export default class EventIndex extends EventEmitter { ret = true; } - logger.log( - "EventIndex: Populating file panel with", - matrixEvents.length, - "events and setting the pagination token to", - paginationToken, + this.logger.debug( + 
`Populating file panel with ${matrixEvents.length} events and setting the pagination token to ${paginationToken}`, ); timeline.setPaginationToken(paginationToken, EventTimeline.BACKWARDS); @@ -961,7 +992,10 @@ export default class EventIndex extends EventEmitter { } public crawlingRooms(): { + /** The rooms that we are currently crawling. */ crawlingRooms: Set; + + /** All the encrypted rooms known by the MatrixClient. */ totalRooms: Set; } { const totalRooms = new Set(); diff --git a/test/unit-tests/indexing/EventIndex-test.ts b/test/unit-tests/indexing/EventIndex-test.ts new file mode 100644 index 00000000000..3e33beaf3b9 --- /dev/null +++ b/test/unit-tests/indexing/EventIndex-test.ts @@ -0,0 +1,186 @@ +/* +Copyright 2025 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +import { type Mocked } from "jest-mock"; +import { + Direction, + type MatrixClient, + type IEvent, + MatrixEvent, + type Room, + ClientEvent, + SyncState, +} from "matrix-js-sdk/src/matrix"; + +import EventIndex from "../../../src/indexing/EventIndex.ts"; +import { emitPromise, getMockClientWithEventEmitter, mockClientMethodsRooms, mockPlatformPeg } from "../../test-utils"; +import type BaseEventIndexManager from "../../../src/indexing/BaseEventIndexManager.ts"; +import { type ICrawlerCheckpoint } from "../../../src/indexing/BaseEventIndexManager.ts"; +import SettingsStore from "../../../src/settings/SettingsStore.ts"; + +afterEach(() => { + jest.restoreAllMocks(); +}); + +describe("EventIndex", () => { + it("crawls through the loaded checkpoints", async () => { + const mockIndexingManager = { + loadCheckpoints: jest.fn(), + removeCrawlerCheckpoint: jest.fn(), + isEventIndexEmpty: jest.fn().mockResolvedValue(false), + } as any as Mocked; + mockPlatformPeg({ getEventIndexingManager: () => mockIndexingManager }); + + const room1 = { roomId: "!room1:id" } as any as Room; + const room2 = { roomId: "!room2:id" } as any as Room; + const mockClient = getMockClientWithEventEmitter({ + getEventMapper: () => (obj: Partial) => new MatrixEvent(obj), + createMessagesRequest: jest.fn(), + ...mockClientMethodsRooms([room1, room2]), + }); + + jest.spyOn(SettingsStore, "getValueAt").mockImplementation((_level, settingName): any => { + if (settingName === "crawlerSleepTime") return 0; + return undefined; + }); + + mockIndexingManager.loadCheckpoints.mockResolvedValue([ + { roomId: "!room1:id", token: "token1", direction: Direction.Backward } as ICrawlerCheckpoint, + { roomId: "!room2:id", token: "token2", direction: Direction.Forward } as ICrawlerCheckpoint, + ]); + + const indexer = new EventIndex(); + await indexer.init(); + let changedCheckpointPromise = emitPromise(indexer, "changedCheckpoint") as Promise; + + indexer.startCrawler(); + + // Mock out the /messages request, 
and wait for the crawler to hit the first room + const mock1 = mockCreateMessagesRequest(mockClient); + let changedCheckpoint = await changedCheckpointPromise; + expect(changedCheckpoint.roomId).toEqual("!room1:id"); + + await mock1.called; + expect(mockClient.createMessagesRequest).toHaveBeenCalledWith("!room1:id", "token1", 100, "b"); + + // Continue, and wait for the crawler to hit the second room + changedCheckpointPromise = emitPromise(indexer, "changedCheckpoint") as Promise; + mock1.resolve({ chunk: [] }); + changedCheckpoint = await changedCheckpointPromise; + expect(changedCheckpoint.roomId).toEqual("!room2:id"); + + // Mock out the /messages request again, and wait for it to be called + const mock2 = mockCreateMessagesRequest(mockClient); + await mock2.called; + expect(mockClient.createMessagesRequest).toHaveBeenCalledWith("!room2:id", "token2", 100, "f"); + }); + + it("adds checkpoints for the encrypted rooms after the first sync", async () => { + const mockIndexingManager = { + loadCheckpoints: jest.fn().mockResolvedValue([]), + isEventIndexEmpty: jest.fn().mockResolvedValue(true), + addCrawlerCheckpoint: jest.fn(), + removeCrawlerCheckpoint: jest.fn(), + commitLiveEvents: jest.fn(), + } as any as Mocked; + mockPlatformPeg({ getEventIndexingManager: () => mockIndexingManager }); + + const room1 = { + roomId: "!room1:id", + getLiveTimeline: () => ({ + getPaginationToken: () => "token1", + }), + } as any as Room; + const room2 = { + roomId: "!room2:id", + getLiveTimeline: () => ({ + getPaginationToken: () => "token2", + }), + } as any as Room; + const mockCrypto = { + isEncryptionEnabledInRoom: jest.fn().mockResolvedValue(true), + }; + const mockClient = getMockClientWithEventEmitter({ + getEventMapper: () => (obj: Partial) => new MatrixEvent(obj), + createMessagesRequest: jest.fn(), + getCrypto: () => mockCrypto as any, + ...mockClientMethodsRooms([room1, room2]), + }); + + const commitLiveEventsCalled = Promise.withResolvers(); + 
mockIndexingManager.commitLiveEvents.mockImplementation(async () => { + commitLiveEventsCalled.resolve(); + }); + + const indexer = new EventIndex(); + await indexer.init(); + + // During the first sync, some events are added to the index, meaning that `isEventIndexEmpty` will now be false. + mockIndexingManager.isEventIndexEmpty.mockResolvedValue(false); + + // The first sync completes: + mockClient.emit(ClientEvent.Sync, SyncState.Syncing, null, {}); + + // Wait for `commitLiveEvents` to be called, by which time the checkpoints should have been added. + await commitLiveEventsCalled.promise; + expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledTimes(4); + expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({ + roomId: "!room1:id", + token: "token1", + direction: Direction.Backward, + fullCrawl: true, + }); + expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({ + roomId: "!room1:id", + token: "token1", + direction: Direction.Forward, + }); + expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({ + roomId: "!room2:id", + token: "token2", + direction: Direction.Backward, + fullCrawl: true, + }); + expect(mockIndexingManager.addCrawlerCheckpoint).toHaveBeenCalledWith({ + roomId: "!room2:id", + token: "token2", + direction: Direction.Forward, + }); + }); +}); + +/** + * Mock out the `createMessagesRequest` method on the client, with an implementation that will block until a resolver is called. + * + * @returns An object with the following properties: + * * `called`: A promise that resolves when `createMessagesRequest` is called. + * * `resolve`: A function that can be called to allow `createMessagesRequest` to complete. 
+ */ +function mockCreateMessagesRequest(mockClient: Mocked): { + called: Promise; + resolve: (result: any) => void; +} { + const messagesCalledPromise = Promise.withResolvers(); + const messagesResultPromise = Promise.withResolvers(); + mockClient.createMessagesRequest.mockImplementationOnce(() => { + messagesCalledPromise.resolve(); + return messagesResultPromise.promise as any; + }); + return { + called: messagesCalledPromise.promise, + resolve: messagesResultPromise.resolve, + }; +}