diff --git a/.changeset/wise-panthers-collect.md b/.changeset/wise-panthers-collect.md new file mode 100644 index 000000000..c1f9ff261 --- /dev/null +++ b/.changeset/wise-panthers-collect.md @@ -0,0 +1,5 @@ +--- +'backend': patch +--- + +handle subgraph errors diff --git a/modules/actions/user/bpt-balances/helpers/sync-bpt-balances-from-rpc.ts b/modules/actions/user/bpt-balances/helpers/sync-bpt-balances-from-rpc.ts index 5f45351bd..8630c7faa 100644 --- a/modules/actions/user/bpt-balances/helpers/sync-bpt-balances-from-rpc.ts +++ b/modules/actions/user/bpt-balances/helpers/sync-bpt-balances-from-rpc.ts @@ -17,6 +17,7 @@ export const syncBptBalancesFromRpc = async ( client: ViemClient, chain: Chain, syncCategory: 'BPT_BALANCES_V2' | 'BPT_BALANCES_V3' | 'BPT_BALANCES_COW_AMM', + syncStore = true, // Set false when syncing should be repeated, eg when passing subset of poolIds just for new pools ) => { // Must have poolIds to sync if (poolIds.length === 0) { @@ -27,7 +28,11 @@ export const syncBptBalancesFromRpc = async ( const endBlock = await client.getBlockNumber().then(Number); // Get the balances synced block from the DB - const startBlock = await getLastSyncedBlock(chain, syncCategory); + let startBlock = await getLastSyncedBlock(chain, syncCategory); + if (!syncStore) { + // try to get all events for these poolIds + startBlock = endBlock - config[chain].rpcMaxBlockRange + 5; + } // Don't use RPC when balances weren't synced at all if (startBlock === 0) { @@ -108,7 +113,7 @@ export const syncBptBalancesFromRpc = async ( console.log(`syncBptBalancesFromRpc ${syncCategory} on ${chain} got ${poolShares.length} poolShares`); - const operations = balancesToDb(poolShares, endBlock, syncCategory); + const operations = balancesToDb(poolShares, endBlock, syncStore ? syncCategory : undefined); try { await prisma.$transaction(operations); } catch (e: any) { diff --git a/modules/controllers/cow-amm-controller.ts b/modules/controllers/cow-amm-controller.ts index 7fe50ca29..9599803db 100644 --- a/modules/controllers/cow-amm-controller.ts +++ b/modules/controllers/cow-amm-controller.ts @@ -10,6 +10,8 @@ import { syncBptBalancesFromSubgraph } from '../actions/user/bpt-balances/helper import { getLastSyncedBlock, upsertLastSyncedBlock } from '../actions/last-synced-block'; import { updateLifetimeValues } from '../actions/pool/update-liftetime-values'; import { syncTokenPairs } from '../actions/pool/v3/sync-tokenpairs'; +import { handleSubgraphErrors } from './error-handling'; +import { syncBptBalancesFromRpc } from '../actions/user/bpt-balances/helpers/sync-bpt-balances-from-rpc'; export function CowAmmController(tracer?: any) { const getSubgraphClient = (chain: Chain) => { @@ -104,7 +106,11 @@ export function CowAmmController(tracer?: any) { await updateSurplusAPRs(chain, ids); // Sync balances for the pools const newIds = ids.filter((id) => !existingIds.includes(id)); - await syncBptBalancesFromSubgraph(newIds, subgraphClient, chain); + if (useSubgraph) { + await syncBptBalancesFromSubgraph(newIds, subgraphClient, chain); + } else { + await syncBptBalancesFromRpc(newIds, viemClient, chain, 'BPT_BALANCES_COW_AMM', false); + } await upsertLastSyncedBlock(chain, PrismaLastBlockSyncedCategory.COW_AMM_POOLS, toBlock); @@ -112,7 +118,7 @@ export function CowAmmController(tracer?: any) { }, async syncSnapshots(chain: Chain) { const subgraphClient = getSubgraphClient(chain); - const ids = await syncSnapshots(subgraphClient, 'SNAPSHOTS_COW_AMM', chain); + const ids = await handleSubgraphErrors(() => syncSnapshots(subgraphClient, 'SNAPSHOTS_COW_AMM', chain)); // update lifetime values based on snapshots await updateLifetimeValues(chain, undefined, 'COW_AMM'); return ids; @@ -127,12 +133,12 @@ export function CowAmmController(tracer?: any) { }, async syncJoinExits(chain: Chain) { const subgraphClient = getSubgraphClient(chain); - const entries = await syncJoinExits(subgraphClient, chain); + const entries = await handleSubgraphErrors(() => syncJoinExits(subgraphClient, chain)); return entries; }, async syncSwaps(chain: Chain) { const subgraphClient = getSubgraphClient(chain); - const swaps = await syncSwaps(subgraphClient, chain); + const swaps = await handleSubgraphErrors(() => syncSwaps(subgraphClient, chain)); const poolIds = swaps .map((event) => event.poolId) .filter((value, index, self) => self.indexOf(value) === index); diff --git a/modules/controllers/error-handling.ts b/modules/controllers/error-handling.ts new file mode 100644 index 000000000..6dfb362cd --- /dev/null +++ b/modules/controllers/error-handling.ts @@ -0,0 +1,14 @@ +export async function handleSubgraphErrors(operation: () => Promise): Promise { + try { + return await operation(); + } catch (error: any) { + if ( + error.message.includes('Too many requests') || + error.message.includes('bad indexers') || + error.message.includes('Bad Gateway') + ) { + return []; + } + throw error; + } +} diff --git a/modules/controllers/event-controller.ts b/modules/controllers/event-controller.ts index ad3e4f133..d501bd123 100644 --- a/modules/controllers/event-controller.ts +++ b/modules/controllers/event-controller.ts @@ -9,6 +9,7 @@ import { syncSwaps as syncSwapsV3 } from '../actions/pool/v3/sync-swaps'; import { Chain } from '@prisma/client'; import { updateVolumeAndFees } from '../actions/pool/update-volume-and-fees'; import { getVaultSubgraphClient } from '../sources/subgraphs/balancer-v3-vault'; +import { handleSubgraphErrors } from './error-handling'; export function EventController() { return { @@ -23,8 +24,7 @@ export function EventController() { } const subgraphClient = new BalancerSubgraphService(balancer, chain); - const entries = await syncJoinExitsV2(subgraphClient, chain); - return entries; + return handleSubgraphErrors(() => syncJoinExitsV2(subgraphClient, chain)); }, async syncSwapsUpdateVolumeAndFeesV2(chain: Chain) { const { @@ -37,11 +37,12 @@ export function EventController() { } const subgraphClient = getV2SubgraphClient(balancer, chain); - const poolsWithNewSwaps = await syncSwapsV2(subgraphClient, chain); - await syncSwapsForLast48Hours(subgraphClient, chain); - await updateVolumeAndFees(chain, poolsWithNewSwaps); - - return poolsWithNewSwaps; + return handleSubgraphErrors(async () => { + const poolsWithNewSwaps = await syncSwapsV2(subgraphClient, chain); + await syncSwapsForLast48Hours(subgraphClient, chain); + await updateVolumeAndFees(chain, poolsWithNewSwaps); + return poolsWithNewSwaps; + }); }, async syncJoinExitsV3(chain: Chain) { const { @@ -54,8 +55,7 @@ export function EventController() { } const vaultSubgraphClient = getVaultSubgraphClient(balancerV3, chain); - const entries = await syncJoinExitsV3(vaultSubgraphClient, chain); - return entries; + return handleSubgraphErrors(() => syncJoinExitsV3(vaultSubgraphClient, chain)); }, async syncSwapsV3(chain: Chain) { const { @@ -68,8 +68,7 @@ export function EventController() { } const vaultSubgraphClient = getVaultSubgraphClient(balancerV3, chain); - const entries = await syncSwapsV3(vaultSubgraphClient, chain); - return entries; + return handleSubgraphErrors(() => syncSwapsV3(vaultSubgraphClient, chain)); }, async syncSwapsUpdateVolumeAndFeesV3(chain: Chain) { const { @@ -83,9 +82,11 @@ export function EventController() { const vaultSubgraphClient = getVaultSubgraphClient(balancerV3, chain); - const poolsWithNewSwaps = await syncSwapsV3(vaultSubgraphClient, chain); - await updateVolumeAndFees(chain, poolsWithNewSwaps); - return poolsWithNewSwaps; + return handleSubgraphErrors(async () => { + const poolsWithNewSwaps = await syncSwapsV3(vaultSubgraphClient, chain); + await updateVolumeAndFees(chain, poolsWithNewSwaps); + return poolsWithNewSwaps; + }); }, }; } diff --git a/modules/controllers/pool-controller.ts b/modules/controllers/pool-controller.ts index eb67a58a1..3ff7f8e4d 100644 --- a/modules/controllers/pool-controller.ts +++ b/modules/controllers/pool-controller.ts @@ -25,6 +25,7 @@ import { syncHookReviews } from '../actions/content/sync-hook-reviews'; import { HookData } from '../sources/transformers'; import { syncErc4626Tokens } from '../actions/token/sync-erc4626-tokens'; import { syncRateProviderReviews } from '../actions/content/sync-rate-provider-reviews'; +import { handleSubgraphErrors } from './error-handling'; export function PoolController(tracer?: any) { return { @@ -32,7 +33,7 @@ export function PoolController(tracer?: any) { const subgraphUrl = config[chain].subgraphs.balancer; const subgraphService = getV2SubgraphClient(subgraphUrl, chain); - return addPoolsV2(subgraphService, chain); + return handleSubgraphErrors(() => addPoolsV2(subgraphService, chain)); }, async syncOnchainDataForAllPoolsV2(chain: Chain) { diff --git a/tasks/index.ts b/tasks/index.ts index 5a6a36992..5b3673df5 100644 --- a/tasks/index.ts +++ b/tasks/index.ts @@ -61,7 +61,11 @@ async function run(job: string = process.argv[2], chainId: string = process.argv } else if (job === 'sync-join-exits-v3') { return EventController().syncJoinExitsV3(chain); } else if (job === 'sync-join-exits-v2') { - return EventController().syncJoinExitsV2(chain); + // make a loop to hit the rate limit + while (true) { + await EventController().syncJoinExitsV2(chain); + } + // return EventController().syncJoinExitsV2(chain); } else if (job === 'sync-swaps-v2') { return EventController().syncSwapsUpdateVolumeAndFeesV2(chain); } else if (job === 'sync-snapshots-v2') {