diff --git a/src/clients/SpokePoolClient.ts b/src/clients/SpokePoolClient.ts index 54370c47c1..77ea07b48c 100644 --- a/src/clients/SpokePoolClient.ts +++ b/src/clients/SpokePoolClient.ts @@ -179,29 +179,20 @@ export function SpokeListener>( const at = "SpokePoolClient#removeEvent"; const eventIdx = this._queryableEventNames().indexOf(event.event); const pendingEvents = this.#pendingEvents[eventIdx]; - const { event: eventName, blockNumber, blockHash, transactionHash, transactionIndex, logIndex } = event; + const { event: eventName, blockNumber, blockHash, transactionHash } = event; // First check for removal from any pending events. - const pendingEventIdx = pendingEvents.findIndex( - (pending) => - pending.logIndex === logIndex && - pending.transactionIndex === transactionIndex && - pending.transactionHash === transactionHash && - pending.blockHash === blockHash - ); - - let handled = false; - if (pendingEventIdx !== -1) { - // Drop the relevant event. - pendingEvents.splice(pendingEventIdx, 1); - handled = true; - - this.logger.debug({ - at: "SpokePoolClient#removeEvent", - message: `Removed 1 pre-ingested ${this.#chain} ${eventName} event for block ${blockNumber}.`, - event, - }); - } + let removed = false; + let removedEventIdx: number; + do { + removedEventIdx = pendingEvents.findIndex(({ blockHash }) => blockHash === event.blockHash); + if (removedEventIdx !== -1) { + removed = true; + pendingEvents.splice(removedEventIdx, 1); // Drop the relevant event. + const message = `Removed pre-ingested ${this.#chain} ${eventName} event for block ${blockNumber}.`; + this.logger.debug({ at, message, event }); + } + } while (removedEventIdx !== -1); // Back out any events that were previously ingested via update(). This is best-effort and may help to save the // relayer from filling a deposit where it must wait for additional deposit confirmations. Note that this is @@ -211,12 +202,13 @@ export function SpokeListener>( const { depositId } = event.args; assert(isDefined(depositId)); - const result = Object.entries(this.depositHashes).find(([, deposit]) => deposit.txnRef === transactionHash); - if (isDefined(result)) { - const [depositKey, deposit] = result; - delete this.depositHashes[depositKey]; - handled = true; - this.logger.warn({ at, message: `Removed 1 ${this.#chain} ${eventName} event.`, deposit }); + const deposits = Object.entries(this.depositHashes).filter(([, deposit]) => deposit.txnRef === transactionHash); + if (deposits.length > 0) { + deposits.forEach(([depositKey, deposit]) => { + delete this.depositHashes[depositKey]; + removed = true; + this.logger.warn({ at, message: `Removed 1 ${this.#chain} ${eventName} event.`, deposit }); + }); } else { this.logger.warn({ at, @@ -232,12 +224,12 @@ export function SpokeListener>( } else { // Retaining any remaining event types should be non-critical for relayer operation. They may // produce sub-optimal decisions, but should not affect the correctness of relayer operation. - handled = true; + removed = true; const message = `Detected re-org affecting pre-ingested ${this.#chain} ${eventName} events. Ignoring.`; this.logger.debug({ at, message, transactionHash, blockHash }); } - return handled; + return removed; } async _update(eventsToQuery: string[]): Promise { diff --git a/test/Relayer.IndexedSpokePoolClient.ts b/test/Relayer.IndexedSpokePoolClient.ts index 35bea38827..aca2b70e92 100644 --- a/test/Relayer.IndexedSpokePoolClient.ts +++ b/test/Relayer.IndexedSpokePoolClient.ts @@ -6,7 +6,7 @@ import { constants, utils as sdkUtils } from "@across-protocol/sdk"; import { SpokeListener, EVMSpokePoolClient } from "../src/clients"; import { Log } from "../src/interfaces"; import { EventSearchConfig, sortEventsAscending, sortEventsAscendingInPlace } from "../src/utils"; -import { SpokePoolClientMessage } from "../src/clients/SpokePoolClient"; +import { ListenerMessage } from "../src/libexec/types"; import { assertPromiseError, createSpyLogger, deploySpokePoolWithToken, expect, randomAddress } from "./utils"; type Constructor = new (...args: any[]) => T; @@ -35,30 +35,31 @@ describe("IndexedSpokePoolClient: Update", async function () { const chainId = CHAIN_IDs.MAINNET; const randomNumber = (ceil = 1_000_000) => Math.floor(Math.random() * ceil); - const makeHash = () => ethersUtils.id(randomNumber().toString()); - const makeTopic = () => ethersUtils.id(randomNumber().toString()).slice(0, 40); + const makeHash = (seed?: number) => ethersUtils.id((seed ?? randomNumber()).toString()); + const makeTopic = (signature?: string) => ethersUtils.id(signature ?? randomNumber().toString()).slice(0, 40); let blockNumber = 100; - const generateEvent = (event: string, blockNumber: number): Log => { + const generateEvent = (event: string, blockNumber: number, transactionHash?: string): Log => { + transactionHash ??= makeHash(); return { blockNumber, transactionIndex: randomNumber(100), logIndex: randomNumber(100), - transactionHash: makeHash(), + transactionHash, removed: false, address: randomAddress(), data: ethersUtils.id(`EventManager-random-txndata-${randomNumber()}`), topics: [makeTopic()], args: [] as Result, - blockHash: makeHash(), + blockHash: makeHash(blockNumber), event, }; }; let depositId: number; - const getDepositEvent = (blockNumber: number): Log => { - const event = generateEvent("FundsDeposited", blockNumber); + const getDepositEvent = (blockNumber: number, transactionHash?: string): Log => { + const event = generateEvent("FundsDeposited", blockNumber, transactionHash); const args = { depositor: randomAddress(), recipient: randomAddress(), @@ -98,7 +99,7 @@ describe("IndexedSpokePoolClient: Update", async function () { * instance is immediately accessible and the message handler callback is called directly. */ const postEvents = (blockNumber: number, currentTime: number, events: Log[]): void => { - const message: SpokePoolClientMessage = { + const message: ListenerMessage = { blockNumber, currentTime, nEvents: events.length, @@ -179,6 +180,24 @@ describe("IndexedSpokePoolClient: Update", async function () { expect(droppedDeposit).to.not.exist; }); + it("Correctly removes all pending events for a given blockHash", async function () { + const events: Log[] = []; + for (let i = 0; i < 25; ++i) { + events.push(getDepositEvent(blockNumber)); + } + + postEvents(blockNumber, currentTime, events); + + const [droppedEvent] = events; + removeEvent(droppedEvent); + + await spokePoolClient.update(); + + // All events should have been dropped before SpokePoolClient update. + const deposits = spokePoolClient.getDeposits(); + expect(deposits.length).to.equal(0); + }); + it("Correctly removes pending events that are dropped after update", async function () { const events: Log[] = []; for (let i = 0; i < 25; ++i) { @@ -202,6 +221,29 @@ describe("IndexedSpokePoolClient: Update", async function () { expect(droppedDeposit).to.not.exist; }); + it("Correctly removes multiple deposits within the same transactionHash after update", async function () { + const events: Log[] = []; + const deposit = getDepositEvent(blockNumber); + const { transactionHash } = deposit; + for (let i = 0; i < 25; ++i) { + events.push(getDepositEvent(blockNumber, transactionHash)); + } + sortEventsAscendingInPlace(events); + + postEvents(blockNumber, currentTime, events); + await spokePoolClient.update(); + + let deposits = spokePoolClient.getDeposits(); + expect(deposits.length).to.equal(events.length); + + // Drop a single event and verify that all related events w/ same transactionHash are also dropped. + removeEvent(deposit); + + await spokePoolClient.update(); + deposits = spokePoolClient.getDeposits(); + expect(deposits.length).to.equal(0); + }); + it("Throws on post-ingested dropped EnabledDepositRoute events", async function () { const events: Log[] = []; for (let i = 0; i < 25; ++i) {