diff --git a/bin/dry-run.js b/bin/dry-run.js index ec181b79..9c8dbab4 100644 --- a/bin/dry-run.js +++ b/bin/dry-run.js @@ -142,7 +142,8 @@ const { ignoredErrors } = await evaluate({ ieContractWithSigner, logger: console, recordTelemetry, - createPgClient + createPgClient, + stuckTransactionsCanceller: { addPending: () => {} } }) console.log('Duration: %sms', Date.now() - started) diff --git a/bin/fetch-recent-miner-measurements.js b/bin/fetch-recent-miner-measurements.js index 58c8ffad..90173f74 100644 --- a/bin/fetch-recent-miner-measurements.js +++ b/bin/fetch-recent-miner-measurements.js @@ -216,7 +216,8 @@ async function processRound (roundIndex, measurementCids, resultCounts) { fetchRoundDetails, recordTelemetry, logger: { log: debug, error: debug }, - ieContractWithSigner + ieContractWithSigner, + stuckTransactionsCanceller: { addPending: () => {} } }) for (const m of round.measurements) { diff --git a/bin/spark-evaluate.js b/bin/spark-evaluate.js index 2ef695d4..70319bfd 100644 --- a/bin/spark-evaluate.js +++ b/bin/spark-evaluate.js @@ -10,7 +10,7 @@ import { fetchMeasurements } from '../lib/preprocess.js' import { migrateWithPgConfig } from '../lib/migrate.js' import pg from 'pg' import { createMeridianContract } from '../lib/ie-contract.js' -import { startCancelStuckTxs } from '../lib/cancel-stuck-txs.js' +import { createStuckTransactionsCanceller, startCancelStuckTransactions } from '../lib/cancel-stuck-transactions.js' const { SENTRY_ENVIRONMENT = 'development', @@ -43,6 +43,13 @@ const createPgClient = async () => { return pgClient } +const pgClient = await createPgClient() +const stuckTransactionsCanceller = createStuckTransactionsCanceller({ + pgClient, + // Bypass NonceManager as we need to cancel transactions with the same nonce + signer: wallet +}) + await Promise.all([ startEvaluate({ ieContract, @@ -51,12 +58,8 @@ await Promise.all([ fetchRoundDetails, recordTelemetry, createPgClient, - logger: console + logger: console, + stuckTransactionsCanceller }), - startCancelStuckTxs({ - walletDelegatedAddress, - address: wallet.address, - // Bypass NonceManager as we need to cancel transactions with the same nonce - signer: wallet - }) + startCancelStuckTransactions(stuckTransactionsCanceller) ]) diff --git a/index.js b/index.js index ffc653ae..14f8dbbf 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,8 @@ export const startEvaluate = async ({ fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) => { assert(typeof createPgClient === 'function', 'createPgClient must be a function') @@ -115,7 +116,8 @@ export const startEvaluate = async ({ fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }).catch(err => { console.error('CANNOT EVALUATE ROUND %s:', evaluatedRoundIndex, err) Sentry.captureException(err, { diff --git a/lib/cancel-stuck-transactions.js b/lib/cancel-stuck-transactions.js new file mode 100644 index 00000000..66125c7d --- /dev/null +++ b/lib/cancel-stuck-transactions.js @@ -0,0 +1,68 @@ +import * as Sentry from '@sentry/node' +import { StuckTransactionsCanceller } from 'cancel-stuck-transactions' +import ms from 'ms' +import timers from 'node:timers/promises' + +const ROUND_LENGTH_MS = ms('20 minutes') +const CHECK_STUCK_TXS_DELAY = ms('1 minute') + +export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { + return new StuckTransactionsCanceller({ + store: { + async set ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { + await pgClient.query(` + INSERT INTO transactions_pending (hash, timestamp, from_address, max_priority_fee_per_gas, nonce) + VALUES ($1, $2, $3, $4, $5) + `, + [hash, timestamp, from, maxPriorityFeePerGas, nonce] + ) + }, + async list () { + const { rows } = await pgClient.query(` + SELECT + hash, + timestamp, + from_address as "from", + max_priority_fee_per_gas as "maxPriorityFeePerGas", + nonce + FROM transactions_pending + `) + return rows + }, + async remove (hash) { + await pgClient.query( + 'DELETE FROM transactions_pending WHERE hash = $1', + [hash] + ) + } + }, + log (str) { + console.log(str) + }, + sendTransaction (tx) { + return signer.sendTransaction(tx) + } + }) +} + +export const startCancelStuckTransactions = async stuckTransactionsCanceller => { + while (true) { + try { + const res = await stuckTransactionsCanceller.cancelOlderThan( + 2 * ROUND_LENGTH_MS + ) + if (res !== undefined) { + for (const { status, reason } of res) { + if (status === 'rejected') { + console.error('Failed to cancel transaction:', reason) + Sentry.captureException(reason) + } + } + } + } catch (err) { + console.error(err) + Sentry.captureException(err) + } + await timers.setTimeout(CHECK_STUCK_TXS_DELAY) + } +} diff --git a/lib/cancel-stuck-txs.js b/lib/cancel-stuck-txs.js deleted file mode 100644 index d3bb4ee7..00000000 --- a/lib/cancel-stuck-txs.js +++ /dev/null @@ -1,184 +0,0 @@ -import pRetry from 'p-retry' -import assert from 'node:assert' -import * as Sentry from '@sentry/node' -import ms from 'ms' -import timers from 'node:timers/promises' - -const ROUND_LENGTH_MS = ms('20 minutes') -const CHECK_STUCK_TXS_DELAY = ms('1 minute') - -/** - * @param {string} f4addr - * @returns 1000 oldest messages - */ -export async function getMessagesInMempool (f4addr) { - const res = await pRetry( - async () => { - const res = await fetch(`https://filfox.info/api/v1/message/mempool/filtered-list?address=${f4addr}&pageSize=1000`) - if (!res.ok) { - throw new Error(`Filfox request failed with ${res.status}: ${(await res.text()).trimEnd()}`) - } - return res - }, - { - async onFailedAttempt (error) { - console.warn(error) - console.warn('Filfox request failed. Retrying...') - } - } - ) - - /** @type {{ - messages: { - cid: string; - from: string; - to: string; - nonce: number; - value: string; - gasLimit: number; - gasFeeCap: string; - gasPremium: string; - method: string; - methodNumber: number; - evmMethod: string; - createTimestamp: number; - }[]; - }} - */ - const { messages } = /** @type {any} */(await res.json()) - return messages -} - -/** - * @returns {Promise<{ - cid: string; - height: number; - timestamp: number; - gasLimit: number; - gasFeeCap: string; - gasPremium: string; - method: string; - methodNumber: number; - receipt: { - exitCode: number; - return: string; - gasUsed: number; - }, - size: number; - error: string; - baseFee: string; - fee: { - baseFeeBurn: string; - overEstimationBurn: string; - minerPenalty: string; - minerTip: string; - refund: string; - }, -}>} -*/ -export async function getRecentSendMessage () { - let res = await fetch('https://filfox.info/api/v1/message/list?method=Send') - if (!res.ok) { - throw new Error(`Filfox request failed with ${res.status}: ${(await res.text()).trimEnd()}`) - } - const body = /** @type {any} */(await res.json()) - assert(body.messages.length > 0, '/message/list returned an empty list') - const sendMsg = body.messages.find(m => m.method === 'Send') - assert(!!sendMsg, 'No Send message found in the recent committed messages') - const cid = sendMsg.cid - - res = await fetch(`https://filfox.info/api/v1/message/${cid}`) - if (!res.ok) { - throw new Error(`Filfox request failed with ${res.status}: ${(await res.text()).trimEnd()}`) - } - - return /** @type {any} */(await res.json()) -} - -const cancelTx = async ({ tx, address, signer, recentGasUsed, recentGasFeeCap }) => { - const oldGasPremium = Number(tx.gasPremium) - const nonce = tx.nonce - // Increase by 25% + 1 attoFIL (easier: 25.2%) - const maxPriorityFeePerGas = Math.ceil(oldGasPremium * 1.252) - - console.log(`Replacing ${tx.cid}...`) - try { - const replacementTx = await signer.sendTransaction({ - to: address, - value: 0, - nonce, - gasLimit: Math.ceil(recentGasUsed * 1.1), - // priorityFee cannot be more than maxFee - maxFeePerGas: Math.max(recentGasFeeCap, maxPriorityFeePerGas), - maxPriorityFeePerGas - }) - console.log( - `Waiting for receipt of replacement ${replacementTx.hash} for ${tx.cid}` - ) - await replacementTx.wait() - console.log(`Replaced ${tx.cid} with ${replacementTx.hash}`) - } catch (err) { - console.error(err) - Sentry.captureException(err) - } -} - -const cancelStuckTxs = async ({ walletDelegatedAddress, address, signer }) => { - console.log('Checking for stuck transactions...') - - const messages = await getMessagesInMempool(walletDelegatedAddress) - const txsToCancel = messages.filter(m => { - return m.createTimestamp * 1000 < Date.now() - (2 * ROUND_LENGTH_MS) - }) - if (txsToCancel.length === 0) { - console.log('No transactions to cancel') - return false - } - - console.log('Transactions to cancel:') - for (const tx of txsToCancel) { - console.log( - '-', - tx.cid, - `(age ${ms(Date.now() - (tx.createTimestamp * 1000))})` - ) - } - - const recentSendMessage = await getRecentSendMessage() - console.log('Calculating gas fees from the recent Send message %s (created at %s)', - recentSendMessage.cid, - new Date(recentSendMessage.timestamp * 1000).toISOString() - ) - - const recentGasUsed = recentSendMessage.receipt.gasUsed - const recentGasFeeCap = Number(recentSendMessage.gasFeeCap) - - await Promise.all(txsToCancel.map(async tx => { - await cancelTx({ tx, address, signer, recentGasUsed, recentGasFeeCap }) - })) - - return true -} - -export const startCancelStuckTxs = async ({ - walletDelegatedAddress, - address, - signer -}) => { - while (true) { - let didCancelTxs = false - try { - didCancelTxs = await cancelStuckTxs({ - walletDelegatedAddress, - address, - signer - }) - } catch (err) { - console.error(err) - Sentry.captureException(err) - } - if (!didCancelTxs) { - await timers.setTimeout(CHECK_STUCK_TXS_DELAY) - } - } -} diff --git a/lib/evaluate.js b/lib/evaluate.js index b5d817d9..2431a2b5 100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -55,6 +55,7 @@ export const createSetScoresBuckets = participants => { * @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry * @param {import('./typings.js').CreatePgClient} [args.createPgClient] * @param {Pick} args.logger + * @param {import('cancel-stuck-transactions').StuckTransactionsCanceller} args.stuckTransactionsCanceller */ export const evaluate = async ({ round, @@ -64,7 +65,8 @@ export const evaluate = async ({ fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) => { requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE @@ -161,6 +163,11 @@ export const evaluate = async ({ Number(bucketIndex) + 1, buckets.length ) + await stuckTransactionsCanceller.addPending(tx) + ;(async () => { + await tx.wait() + stuckTransactionsCanceller.removeConfirmed(tx) + })().catch(console.error) } catch (err) { logger.error('CANNOT SUBMIT SCORES FOR ROUND %s (CALL %s/%s): %s', roundIndex, diff --git a/migrations/017.do.cancel-stuck-transactions.sql b/migrations/017.do.cancel-stuck-transactions.sql new file mode 100644 index 00000000..aba9abfa --- /dev/null +++ b/migrations/017.do.cancel-stuck-transactions.sql @@ -0,0 +1,7 @@ +CREATE TABLE transactions_pending ( + hash TEXT NOT NULL PRIMARY KEY, + timestamp TIMESTAMPTZ NOT NULL, + from_address TEXT NOT NULL, + max_priority_fee_per_gas BIGINT NOT NULL, + nonce INT NOT NULL +); diff --git a/package-lock.json b/package-lock.json index aab5a046..1f5921ef 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "@ipld/car": "^5.3.2", "@sentry/node": "^8.31.0", "@web3-storage/car-block-validator": "^1.2.0", + "cancel-stuck-transactions": "^1.0.0", "debug": "^4.3.7", "drand-client": "^1.2.6", "ethers": "^6.13.2", @@ -1869,6 +1870,14 @@ "node": ">=6" } }, + "node_modules/cancel-stuck-transactions": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/cancel-stuck-transactions/-/cancel-stuck-transactions-1.0.0.tgz", + "integrity": "sha512-MkJggKgtzCHKKvYshCxwfMv18ELapS48p+uXhi27frokV714/QugQ/hzcsvxjjYQ30IAdp9CIzUMtUekBT/LHw==", + "dependencies": { + "ms": "^2.1.3" + } + }, "node_modules/cborg": { "version": "4.0.5", "resolved": "https://registry.npmjs.org/cborg/-/cborg-4.0.5.tgz", diff --git a/package.json b/package.json index 0484a48b..f3f761af 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "@ipld/car": "^5.3.2", "@sentry/node": "^8.31.0", "@web3-storage/car-block-validator": "^1.2.0", + "cancel-stuck-transactions": "^1.0.0", "debug": "^4.3.7", "drand-client": "^1.2.6", "ethers": "^6.13.2", diff --git a/test/cancel-stuck-txs.test.js b/test/cancel-stuck-txs.test.js deleted file mode 100644 index 56f2a11c..00000000 --- a/test/cancel-stuck-txs.test.js +++ /dev/null @@ -1,25 +0,0 @@ -import assert from 'node:assert' -import { - getMessagesInMempool, - getRecentSendMessage -} from '../lib/cancel-stuck-txs.js' - -describe('cancel stuck transactions', function () { - this.timeout(10_000) - - describe('getMessagesInMempool(addr)', () => { - it('should return messages in mempool', async () => { - const messages = await getMessagesInMempool( - '0x000000000000000000000000000000000000dEaD' - ) - assert(Array.isArray(messages)) - }) - }) - describe('getRecentSendMessage()', () => { - it('should return a recent send message', async () => { - const recentSendMessage = await getRecentSendMessage() - assert(recentSendMessage.receipt.gasUsed) - assert(recentSendMessage.gasFeeCap) - }) - }) -}) diff --git a/test/evaluate.js b/test/evaluate.js index 69393867..235ac26b 100644 --- a/test/evaluate.js +++ b/test/evaluate.js @@ -65,6 +65,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } await evaluate({ round, roundIndex: 0n, @@ -73,7 +79,8 @@ describe('evaluate', async function () { fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].roundIndex, 0n) @@ -83,6 +90,7 @@ describe('evaluate', async function () { setScoresCalls[0].scores[0], 1000000000000000n ) + assert.strictEqual(addPendingCalls.length, 1) const point = telemetry.find(p => p.name === 'evaluate') assert(!!point, @@ -111,6 +119,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -121,7 +135,8 @@ describe('evaluate', async function () { fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].roundIndex, 0n) @@ -131,6 +146,7 @@ describe('evaluate', async function () { assert.deepStrictEqual(setScoresCalls[0].scores, [ MAX_SCORE ]) + assert.strictEqual(addPendingCalls.length, 1) let point = telemetry.find(p => p.name === 'evaluate') assert(!!point, @@ -162,6 +178,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -172,7 +194,8 @@ describe('evaluate', async function () { fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].roundIndex, 0n) @@ -182,6 +205,7 @@ describe('evaluate', async function () { assert.deepStrictEqual(setScoresCalls[0].scores, [ MAX_SCORE ]) + assert.strictEqual(addPendingCalls.length, 1) }) it('calculates reward shares', async () => { const round = new RoundData(0n) @@ -207,6 +231,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -217,7 +247,8 @@ describe('evaluate', async function () { recordTelemetry, fetchRoundDetails, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].participantAddresses.sort(), ['0x123', '0x234']) @@ -230,6 +261,7 @@ describe('evaluate', async function () { `Sum of scores not close enough. Got ${sum}` ) assert.strictEqual(setScoresCalls[0].scores.length, 2) + assert.strictEqual(addPendingCalls.length, 1) const point = assertRecordedTelemetryPoint(telemetry, 'evaluate') assert(!!point, @@ -254,6 +286,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } const logger = { log: debug, error: debug } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) @@ -265,7 +303,8 @@ describe('evaluate', async function () { recordTelemetry, fetchRoundDetails, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) const { scores, participantAddresses } = setScoresCalls[0] @@ -273,6 +312,7 @@ describe('evaluate', async function () { const sum = scores.reduce((prev, score) => (prev ?? 0) + score) assert.strictEqual(sum, MAX_SCORE) assert.strictEqual(participantAddresses.sort()[0], '0x000000000000000000000000000000000000dEaD') + assert.strictEqual(addPendingCalls.length, 1) }) it('reports retrieval stats - honest & all', async () => { @@ -299,6 +339,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -309,7 +355,8 @@ describe('evaluate', async function () { recordTelemetry, fetchRoundDetails, createPgClient, - logger + logger, + stuckTransactionsCanceller }) let point = telemetry.find(p => p.name === 'retrieval_stats_honest')