From 24816a534d71166b93f078e880a60df3a655d2f9 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 20 Sep 2024 17:06:07 +0200 Subject: [PATCH 1/9] add `cancel-stuck-transactions` --- bin/spark-evaluate.js | 60 +++++- index.js | 6 +- lib/cancel-stuck-txs.js | 184 ------------------ lib/evaluate.js | 13 +- .../017.do.cancel-stuck-transactions.sql | 7 + package-lock.json | 9 + package.json | 1 + test/cancel-stuck-txs.test.js | 25 --- 8 files changed, 85 insertions(+), 220 deletions(-) delete mode 100644 lib/cancel-stuck-txs.js create mode 100644 migrations/017.do.cancel-stuck-transactions.sql delete mode 100644 test/cancel-stuck-txs.test.js diff --git a/bin/spark-evaluate.js b/bin/spark-evaluate.js index ee55a29f..a6720b92 100644 --- a/bin/spark-evaluate.js +++ b/bin/spark-evaluate.js @@ -10,13 +10,18 @@ import { fetchMeasurements } from '../lib/preprocess.js' import { migrateWithPgConfig } from '../lib/migrate.js' import pg from 'pg' import { createMeridianContract } from '../lib/ie-contract.js' -import { startCancelStuckTxs } from '../lib/cancel-stuck-txs.js' +import { CancelStuckTransactions } from 'cancel-stuck-transactions' +import ms from 'ms' +import timers from 'node:timers/promises' const { SENTRY_ENVIRONMENT = 'development', WALLET_SEED } = process.env +const ROUND_LENGTH_MS = ms('20 minutes') +const CHECK_STUCK_TXS_DELAY = ms('1 minute') + Sentry.init({ dsn: 'https://d0651617f9690c7e9421ab9c949d67a4@o1408530.ingest.sentry.io/4505906069766144', environment: SENTRY_ENVIRONMENT, @@ -43,6 +48,40 @@ const createPgClient = async () => { return pgClient } +const pgClient = await createPgClient() +const cancelStuckTransactions = new CancelStuckTransactions({ + async store ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { + await pgClient.query(` + INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) + VALUES ($1, $2, $3, $4, $5) + `, + [hash, timestamp, from, maxPriorityFeePerGas, nonce] + ) + }, + async list () { + const { rows } = await pgClient.query(`SELECT * FROM transactions_pending`) + return rows.map(row => ({ + hash: row.hash, + timestamp: row.timestamp, + from: row.from, + maxPriorityFeePerGas: row.max_priority_fee_per_gas, + nonce: row.nonce + })) + }, + async resolve (hash) { + await pgClient.query( + 'DELETE FROM transactions_pending WHERE hash = $1', + [hash] + ) + }, + log (str) { + console.log(str) + }, + sendTransaction (tx) { + return signer.sendTransaction(tx) + } +}) + await Promise.all([ startEvaluate({ ieContract, @@ -51,11 +90,18 @@ await Promise.all([ fetchRoundDetails, recordTelemetry, createPgClient, - logger: console + logger: console, + cancelStuckTransactions }), - startCancelStuckTxs({ - walletDelegatedAddress, - address: wallet.address, - signer - }) + (async () => { + while (true) { + try { + await cancelStuckTransactions.olderThan(2 * ROUND_LENGTH_MS) + } catch (err) { + console.error(err) + Sentry.captureException(err) + } + await timers.setTimeout(CHECK_STUCK_TXS_DELAY) + } + })() ]) diff --git a/index.js b/index.js index 1b7e90f1..b671429e 100644 --- a/index.js +++ b/index.js @@ -18,7 +18,8 @@ export const startEvaluate = async ({ fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + cancelStuckTransactions }) => { assert(typeof createPgClient === 'function', 'createPgClient must be a function') @@ -114,7 +115,8 @@ export const startEvaluate = async ({ fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + cancelStuckTransactions }).catch(err => { console.error('CANNOT EVALUATE ROUND %s:', evaluatedRoundIndex, err) Sentry.captureException(err, { diff --git a/lib/cancel-stuck-txs.js b/lib/cancel-stuck-txs.js deleted file mode 100644 index d3bb4ee7..00000000 --- a/lib/cancel-stuck-txs.js +++ /dev/null @@ -1,184 +0,0 @@ -import pRetry from 'p-retry' -import assert from 'node:assert' -import * as Sentry from '@sentry/node' -import ms from 'ms' -import timers from 'node:timers/promises' - -const ROUND_LENGTH_MS = ms('20 minutes') -const CHECK_STUCK_TXS_DELAY = ms('1 minute') - -/** - * @param {string} f4addr - * @returns 1000 oldest messages - */ -export async function getMessagesInMempool (f4addr) { - const res = await pRetry( - async () => { - const res = await fetch(`https://filfox.info/api/v1/message/mempool/filtered-list?address=${f4addr}&pageSize=1000`) - if (!res.ok) { - throw new Error(`Filfox request failed with ${res.status}: ${(await res.text()).trimEnd()}`) - } - return res - }, - { - async onFailedAttempt (error) { - console.warn(error) - console.warn('Filfox request failed. Retrying...') - } - } - ) - - /** @type {{ - messages: { - cid: string; - from: string; - to: string; - nonce: number; - value: string; - gasLimit: number; - gasFeeCap: string; - gasPremium: string; - method: string; - methodNumber: number; - evmMethod: string; - createTimestamp: number; - }[]; - }} - */ - const { messages } = /** @type {any} */(await res.json()) - return messages -} - -/** - * @returns {Promise<{ - cid: string; - height: number; - timestamp: number; - gasLimit: number; - gasFeeCap: string; - gasPremium: string; - method: string; - methodNumber: number; - receipt: { - exitCode: number; - return: string; - gasUsed: number; - }, - size: number; - error: string; - baseFee: string; - fee: { - baseFeeBurn: string; - overEstimationBurn: string; - minerPenalty: string; - minerTip: string; - refund: string; - }, -}>} -*/ -export async function getRecentSendMessage () { - let res = await fetch('https://filfox.info/api/v1/message/list?method=Send') - if (!res.ok) { - throw new Error(`Filfox request failed with ${res.status}: ${(await res.text()).trimEnd()}`) - } - const body = /** @type {any} */(await res.json()) - assert(body.messages.length > 0, '/message/list returned an empty list') - const sendMsg = body.messages.find(m => m.method === 'Send') - assert(!!sendMsg, 'No Send message found in the recent committed messages') - const cid = sendMsg.cid - - res = await fetch(`https://filfox.info/api/v1/message/${cid}`) - if (!res.ok) { - throw new Error(`Filfox request failed with ${res.status}: ${(await res.text()).trimEnd()}`) - } - - return /** @type {any} */(await res.json()) -} - -const cancelTx = async ({ tx, address, signer, recentGasUsed, recentGasFeeCap }) => { - const oldGasPremium = Number(tx.gasPremium) - const nonce = tx.nonce - // Increase by 25% + 1 attoFIL (easier: 25.2%) - const maxPriorityFeePerGas = Math.ceil(oldGasPremium * 1.252) - - console.log(`Replacing ${tx.cid}...`) - try { - const replacementTx = await signer.sendTransaction({ - to: address, - value: 0, - nonce, - gasLimit: Math.ceil(recentGasUsed * 1.1), - // priorityFee cannot be more than maxFee - maxFeePerGas: Math.max(recentGasFeeCap, maxPriorityFeePerGas), - maxPriorityFeePerGas - }) - console.log( - `Waiting for receipt of replacement ${replacementTx.hash} for ${tx.cid}` - ) - await replacementTx.wait() - console.log(`Replaced ${tx.cid} with ${replacementTx.hash}`) - } catch (err) { - console.error(err) - Sentry.captureException(err) - } -} - -const cancelStuckTxs = async ({ walletDelegatedAddress, address, signer }) => { - console.log('Checking for stuck transactions...') - - const messages = await getMessagesInMempool(walletDelegatedAddress) - const txsToCancel = messages.filter(m => { - return m.createTimestamp * 1000 < Date.now() - (2 * ROUND_LENGTH_MS) - }) - if (txsToCancel.length === 0) { - console.log('No transactions to cancel') - return false - } - - console.log('Transactions to cancel:') - for (const tx of txsToCancel) { - console.log( - '-', - tx.cid, - `(age ${ms(Date.now() - (tx.createTimestamp * 1000))})` - ) - } - - const recentSendMessage = await getRecentSendMessage() - console.log('Calculating gas fees from the recent Send message %s (created at %s)', - recentSendMessage.cid, - new Date(recentSendMessage.timestamp * 1000).toISOString() - ) - - const recentGasUsed = recentSendMessage.receipt.gasUsed - const recentGasFeeCap = Number(recentSendMessage.gasFeeCap) - - await Promise.all(txsToCancel.map(async tx => { - await cancelTx({ tx, address, signer, recentGasUsed, recentGasFeeCap }) - })) - - return true -} - -export const startCancelStuckTxs = async ({ - walletDelegatedAddress, - address, - signer -}) => { - while (true) { - let didCancelTxs = false - try { - didCancelTxs = await cancelStuckTxs({ - walletDelegatedAddress, - address, - signer - }) - } catch (err) { - console.error(err) - Sentry.captureException(err) - } - if (!didCancelTxs) { - await timers.setTimeout(CHECK_STUCK_TXS_DELAY) - } - } -} diff --git a/lib/evaluate.js b/lib/evaluate.js index b5d817d9..f013c252 100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -55,6 +55,7 @@ export const createSetScoresBuckets = participants => { * @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry * @param {import('./typings.js').CreatePgClient} [args.createPgClient] * @param {Pick} args.logger + * @param {import('cancel-stuck-transactions').CancelStuckTransactions} args.cancelStuckTransactions */ export const evaluate = async ({ round, @@ -64,7 +65,8 @@ export const evaluate = async ({ fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + cancelStuckTransactions }) => { requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE @@ -147,9 +149,10 @@ export const evaluate = async ({ const start = Date.now() const buckets = createSetScoresBuckets(participants) + let tx for (const [bucketIndex, bucket] of Object.entries(buckets)) { try { - const tx = await ieContractWithSigner.setScores( + tx = await ieContractWithSigner.setScores( roundIndex, bucket.participants, bucket.scores @@ -161,6 +164,7 @@ export const evaluate = async ({ Number(bucketIndex) + 1, buckets.length ) + await cancelStuckTransactions.pending(tx) } catch (err) { logger.error('CANNOT SUBMIT SCORES FOR ROUND %s (CALL %s/%s): %s', roundIndex, @@ -179,6 +183,11 @@ export const evaluate = async ({ } const setScoresDuration = Date.now() - start + ;(async () => { + await tx.wait() + cancelStuckTransactions.successful(tx) + })().catch(console.error) + recordTelemetry('evaluate', point => { point.intField('round_index', roundIndex) point.intField('total_participants', Object.keys(participants).length) diff --git a/migrations/017.do.cancel-stuck-transactions.sql b/migrations/017.do.cancel-stuck-transactions.sql new file mode 100644 index 00000000..ec07785c --- /dev/null +++ b/migrations/017.do.cancel-stuck-transactions.sql @@ -0,0 +1,7 @@ +CREATE TABLE transactions_pending ( + hash TEXT NOT NULL PRIMARY KEY, + timestamp TEXT NOT NULL, + from TEXT NOT NULL, + max_priority_fee_per_gas BIGINT NOT NULL, + nonce NUMBER NOT NULL +); diff --git a/package-lock.json b/package-lock.json index fcd7a441..826ced58 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "@ipld/car": "^5.3.2", "@sentry/node": "^8.30.0", "@web3-storage/car-block-validator": "^1.2.0", + "cancel-stuck-transactions": "^0.0.0", "debug": "^4.3.7", "drand-client": "^1.2.6", "ethers": "^6.13.2", @@ -1854,6 +1855,14 @@ "node": ">=6" } }, + "node_modules/cancel-stuck-transactions": { + "version": "0.0.0", + "resolved": "https://registry.npmjs.org/cancel-stuck-transactions/-/cancel-stuck-transactions-0.0.0.tgz", + "integrity": "sha512-KyCngMGH8EvVn551Q7zt0q1ykKbpIiET7atXG7It58jzdBrAYKyieOvJqNSERwPZTxDHoSSm2efFZ5VBE4EzWg==", + "dependencies": { + "ms": "^2.1.3" + } + }, "node_modules/cborg": { "version": "4.0.5", "resolved": "https://registry.npmjs.org/cborg/-/cborg-4.0.5.tgz", diff --git a/package.json b/package.json index 7de30010..6d3a4174 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "@ipld/car": "^5.3.2", "@sentry/node": "^8.30.0", "@web3-storage/car-block-validator": "^1.2.0", + "cancel-stuck-transactions": "^0.0.0", "debug": "^4.3.7", "drand-client": "^1.2.6", "ethers": "^6.13.2", diff --git a/test/cancel-stuck-txs.test.js b/test/cancel-stuck-txs.test.js deleted file mode 100644 index 56f2a11c..00000000 --- a/test/cancel-stuck-txs.test.js +++ /dev/null @@ -1,25 +0,0 @@ -import assert from 'node:assert' -import { - getMessagesInMempool, - getRecentSendMessage -} from '../lib/cancel-stuck-txs.js' - -describe('cancel stuck transactions', function () { - this.timeout(10_000) - - describe('getMessagesInMempool(addr)', () => { - it('should return messages in mempool', async () => { - const messages = await getMessagesInMempool( - '0x000000000000000000000000000000000000dEaD' - ) - assert(Array.isArray(messages)) - }) - }) - describe('getRecentSendMessage()', () => { - it('should return a recent send message', async () => { - const recentSendMessage = await getRecentSendMessage() - assert(recentSendMessage.receipt.gasUsed) - assert(recentSendMessage.gasFeeCap) - }) - }) -}) From 030c5fb718273deb695bbb9ef9968bbdf4ae9912 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 11:46:10 +0200 Subject: [PATCH 2/9] update to latest version --- bin/spark-evaluate.js | 66 +++++++++++++++++++++++++------------------ index.js | 4 +-- lib/evaluate.js | 8 +++--- package-lock.json | 8 +++--- package.json | 2 +- 5 files changed, 50 insertions(+), 38 deletions(-) diff --git a/bin/spark-evaluate.js b/bin/spark-evaluate.js index a6720b92..acc83b26 100644 --- a/bin/spark-evaluate.js +++ b/bin/spark-evaluate.js @@ -10,7 +10,7 @@ import { fetchMeasurements } from '../lib/preprocess.js' import { migrateWithPgConfig } from '../lib/migrate.js' import pg from 'pg' import { createMeridianContract } from '../lib/ie-contract.js' -import { CancelStuckTransactions } from 'cancel-stuck-transactions' +import { StuckTransactionsCanceller } from 'cancel-stuck-transactions' import ms from 'ms' import timers from 'node:timers/promises' @@ -49,30 +49,32 @@ const createPgClient = async () => { } const pgClient = await createPgClient() -const cancelStuckTransactions = new CancelStuckTransactions({ - async store ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { - await pgClient.query(` - INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) - VALUES ($1, $2, $3, $4, $5) - `, - [hash, timestamp, from, maxPriorityFeePerGas, nonce] - ) - }, - async list () { - const { rows } = await pgClient.query(`SELECT * FROM transactions_pending`) - return rows.map(row => ({ - hash: row.hash, - timestamp: row.timestamp, - from: row.from, - maxPriorityFeePerGas: row.max_priority_fee_per_gas, - nonce: row.nonce - })) - }, - async resolve (hash) { - await pgClient.query( - 'DELETE FROM transactions_pending WHERE hash = $1', - [hash] - ) +const stuckTransactionsCanceller = new StuckTransactionsCanceller({ + store: { + async set ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { + await pgClient.query(` + INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) + VALUES ($1, $2, $3, $4, $5) + `, + [hash, timestamp, from, maxPriorityFeePerGas, nonce] + ) + }, + async list () { + const { rows } = await pgClient.query(`SELECT * FROM transactions_pending`) + return rows.map(row => ({ + hash: row.hash, + timestamp: row.timestamp, + from: row.from, + maxPriorityFeePerGas: row.max_priority_fee_per_gas, + nonce: row.nonce + })) + }, + async remove (hash) { + await pgClient.query( + 'DELETE FROM transactions_pending WHERE hash = $1', + [hash] + ) + }, }, log (str) { console.log(str) @@ -91,12 +93,22 @@ await Promise.all([ recordTelemetry, createPgClient, logger: console, - cancelStuckTransactions + stuckTransactionsCanceller }), (async () => { while (true) { try { - await cancelStuckTransactions.olderThan(2 * ROUND_LENGTH_MS) + const res = await stuckTransactionsCanceller.cancelOlderThan( + 2 * ROUND_LENGTH_MS + ) + if (res !== undefined) { + for (const { status, reason } of res) { + if (status === 'rejected') { + console.error('Failed to cancel transaction:', reason) + Sentry.captureException(reason) + } + } + } } catch (err) { console.error(err) Sentry.captureException(err) diff --git a/index.js b/index.js index b671429e..53ac51a2 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,7 @@ export const startEvaluate = async ({ recordTelemetry, createPgClient, logger, - cancelStuckTransactions + stuckTransactionsCanceller }) => { assert(typeof createPgClient === 'function', 'createPgClient must be a function') @@ -116,7 +116,7 @@ export const startEvaluate = async ({ recordTelemetry, createPgClient, logger, - cancelStuckTransactions + stuckTransactionsCanceller }).catch(err => { console.error('CANNOT EVALUATE ROUND %s:', evaluatedRoundIndex, err) Sentry.captureException(err, { diff --git a/lib/evaluate.js b/lib/evaluate.js index f013c252..06715ec1 100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -55,7 +55,7 @@ export const createSetScoresBuckets = participants => { * @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry * @param {import('./typings.js').CreatePgClient} [args.createPgClient] * @param {Pick} args.logger - * @param {import('cancel-stuck-transactions').CancelStuckTransactions} args.cancelStuckTransactions + * @param {import('cancel-stuck-transactions').StuckTransactionsCanceller} args.stuckTransactionsCanceller */ export const evaluate = async ({ round, @@ -66,7 +66,7 @@ export const evaluate = async ({ recordTelemetry, createPgClient, logger, - cancelStuckTransactions + stuckTransactionsCanceller }) => { requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE @@ -164,7 +164,7 @@ export const evaluate = async ({ Number(bucketIndex) + 1, buckets.length ) - await cancelStuckTransactions.pending(tx) + await stuckTransactionsCanceller.addPending(tx) } catch (err) { logger.error('CANNOT SUBMIT SCORES FOR ROUND %s (CALL %s/%s): %s', roundIndex, @@ -185,7 +185,7 @@ export const evaluate = async ({ ;(async () => { await tx.wait() - cancelStuckTransactions.successful(tx) + stuckTransactionsCanceller.removeConfirmed(tx) })().catch(console.error) recordTelemetry('evaluate', point => { diff --git a/package-lock.json b/package-lock.json index 826ced58..95af7a7f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@ipld/car": "^5.3.2", "@sentry/node": "^8.30.0", "@web3-storage/car-block-validator": "^1.2.0", - "cancel-stuck-transactions": "^0.0.0", + "cancel-stuck-transactions": "^1.0.0", "debug": "^4.3.7", "drand-client": "^1.2.6", "ethers": "^6.13.2", @@ -1856,9 +1856,9 @@ } }, "node_modules/cancel-stuck-transactions": { - "version": "0.0.0", - "resolved": "https://registry.npmjs.org/cancel-stuck-transactions/-/cancel-stuck-transactions-0.0.0.tgz", - "integrity": "sha512-KyCngMGH8EvVn551Q7zt0q1ykKbpIiET7atXG7It58jzdBrAYKyieOvJqNSERwPZTxDHoSSm2efFZ5VBE4EzWg==", + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/cancel-stuck-transactions/-/cancel-stuck-transactions-1.0.0.tgz", + "integrity": "sha512-MkJggKgtzCHKKvYshCxwfMv18ELapS48p+uXhi27frokV714/QugQ/hzcsvxjjYQ30IAdp9CIzUMtUekBT/LHw==", "dependencies": { "ms": "^2.1.3" } diff --git a/package.json b/package.json index 6d3a4174..15406155 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,7 @@ "@ipld/car": "^5.3.2", "@sentry/node": "^8.30.0", "@web3-storage/car-block-validator": "^1.2.0", - "cancel-stuck-transactions": "^0.0.0", + "cancel-stuck-transactions": "^1.0.0", "debug": "^4.3.7", "drand-client": "^1.2.6", "ethers": "^6.13.2", From 85089c3619dbf625e3aac0217a951953e43693e8 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 11:58:12 +0200 Subject: [PATCH 3/9] refactor --- bin/spark-evaluate.js | 64 ++----------------------------- lib/cancel-stuck-transactions.js | 66 ++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 61 deletions(-) create mode 100644 lib/cancel-stuck-transactions.js diff --git a/bin/spark-evaluate.js b/bin/spark-evaluate.js index acc83b26..df9d558d 100644 --- a/bin/spark-evaluate.js +++ b/bin/spark-evaluate.js @@ -10,18 +10,13 @@ import { fetchMeasurements } from '../lib/preprocess.js' import { migrateWithPgConfig } from '../lib/migrate.js' import pg from 'pg' import { createMeridianContract } from '../lib/ie-contract.js' -import { StuckTransactionsCanceller } from 'cancel-stuck-transactions' -import ms from 'ms' -import timers from 'node:timers/promises' +import { createStuckTransactionsCanceller, startCancelStuckTransactions } from '../lib/cancel-stuck-transactions.js' const { SENTRY_ENVIRONMENT = 'development', WALLET_SEED } = process.env -const ROUND_LENGTH_MS = ms('20 minutes') -const CHECK_STUCK_TXS_DELAY = ms('1 minute') - Sentry.init({ dsn: 'https://d0651617f9690c7e9421ab9c949d67a4@o1408530.ingest.sentry.io/4505906069766144', environment: SENTRY_ENVIRONMENT, @@ -49,40 +44,7 @@ const createPgClient = async () => { } const pgClient = await createPgClient() -const stuckTransactionsCanceller = new StuckTransactionsCanceller({ - store: { - async set ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { - await pgClient.query(` - INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) - VALUES ($1, $2, $3, $4, $5) - `, - [hash, timestamp, from, maxPriorityFeePerGas, nonce] - ) - }, - async list () { - const { rows } = await pgClient.query(`SELECT * FROM transactions_pending`) - return rows.map(row => ({ - hash: row.hash, - timestamp: row.timestamp, - from: row.from, - maxPriorityFeePerGas: row.max_priority_fee_per_gas, - nonce: row.nonce - })) - }, - async remove (hash) { - await pgClient.query( - 'DELETE FROM transactions_pending WHERE hash = $1', - [hash] - ) - }, - }, - log (str) { - console.log(str) - }, - sendTransaction (tx) { - return signer.sendTransaction(tx) - } -}) +const stuckTransactionsCanceller = createStuckTransactionsCanceller({ pgClient, signer }) await Promise.all([ startEvaluate({ @@ -95,25 +57,5 @@ await Promise.all([ logger: console, stuckTransactionsCanceller }), - (async () => { - while (true) { - try { - const res = await stuckTransactionsCanceller.cancelOlderThan( - 2 * ROUND_LENGTH_MS - ) - if (res !== undefined) { - for (const { status, reason } of res) { - if (status === 'rejected') { - console.error('Failed to cancel transaction:', reason) - Sentry.captureException(reason) - } - } - } - } catch (err) { - console.error(err) - Sentry.captureException(err) - } - await timers.setTimeout(CHECK_STUCK_TXS_DELAY) - } - })() + startCancelStuckTransactions(stuckTransactionsCanceller) ]) diff --git a/lib/cancel-stuck-transactions.js b/lib/cancel-stuck-transactions.js new file mode 100644 index 00000000..bd31119a --- /dev/null +++ b/lib/cancel-stuck-transactions.js @@ -0,0 +1,66 @@ +import * as Sentry from '@sentry/node' +import { StuckTransactionsCanceller } from 'cancel-stuck-transactions' +import ms from 'ms' +import timers from 'node:timers/promises' + +const ROUND_LENGTH_MS = ms('20 minutes') +const CHECK_STUCK_TXS_DELAY = ms('1 minute') + +export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { + return new StuckTransactionsCanceller({ + store: { + async set ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { + await pgClient.query(` + INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) + VALUES ($1, $2, $3, $4, $5) + `, + [hash, timestamp, from, maxPriorityFeePerGas, nonce] + ) + }, + async list () { + const { rows } = await pgClient.query(`SELECT * FROM transactions_pending`) + return rows.map(row => ({ + hash: row.hash, + timestamp: row.timestamp, + from: row.from, + maxPriorityFeePerGas: row.max_priority_fee_per_gas, + nonce: row.nonce + })) + }, + async remove (hash) { + await pgClient.query( + 'DELETE FROM transactions_pending WHERE hash = $1', + [hash] + ) + }, + }, + log (str) { + console.log(str) + }, + sendTransaction (tx) { + return signer.sendTransaction(tx) + } + }) +} + +export const startCancelStuckTransactions = async stuckTransactionsCanceller => { + while (true) { + try { + const res = await stuckTransactionsCanceller.cancelOlderThan( + 2 * ROUND_LENGTH_MS + ) + if (res !== undefined) { + for (const { status, reason } of res) { + if (status === 'rejected') { + console.error('Failed to cancel transaction:', reason) + Sentry.captureException(reason) + } + } + } + } catch (err) { + console.error(err) + Sentry.captureException(err) + } + await timers.setTimeout(CHECK_STUCK_TXS_DELAY) + } +} From d75ed0506481b88f6d2d7cf120b913891696fc14 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 12:27:04 +0200 Subject: [PATCH 4/9] fix tests --- bin/dry-run.js | 3 +- bin/fetch-recent-miner-measurements.js | 3 +- lib/cancel-stuck-transactions.js | 6 +-- test/evaluate.js | 59 +++++++++++++++++++++++--- 4 files changed, 60 insertions(+), 11 deletions(-) diff --git a/bin/dry-run.js b/bin/dry-run.js index ec181b79..9c8dbab4 100644 --- a/bin/dry-run.js +++ b/bin/dry-run.js @@ -142,7 +142,8 @@ const { ignoredErrors } = await evaluate({ ieContractWithSigner, logger: console, recordTelemetry, - createPgClient + createPgClient, + stuckTransactionsCanceller: { addPending: () => {} } }) console.log('Duration: %sms', Date.now() - started) diff --git a/bin/fetch-recent-miner-measurements.js b/bin/fetch-recent-miner-measurements.js index 58c8ffad..90173f74 100644 --- a/bin/fetch-recent-miner-measurements.js +++ b/bin/fetch-recent-miner-measurements.js @@ -216,7 +216,8 @@ async function processRound (roundIndex, measurementCids, resultCounts) { fetchRoundDetails, recordTelemetry, logger: { log: debug, error: debug }, - ieContractWithSigner + ieContractWithSigner, + stuckTransactionsCanceller: { addPending: () => {} } }) for (const m of round.measurements) { diff --git a/lib/cancel-stuck-transactions.js b/lib/cancel-stuck-transactions.js index bd31119a..d270316a 100644 --- a/lib/cancel-stuck-transactions.js +++ b/lib/cancel-stuck-transactions.js @@ -14,11 +14,11 @@ export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) VALUES ($1, $2, $3, $4, $5) `, - [hash, timestamp, from, maxPriorityFeePerGas, nonce] + [hash, timestamp, from, maxPriorityFeePerGas, nonce] ) }, async list () { - const { rows } = await pgClient.query(`SELECT * FROM transactions_pending`) + const { rows } = await pgClient.query('SELECT * FROM transactions_pending') return rows.map(row => ({ hash: row.hash, timestamp: row.timestamp, @@ -32,7 +32,7 @@ export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { 'DELETE FROM transactions_pending WHERE hash = $1', [hash] ) - }, + } }, log (str) { console.log(str) diff --git a/test/evaluate.js b/test/evaluate.js index 69393867..235ac26b 100644 --- a/test/evaluate.js +++ b/test/evaluate.js @@ -65,6 +65,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } await evaluate({ round, roundIndex: 0n, @@ -73,7 +79,8 @@ describe('evaluate', async function () { fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].roundIndex, 0n) @@ -83,6 +90,7 @@ describe('evaluate', async function () { setScoresCalls[0].scores[0], 1000000000000000n ) + assert.strictEqual(addPendingCalls.length, 1) const point = telemetry.find(p => p.name === 'evaluate') assert(!!point, @@ -111,6 +119,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -121,7 +135,8 @@ describe('evaluate', async function () { fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].roundIndex, 0n) @@ -131,6 +146,7 @@ describe('evaluate', async function () { assert.deepStrictEqual(setScoresCalls[0].scores, [ MAX_SCORE ]) + assert.strictEqual(addPendingCalls.length, 1) let point = telemetry.find(p => p.name === 'evaluate') assert(!!point, @@ -162,6 +178,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -172,7 +194,8 @@ describe('evaluate', async function () { fetchRoundDetails, recordTelemetry, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].roundIndex, 0n) @@ -182,6 +205,7 @@ describe('evaluate', async function () { assert.deepStrictEqual(setScoresCalls[0].scores, [ MAX_SCORE ]) + assert.strictEqual(addPendingCalls.length, 1) }) it('calculates reward shares', async () => { const round = new RoundData(0n) @@ -207,6 +231,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -217,7 +247,8 @@ describe('evaluate', async function () { recordTelemetry, fetchRoundDetails, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) assert.deepStrictEqual(setScoresCalls[0].participantAddresses.sort(), ['0x123', '0x234']) @@ -230,6 +261,7 @@ describe('evaluate', async function () { `Sum of scores not close enough. Got ${sum}` ) assert.strictEqual(setScoresCalls[0].scores.length, 2) + assert.strictEqual(addPendingCalls.length, 1) const point = assertRecordedTelemetryPoint(telemetry, 'evaluate') assert(!!point, @@ -254,6 +286,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } const logger = { log: debug, error: debug } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) @@ -265,7 +303,8 @@ describe('evaluate', async function () { recordTelemetry, fetchRoundDetails, createPgClient, - logger + logger, + stuckTransactionsCanceller }) assert.strictEqual(setScoresCalls.length, 1) const { scores, participantAddresses } = setScoresCalls[0] @@ -273,6 +312,7 @@ describe('evaluate', async function () { const sum = scores.reduce((prev, score) => (prev ?? 0) + score) assert.strictEqual(sum, MAX_SCORE) assert.strictEqual(participantAddresses.sort()[0], '0x000000000000000000000000000000000000dEaD') + assert.strictEqual(addPendingCalls.length, 1) }) it('reports retrieval stats - honest & all', async () => { @@ -299,6 +339,12 @@ describe('evaluate', async function () { return '0x811765AccE724cD5582984cb35f5dE02d587CA12' } } + const addPendingCalls = [] + const stuckTransactionsCanceller = { + async addPending (tx) { + addPendingCalls.push(tx) + } + } /** @returns {Promise} */ const fetchRoundDetails = async () => ({ ...SPARK_ROUND_DETAILS, retrievalTasks: [VALID_TASK] }) await evaluate({ @@ -309,7 +355,8 @@ describe('evaluate', async function () { recordTelemetry, fetchRoundDetails, createPgClient, - logger + logger, + stuckTransactionsCanceller }) let point = telemetry.find(p => p.name === 'retrieval_stats_honest') From 7dbbad8fa714f4d8813dcfd151e5c7b850899838 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 12:30:45 +0200 Subject: [PATCH 5/9] fix reserved word --- lib/cancel-stuck-transactions.js | 4 ++-- migrations/017.do.cancel-stuck-transactions.sql | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/cancel-stuck-transactions.js b/lib/cancel-stuck-transactions.js index d270316a..af8e4444 100644 --- a/lib/cancel-stuck-transactions.js +++ b/lib/cancel-stuck-transactions.js @@ -11,7 +11,7 @@ export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { store: { async set ({ hash, timestamp, from, maxPriorityFeePerGas, nonce }) { await pgClient.query(` - INSERT INTO transactions_pending (hash, timestamp, from, max_priority_fee_per_gas, nonce) + INSERT INTO transactions_pending (hash, timestamp, from_address, max_priority_fee_per_gas, nonce) VALUES ($1, $2, $3, $4, $5) `, [hash, timestamp, from, maxPriorityFeePerGas, nonce] @@ -22,7 +22,7 @@ export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { return rows.map(row => ({ hash: row.hash, timestamp: row.timestamp, - from: row.from, + from: row.from_address, maxPriorityFeePerGas: row.max_priority_fee_per_gas, nonce: row.nonce })) diff --git a/migrations/017.do.cancel-stuck-transactions.sql b/migrations/017.do.cancel-stuck-transactions.sql index ec07785c..6cede180 100644 --- a/migrations/017.do.cancel-stuck-transactions.sql +++ b/migrations/017.do.cancel-stuck-transactions.sql @@ -1,7 +1,7 @@ CREATE TABLE transactions_pending ( hash TEXT NOT NULL PRIMARY KEY, timestamp TEXT NOT NULL, - from TEXT NOT NULL, + from_address TEXT NOT NULL, max_priority_fee_per_gas BIGINT NOT NULL, nonce NUMBER NOT NULL ); From c3c62806c9a5dc69f2bc5a83763ba6954ef7e29f Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 12:32:31 +0200 Subject: [PATCH 6/9] number -> int --- migrations/017.do.cancel-stuck-transactions.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/017.do.cancel-stuck-transactions.sql b/migrations/017.do.cancel-stuck-transactions.sql index 6cede180..447141d8 100644 --- a/migrations/017.do.cancel-stuck-transactions.sql +++ b/migrations/017.do.cancel-stuck-transactions.sql @@ -3,5 +3,5 @@ CREATE TABLE transactions_pending ( timestamp TEXT NOT NULL, from_address TEXT NOT NULL, max_priority_fee_per_gas BIGINT NOT NULL, - nonce NUMBER NOT NULL + nonce INT NOT NULL ); From aba9c707868cad79b3a9eeda7b8f45f9f5410210 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 13:55:16 +0200 Subject: [PATCH 7/9] refactor `list()` implementation --- lib/cancel-stuck-transactions.js | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/cancel-stuck-transactions.js b/lib/cancel-stuck-transactions.js index af8e4444..66125c7d 100644 --- a/lib/cancel-stuck-transactions.js +++ b/lib/cancel-stuck-transactions.js @@ -18,14 +18,16 @@ export const createStuckTransactionsCanceller = ({ pgClient, signer }) => { ) }, async list () { - const { rows } = await pgClient.query('SELECT * FROM transactions_pending') - return rows.map(row => ({ - hash: row.hash, - timestamp: row.timestamp, - from: row.from_address, - maxPriorityFeePerGas: row.max_priority_fee_per_gas, - nonce: row.nonce - })) + const { rows } = await pgClient.query(` + SELECT + hash, + timestamp, + from_address as "from", + max_priority_fee_per_gas as "maxPriorityFeePerGas", + nonce + FROM transactions_pending + `) + return rows }, async remove (hash) { await pgClient.query( From 303719c165559edc1dde192d597ae98fbfce25c1 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 13:56:35 +0200 Subject: [PATCH 8/9] fix awaiting --- lib/evaluate.js | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/evaluate.js b/lib/evaluate.js index 06715ec1..2431a2b5 100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -149,10 +149,9 @@ export const evaluate = async ({ const start = Date.now() const buckets = createSetScoresBuckets(participants) - let tx for (const [bucketIndex, bucket] of Object.entries(buckets)) { try { - tx = await ieContractWithSigner.setScores( + const tx = await ieContractWithSigner.setScores( roundIndex, bucket.participants, bucket.scores @@ -165,6 +164,10 @@ export const evaluate = async ({ buckets.length ) await stuckTransactionsCanceller.addPending(tx) + ;(async () => { + await tx.wait() + stuckTransactionsCanceller.removeConfirmed(tx) + })().catch(console.error) } catch (err) { logger.error('CANNOT SUBMIT SCORES FOR ROUND %s (CALL %s/%s): %s', roundIndex, @@ -183,11 +186,6 @@ export const evaluate = async ({ } const setScoresDuration = Date.now() - start - ;(async () => { - await tx.wait() - stuckTransactionsCanceller.removeConfirmed(tx) - })().catch(console.error) - recordTelemetry('evaluate', point => { point.intField('round_index', roundIndex) point.intField('total_participants', Object.keys(participants).length) From 72c03e48df9c855ca883290b6405e2ee8371b5cf Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 24 Sep 2024 13:57:22 +0200 Subject: [PATCH 9/9] text -> timestamptz --- migrations/017.do.cancel-stuck-transactions.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/017.do.cancel-stuck-transactions.sql b/migrations/017.do.cancel-stuck-transactions.sql index 447141d8..aba9abfa 100644 --- a/migrations/017.do.cancel-stuck-transactions.sql +++ b/migrations/017.do.cancel-stuck-transactions.sql @@ -1,6 +1,6 @@ CREATE TABLE transactions_pending ( hash TEXT NOT NULL PRIMARY KEY, - timestamp TEXT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, from_address TEXT NOT NULL, max_priority_fee_per_gas BIGINT NOT NULL, nonce INT NOT NULL